Netty網路聊天(一) 聊天室的實戰(最易懂)
之前做過一個IM的專案,裡面涉及了基本的聊天功能,所以注意這系列的文章不是練習,不含基礎和逐步學習的部分,直接開始實戰和思想引導,基礎部分需要額外的去補充,我有精力的話可以後續出一系列的文章。
為什麼第一篇是聊天室,聊天室是最容易實現的部分。也是IM結構最簡單的一部分,其次作單聊和群聊,業務邏輯層層遞增,徹底的拿下聊天室的程式碼,進階單聊和群聊就很簡單了,後續我還會推出直播間的實現。
如果單純想實現聊天室很簡單,但是我儘量會把流程都走全,為了方便理解。
主要由兩個功能類實現:初始化類+響應處理類
0. 準備工作
新增pom.xml
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.2.Final</version> </dependency> 複製程式碼
1. 輔助介面實現
輔助介面讓服務架構更加清晰,這裡有兩個介面,一個用來處理Http請求,一個處理Webocket請求。
MyHttpService.java
/** * 處理 http請求 */ public interface MyHttpService { void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request); } 複製程式碼
MySocket/">WebSocketService.java
/** * 處理 WebSocket 請求中的frame */ public interface MyWebSocketService { void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame); } 複製程式碼
那麼問題來了,誰來實現這兩個類呢,誰來處理這兩種請求的分發呢。
下面來看服務響應處理類:WebSocketServerHandler.java
2. 請求處理類
繼承SimpleChannelInboundHandler類,實現 channelRead0()
handlerAdded()
handlerRemoved()
exceptionCaught()
等方法,第一個是必選方法,其他方法供我們做一些標記和後續處理。
WebSocketServerHandler.java
@Slf4j public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { private MyHttpService httpService; private MyWebSocketService webSocketService; public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public WebSocketServerHandler(MyHttpService httpService, MyWebSocketService webSocketService) { super(); this.httpService = httpService; this.webSocketService = webSocketService; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { httpService.handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { webSocketService.handleFrame(ctx, (WebSocketFrame) msg); } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { channels.add(ctx.channel()); channels.writeAndFlush(new TextWebSocketFrame(ctx.channel() +"上線了")); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { channels.remove(ctx.channel()); channels.writeAndFlush(new TextWebSocketFrame(ctx.channel() +"下線了")); } /** * 發生異常時處理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { channels.remove(ctx.channel()); ctx.close(); log.info("異常資訊:{}",cause.getMessage()); } } 複製程式碼
- 建立ChannelGroup,來儲存每個已經建立連線的channel,在
handlerAdded()
方法中channels.add(ctx.channel());
,相應的在handlerRemoved
方法中remove
。 - 在
channelRead0()
方法中,實現了對請求的識別和分別處理。 -
exceptionCaught()
方法為發生異常時處理。
3. 初始化類實現
@Slf4j public class WebSocketServer implements MyHttpService, MyWebSocketService { /** * 握手用的 變數 */ private static final AttributeKey<WebSocketServerHandshaker> ATTR_HAND_SHAKER = AttributeKey.newInstance("ATTR_KEY_CHANNEL_ID"); private static final int MAX_CONTENT_LENGTH = 65536; /** * 請求型別常量 */ private static final String WEBSOCKET_UPGRADE = "websocket"; private static final String WEBSOCKET_CONNECTION = "Upgrade"; private static final String WEBSOCKET_URI_ROOT_PATTERN = "ws://%s:%d"; /** * 使用者欄位 */ private String host; private int port; /** * 儲存 所有的連線 */ private Map<ChannelId, Channel> channelMap = new HashMap<>(); private final String WEBSOCKET_URI_ROOT; public WebSocketServer(String host, int port) { this.host = host; this.port = port; // 將 ip 和埠 按照格式 賦值給 uri WEBSOCKET_URI_ROOT = String.format(WEBSOCKET_URI_ROOT_PATTERN, host, port); } public void start(){ // 例項化 nio監聽事件池 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 例項化 nio工作執行緒池 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 啟動器 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pl = channel.pipeline(); // 儲存 該channel 到map中 channelMap.put(channel.id(),channel); log.info("new channel {}",channel); channel.closeFuture().addListener((ChannelFutureListener) channelFuture -> { log.info("channel close future{}",channelFuture); //關閉後 從map中移除 channelMap.remove(channelFuture.channel().id()); }); //新增 http 編解碼 pl.addLast(new HttpServerCodec()); // 聚合器 pl.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH)); // 支援大資料流 pl.addLast(new ChunkedWriteHandler()); // 設定 websocket 服務處理方式 pl.addLast(new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this)); } }); /** * 例項化完畢後,需要完成埠繫結 */ try { ChannelFuture channelFuture = bootstrap.bind(host,port).addListener((ChannelFutureListener) channelFuture1 -> { if (channelFuture1.isSuccess()){ log.info("webSocket started"); } }).sync(); channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture12 -> log.info("server channel {} closed.", channelFuture12.channel())).sync(); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("繫結埠失敗"); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } log.info("webSocket shutdown"); } @Override public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) { //判斷是不是 socket 請求 if (isWebSocketUpgrade(request)){ //如果是webSocket請求 log.info("請求是webSocket協議"); // 獲取子協議 String subProtocols = request.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL); //握手工廠 設定 uri+協議+不允許擴充套件 WebSocketServerHandshakerFactory handshakerFactory = new WebSocketServerHandshakerFactory(WEBSOCKET_URI_ROOT,subProtocols,false); // 從工廠中例項化一個 握手請求 WebSocketServerHandshaker handshaker = handshakerFactory.newHandshaker(request); if (handshaker == null){ //握手失敗:不支援的協議 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); }else { //響應請求:將 握手轉交給 channel處理 handshaker.handshake(ctx.channel(),request); //將 channel 與 handshaker 繫結 ctx.channel().attr(ATTR_HAND_SHAKER).set(handshaker); } return; }else { // 不處理 HTTP 請求 log.info("不處理 HTTP 請求"); } } @Override public void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { /** * text frame handler */ if (frame instanceof TextWebSocketFrame){ String text = ((TextWebSocketFrame) frame).text(); TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(text); log.info("receive textWebSocketFrame from channel: {} , 目前一共有{}個線上",ctx.channel(),channelMap.size()); //發給其它的 channel(群聊功能) for (Channel ch: channelMap.values()){ if (ch.equals(ctx.channel())){ continue; } //將 text frame 寫出 ch.writeAndFlush(textWebSocketFrame); log.info("訊息已傳送給{}",ch); log.info("write text: {} to channel: {}",textWebSocketFrame,ctx.channel()); } return; } /** * ping frame , 回覆pong frame */ if (frame instanceof PingWebSocketFrame){ log.info("receive pingWebSocket from channel: {}",ctx.channel()); ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); return; } /** * pong frame, do nothing */ if (frame instanceof PongWebSocketFrame){ log.info("receive pongWebSocket from channel: {}",ctx.channel()); return; } /** * close frame, close */ if (frame instanceof CloseWebSocketFrame){ log.info("receive closeWebSocketFrame from channel: {}", ctx.channel()); //獲取到握手資訊 WebSocketServerHandshaker handshaker = ctx.channel().attr(ATTR_HAND_SHAKER).get(); if (handshaker == null){ log.error("channel: {} has no handShaker", ctx.channel()); return; } handshaker.close(ctx.channel(),((CloseWebSocketFrame) frame).retain()); return; } /** * 剩下的都是 二進位制 frame ,忽略 */ log.warn("receive binary frame , ignore to handle"); } /** * 判斷是否是 webSocket 請求 */ private boolean isWebSocketUpgrade(FullHttpRequest req) { HttpHeaders headers = req.headers(); return req.method().equals(HttpMethod.GET) && headers.get(HttpHeaderNames.UPGRADE).contains(WEBSOCKET_UPGRADE) && headers.get(HttpHeaderNames.CONNECTION).contains(WEBSOCKET_CONNECTION); } } 複製程式碼
-
l.addLast(new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this));
新增自己的響應處理。WebSocketServerHandler
是第二點實現的請求處理類. -
private Map<ChannelId, Channel> channelMap = new HashMap<>();
來將ChannelId和CHannel對應儲存。方便後來對應獲取。 -
bootstrap.bind(host,port)
也可以替換成僅bind埠。
public ChannelFuture bind(String inetHost, int inetPort) { return bind(new InetSocketAddress(inetHost, inetPort)); } 複製程式碼
public synchronized InetAddress anyLocalAddress() { if (anyLocalAddress == null) { anyLocalAddress = new Inet4Address(); // {0x00,0x00,0x00,0x00} anyLocalAddress.holder().hostName = "0.0.0.0"; } return anyLocalAddress; } 複製程式碼
它預設會給 0.0.0.0
埠開放服務。 4. handleHttpRequest
和 handleFrame
是 MyWebSocketService
類的一個實現。 5. 各個細節都有註釋,仔細看註釋。
4. 啟動服務
public class Main { public static void main(String[] args) { new WebSocketServer("192.168.1.33",9999).start(); } } 複製程式碼
區域網內如何測試?
我用的是npm 的一個serve 服務來搞區域網。 官網介紹: ofollow,noindex">www.npmjs.com/package/ser… 我的文章: React打包注意事項及靜態檔案服務搭建

這下保證你的手機和電腦都在區域網內,就可以訪問你自己的群聊了。
5. 前端頁面
要送就送一套。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta http-equiv="X-UA-Compatible" content="ie=edge"> <title>Document</title> <style type="text/css"> .talk_con{ width:600px; height:500px; border:1px solid #666; margin:50px auto 0; background:#f9f9f9; } .talk_show{ width:580px; height:420px; border:1px solid #666; background:#fff; margin:10px auto 0; overflow:auto; } .talk_input{ width:580px; margin:10px auto 0; } .whotalk{ width:80px; height:30px; float:left; outline:none; } .talk_word{ width:420px; height:26px; padding:0px; float:left; margin-left:10px; outline:none; text-indent:10px; } .talk_sub{ width:56px; height:30px; float:left; margin-left:10px; } .atalk{ margin:10px; } .atalk span{ display:inline-block; background:#0181cc; border-radius:10px; color:#fff; padding:5px 10px; } .btalk{ margin:10px; text-align:right; } .btalk span{ display:inline-block; background:#ef8201; border-radius:10px; color:#fff; padding:5px 10px; } </style> <script type="text/javascript"> // document.onkeydown = function (ev) { if (ev && ev.keyCode == 13){ send(); clear(); } } var socket; if (window.WebSocket) { socket = new WebSocket("ws://192.168.1.33:9999"); // socket = new WebSocket("ws://127.0.0.1:9999"); // socket = new WebSocket("ws://192.168.43.186:9999"); socket.onmessage = function (ev) { atalkAppendIn("接收:"+socket.channel + ":" + ev.data) }; socket.onopen = function () { btalkAppendIn("連線已建立"); } socket.onclose = function () { btalkAppendIn("連線關閉"); }; }else { alert("瀏覽器不支援"); } function send(){ var message = document.getElementById("talkwords"); if (!window.WebSocket){ return } if (socket.readyState === WebSocket.OPEN){ socket.send(message.value); btalkAppendIn("傳送:"+ message.value); clear(); } else { alert("WebSocket 建立失敗"); } } function atalkAppendIn(text) { var append = document.getElementById("words"); append.innerHTML+= '<div class="atalk"><span>'+ text +'</span></div>'; } function btalkAppendIn(text) { var append = document.getElementById("words"); append.innerHTML+= '<div class="btalk"><span>'+ text +'</span></div>'; } function clear () { var elementById = document.getElementById("talkwords"); elementById.value = ""; } </script> </head> <body> <div class="talk_con"> <div class="talk_show" id="words"> </div> <div class="talk_input"> <!--<select class="whotalk" id="who">--> <!--<option value="0">A說:</option>--> <!--<option value="1">B說:</option>--> <!--</select>--> <input type="text" class="talk_word" id="talkwords"> <input type="button" onclick="send()" value="傳送" class="talk_sub" id="talksub"> </div> </div> </body> </html> 複製程式碼
-
socket = new WebSocket("ws://192.168.1.33:9999");
注意這裡ip和port與服務一一對應。 -
socket.onmessage()
是獲取socket資訊。socket.onopen
是建立連線。socket.onclose
是關閉連線。socket.send(message.value);
是傳送socket資訊。


控制檯輸出:
15:12:42.443 [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - receive textWebSocketFrame from channel: [id: 0x0d08c657, L:/192.168.1.33:9999 - R:/192.168.1.33:50440] , 目前一共有2個線上 15:12:42.443 [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - 訊息已傳送給[id: 0xacd5c1ad, L:/192.168.1.33:9999 - R:/192.168.1.33:50438] 15:12:42.444 [nioEventLoopGroup-3-5] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder - Encoding WebSocket Frame opCode=1 length=5 15:12:42.443 [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - write text: TextWebSocketFrame(data: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 15)) to channel: [id: 0x0d08c657, L:/192.168.1.33:9999 - R:/192.168.1.33:50440] 複製程式碼