1. 程式人生 > >Netty實現聊天通訊(4.0)

Netty實現聊天通訊(4.0)

package com.test.netty4;  

import io.netty.channel.ChannelHandlerContext;  
import io.netty.handler.codec.http.FullHttpRequest;  

public interface IHttpService {  

    void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request);  

}  

package com.test.netty4;  

import io.netty
.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.WebSocketFrame; public interface IWebSocketService { void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame); }

package com.test.netty4;  

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap
; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.ChannelInitializer; import io.netty
.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.util.AttributeKey; import io.netty.util.concurrent.GlobalEventExecutor; public class WebSocketServer implements IWebSocketService, IHttpService { public static void main(String[] args) { new WebSocketServer(9999).start(); } // ----------------------------static fields ----------------------------- private static final String HN_HTTP_CODEC = "HN_HTTP_CODEC"; private static final String HN_HTTP_AGGREGATOR = "HN_HTTP_AGGREGATOR"; private static final String HN_HTTP_CHUNK = "HN_HTTP_CHUNK"; private static final String HN_SERVER = "HN_LOGIC"; // handshaker attachment key private static final AttributeKey<WebSocketServerHandshaker> ATTR_HANDSHAKER = AttributeKey.newInstance("ATTR_KEY_CHANNELID"); private static final int MAX_CONTENT_LENGTH = 65536; private static final String WEBSOCKET_UPGRADE = "websocket"; private static final String WEBSOCKET_CONNECTION = "Upgrade"; private static final String WEBSOCKET_URI_ROOT_PATTERN = "ws://%s:%d"; // ------------------------ member fields ----------------------- private String host; // 繫結的地址 private int port; // 繫結的埠 /** * 儲存所有WebSocket連線 */ private Map<ChannelId, Channel> channelMap = new ConcurrentHashMap<ChannelId, Channel>(); private ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private final String WEBSOCKET_URI_ROOT; public WebSocketServer(int port) { this("localhost", port); } public WebSocketServer(String host, int port) { this.host = host; this.port = port; WEBSOCKET_URI_ROOT = String.format(WEBSOCKET_URI_ROOT_PATTERN, host, port); } public void start() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pl = ch.pipeline(); // 儲存該Channel的引用 channelGroup.add(ch); channelMap.put(ch.id(), ch); System.out.println("new channel {}"+ ch); ch.closeFuture().addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { System.out.println("channel close {}"+ future.channel()); // Channel 關閉後不再引用該Channel channelMap.remove(future.channel().id()); } }); pl.addLast(HN_HTTP_CODEC, new HttpServerCodec()); pl.addLast(HN_HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_CONTENT_LENGTH)); pl.addLast(HN_HTTP_CHUNK, new ChunkedWriteHandler()); pl.addLast(HN_SERVER, new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this)); } }); try { // 繫結埠 ChannelFuture future = b.bind(host, port).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("websocket started."); } } }).sync(); future.channel().closeFuture().addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { System.out.println("server channel {} closed."+future.channel()); } }).sync(); } catch (InterruptedException e) { e.printStackTrace(); // logger.error(e.toString()); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } System.out.println("websocket server shutdown"); } /* * @see cc.lixiaohui.demo.netty4.websocket.IHttpService#handleHttpRequest(io.netty.handler.codec.http.FullHttpRequest) */ public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { if (isWebSocketUpgrade(req)) { // 該請求是不是websocket upgrade請求 System.out.println("upgrade to websocket protocol"); String subProtocols = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL); WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(WEBSOCKET_URI_ROOT, subProtocols, false); WebSocketServerHandshaker handshaker = factory.newHandshaker(req); if (handshaker == null) {// 請求頭不合法, 導致handshaker沒建立成功 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { // 響應該請求 handshaker.handshake(ctx.channel(), req); // 把handshaker 繫結給Channel, 以便後面關閉連線用 ctx.channel().attr(ATTR_HANDSHAKER).set(handshaker);// attach handshaker to this channel } return; } // TODO 忽略普通http請求 System.out.println("ignoring normal http request"); } /* * @see * cc.lixiaohui.demo.netty4.websocket.IWebSocketService#handleFrame(io.netty * .channel.Channel, io.netty.handler.codec.http.websocketx.WebSocketFrame) */ public void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // text frame if (frame instanceof TextWebSocketFrame) { String text = ((TextWebSocketFrame) frame).text(); // TextWebSocketFrame rspFrame = new TextWebSocketFrame(text); System.out.println("recieve TextWebSocketFrame from channel {}"+ ctx.channel()); // 發給其他所有channel // for (Channel ch : channelMap.values()) { // if (ctx.channel().equals(ch)) { // continue; // } // TextWebSocketFrame rspFrame = new TextWebSocketFrame(text); // ch.writeAndFlush(rspFrame); // System.out.println("write text[{}] to channel {}"+ text+ ch); // } TextWebSocketFrame rspFrame = new TextWebSocketFrame(text); channelGroup.writeAndFlush(rspFrame); return; } // ping frame, 回覆pong frame即可 if (frame instanceof PingWebSocketFrame) { System.out.println("recieve PingWebSocketFrame from channel {}"+ ctx.channel()); ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); return; } if (frame instanceof PongWebSocketFrame) { System.out.println("recieve PongWebSocketFrame from channel {}"+ ctx.channel()); return; } // close frame, if (frame instanceof CloseWebSocketFrame) { System.out.println("recieve CloseWebSocketFrame from channel {}"+ ctx.channel()); WebSocketServerHandshaker handshaker = ctx.channel().attr(ATTR_HANDSHAKER).get(); if (handshaker == null) { System.out.println("channel {} have no HandShaker"+ ctx.channel()); return; } handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 剩下的是binary frame, 忽略 System.out.println("unhandle binary frame from channel {}"+ctx.channel()); } //三者與:1.GET? 2.Upgrade頭 包含websocket字串? 3.Connection頭 包含 Upgrade字串? private boolean isWebSocketUpgrade(FullHttpRequest req) { HttpHeaders headers = req.headers(); return req.getMethod().equals(HttpMethod.GET) && headers.get(HttpHeaderNames.UPGRADE).contains(WEBSOCKET_UPGRADE) && headers.get(HttpHeaderNames.CONNECTION).contains(WEBSOCKET_CONNECTION); } }

package com.test.netty4;  

import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.SimpleChannelInboundHandler;  
import io.netty.handler.codec.http.FullHttpRequest;  
import io.netty.handler.codec.http.websocketx.WebSocketFrame;  



public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {  

    @SuppressWarnings("unused")  

    private IWebSocketService websocketService;  

    private IHttpService httpService;  

    public WebSocketServerHandler(IWebSocketService websocketService, IHttpService httpService) {  
        super();  
        this.websocketService = websocketService;  
        this.httpService = httpService;  
    }  

    /* 
     * @see 
     * io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel 
     * .ChannelHandlerContext, java.lang.Object) 
     */  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  
        if (msg instanceof FullHttpRequest) {  
            httpService.handleHttpRequest(ctx, (FullHttpRequest) msg);  
        } else if (msg instanceof WebSocketFrame) {  
            websocketService.handleFrame(ctx, (WebSocketFrame) msg);  
        }  
    }  

    /*  
     * @see io.netty.channel.ChannelInboundHandlerAdapter#channelReadComplete(io.netty.channel.ChannelHandlerContext) 
     */  
    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
        ctx.flush();  
    } 


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

    }


}  

2、html頁面程式碼


    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">  
    <html xmlns="http://www.w3.org/1999/xhtml">  
    <head>  
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />  
    <title></title>  
    </head>  
      </head>  
      <script type="text/javascript">  
      var socket;  

      if(!window.WebSocket){  
          window.WebSocket = window.MozWebSocket;  
      }  

      if(window.WebSocket){  
          socket = new WebSocket("ws://localhost:9999");  
          socket.onmessage = function(event){             
                appendln("接收:" + event.data);  
          };  

          socket.onopen = function(event){  
                appendln("WebSocket 連線已建立");  

          };  

          socket.onclose = function(event){  
                appendln("WebSocket 連線已關閉");  
          };  
      }else{  
            alert("瀏覽器不支援WebSocket協議");  
      }  

      function send(message){  
        if(!window.WebSocket){return;}  
        if(socket.readyState == WebSocket.OPEN){  
            socket.send(message);  
            appendln("傳送:" + message);  
        }else{  
            alert("WebSocket連線建立失敗");  
        }  

      }  

      function appendln(text) {  
        var ta = document.getElementById('responseText');  
        ta.value += text + "\r\n";  
      }  

      function clear() {  
        var ta = document.getElementById('responseText');  
        ta.value = "";  
      }  

      </script>  
      <body>  
        <form onSubmit="return false;">  
            <input type = "text" name="message" value="你好啊"/>  
            <br/><br/>  
            <input type="button" value="傳送 WebSocket 請求訊息" onClick="send(this.form.message.value)"/>  
            <hr/>  
            <h3>服務端返回的應答訊息</h3>  
            <textarea id="responseText" style="width: 800px;height: 300px;"></textarea>  
        </form>  
      </body>  
    </html>  

3、啟動 WebSocketServer 的main方法,然後開啟多個html頁面,即可實現聊天互通。更
此處是基於netty-all-4.1.12.Final.jar實現的。更詳細的介紹可自行網路搜尋。