Netty應用示例(四)websocket應用示例
1、server端實現
原始碼實現:
public void run(int port){ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { ChannelPipeline pipeline = sc.pipeline(); // 服務端,對請求解碼 pipeline.addLast("http-codec", new HttpServerCodec()); // 聚合器,把多個訊息轉換為一個單一的FullHttpRequest或是FullHttpResponse pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // 塊寫入處理器 pipeline.addLast("http-chunked",new ChunkedWriteHandler()); //自定義處理器 pipeline.addLast("handler", new WebSocketServerHandler()); } }); ChannelFuture cf = b.bind(port).sync(); System.out.println("Web socket server started at port " + port + '.'); System.out.println("Open your browser and navigate to http://localhost:" + port + '/'); cf.channel().closeFuture().sync(); }catch(Exception e){ e.printStackTrace(); }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
- HttpServerCodec:將請求和應答訊息解碼為HTTP訊息
- HttpObjectAggregator:將HTTP訊息的多個部分合成一條完整的HTTP訊息
- ChunkedWriteHandler:向客戶端傳送HTML5檔案
2、WebsocketHandler實現
原始碼實現:
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName()); private WebSocketServerHandshaker handshaker; @Override protected void messageReceived(ChannelHandlerContext ctx, Object obj) throws Exception { // 傳統的HTTP接入 if(obj instanceof FullHttpRequest){ handleHttpRequest(ctx,(FullHttpRequest)obj); } // WebSocket接入 else if(obj instanceof WebSocketFrame){ handleWebSocketFrame(ctx,(WebSocketFrame)obj); } } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception{ System.out.println("handleHttpRequest"); // 如果HTTP解碼失敗,返回HHTP異常 if(!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){ sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); } // 構造握手響應返回,本機測試 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false); handshaker =wsFactory.newHandshaker(req); if(handshaker==null){ WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); }else{ handshaker.handshake(ctx.channel(), req); } } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){ // 判斷是否是關閉鏈路的指令 if(frame instanceof CloseWebSocketFrame){ handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain()); return; } // 判斷是否是Ping訊息 if(frame instanceof PingWebSocketFrame){ ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 本例程僅支援文字訊息,不支援二進位制訊息 if(!(frame instanceof TextWebSocketFrame)){ throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); } // 返回應答訊息 String request = ((TextWebSocketFrame)frame).text(); if(logger.isLoggable(Level.FINE)){ logger.fine(String.format("%s received %s", ctx.channel(), request)); } ctx.channel().write(new TextWebSocketFrame(request+ " , 歡迎使用Netty WebSocket服務,現在時刻:"+ new java.util.Date().toString())); } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res){ // 返回應答給客戶端 if(res.status().code() !=200){ ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); try { Http2Headers http2Headers = HttpUtil.toHttp2Headers(res); http2Headers.addLong(HttpHeaderNames.CONTENT_LENGTH, res.content().readableBytes()); }catch (Exception e){ e.printStackTrace(); } } // 如果是非Keep-Alive,關閉連線 ChannelFuture cf = ctx.channel().writeAndFlush(res); try { Http2Headers http2Headers = HttpUtil.toHttp2Headers(res); Boolean keepAlive = Boolean.valueOf(http2Headers.get(HttpHeaderNames.CONTENT_LENGTH).toString()); if(!keepAlive || res.status().code() != 200){ cf.addListener(ChannelFutureListener.CLOSE); } }catch (Exception e){ e.printStackTrace(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
鏈路建立成功之前:
- 第一次握手由HTTP協議承載,所以是一個HTTP訊息,根據訊息頭中是否包含"Upgrade"欄位來判斷是否是websocket。
- 通過校驗後,構造WebSocketServerHandshaker,通過它構造握手響應資訊返回給客戶端,同時將WebSocket相關的編碼和解碼類動態新增到ChannelPipeline中。
鏈路建立成功之後:
- 客戶端通過文字框提交請求給服務端,Handler收到之後已經解碼之後的WebSocketFrame訊息。
- 如果是關閉按鏈路的指令就關閉鏈路
- 如果是維持鏈路的ping訊息就返回Pong訊息。
- 否則就返回應答訊息
3、測試結果
websocket測試工具:
http://coolaf.com/tool/chattest ,在瀏覽器中輸以上網址,可以看到如下介面:

測試工具.png
服務端輸出如下:

服務端輸出.png
客戶端傳送資料結果如下:

客戶端傳送資料.png