1. 程式人生 > >netty的websocket服務端開發

netty的websocket服務端開發

前兩篇文章分析了node.js版本的socket.io的服務端與客戶端的程式碼,其實他們socket.io就是在websocket的基礎上進一步做了一層封裝,添加了一些心跳,重連線。。。

這篇文章就先來看看在netty中是如何建立websocket服務端的吧,先來回顧一下websocket建立連線以及通訊的過程:

(1)客戶端向服務端傳送http報文格式的請求,而且是GET方法的請求,不過這裡與普通的http請求有稍微不同的地方,那就是頭部connection欄位是Upgrade,然後又Upgrad欄位,值是websocket,其請求格式如下:

     * GET /chat HTTP/1.1
     * Host: server.example.com
     * Upgrade: websocket
     * Connection: Upgrade
     * Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
     * Sec-WebSocket-Origin: http://example.com
     * Sec-WebSocket-Protocol: chat, superchat
     * Sec-WebSocket-Version: 13

(2)由伺服器端向客戶端返回特定格式的http報文,表示當前websocket建立,報文格式如下:

     * HTTP/1.1 101 Switching Protocols
     * Upgrade: websocket
     * Connection: Upgrade
     * Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
     * Sec-WebSocket-Protocol: chat

(3)當連線建立以後,那麼雙方就可以通過剛剛建立連線用的socket來發送資料了,這裡傳送的資料必須經過websocket的幀格式的編碼,具體它的資訊就去看websocket的協議標準就知道了。。。

其實知道了整個建立連線以及通訊的過程,那麼就能夠知道如何在channel的pipeline上面安排各種handler的順序了,如下:

(1)首先要安排http報文的decode,aggregator以及encode的handler,因為最開始還是採用http通訊的

(2)接收到websocket的建立連線報文之後,通過一些處理按照規定的格式將http返回傳送給客戶端,那麼這就表示websocket的連線已經建立了

(3)移除前面設定的所有的http報文處理的handler,然後加上websocket的frame的decode以及encodehandler,用於將從客戶端收到的資料轉化為封裝好的格式,以及將要傳送的資料轉化成byte.

那麼接下來來看看具體是如何在netty中建立websocket的服務端的吧,直接上主程式的程式碼:

public class Fjs {
	public void run() throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup();   //這個是用於serversocketchannel的eventloop
		EventLoopGroup workerGroup = new NioEventLoopGroup();    //這個是用於處理accept到的channel
		try {
			ServerBootstrap b = new ServerBootstrap();    //構建serverbootstrap物件
			b.group(bossGroup, workerGroup);   //設定時間迴圈物件,前者用來處理accept事件,後者用於處理已經建立的連線的io
			b.channel(NioServerSocketChannel.class);   //用它來建立新accept的連線,用於構造serversocketchannel的工廠類
			
			
			b.childHandler(new ChannelInitializer<SocketChannel>(){      //為accept channel的pipeline預新增的inboundhandler
				@Override     //當新連線accept的時候,這個方法會呼叫
				protected void initChannel(SocketChannel ch) throws Exception {
					
					//ch.pipeline().addLast(new ReadTimeoutHandler(10));
					//ch.pipeline().addLast(new WriteTimeoutHandler(1));
					
					ch.pipeline().addLast("decoder", new HttpRequestDecoder());   //用於解析http報文的handler
					ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));   //用於將解析出來的資料封裝成http物件,httprequest什麼的
					ch.pipeline().addLast("encoder", new HttpResponseEncoder());   //用於將response編碼成httpresponse報文傳送
					ch.pipeline().addLast("handshake", new WebSocketServerProtocolHandler("", "", true));  //websocket的handler部分定義的,它會自己處理握手等操作
					//ch.pipeline().addLast("chunkedWriter", new ChunkedWriteHandler());
					//ch.pipeline().addLast(new HttpHanlder());
					ch.pipeline().addLast(new WebSocketHandler());
				}
				
			});
			//bind方法會建立一個serverchannel,並且會將當前的channel註冊到eventloop上面,
			//會為其繫結本地埠,並對其進行初始化,為其的pipeline加一些預設的handler
			ChannelFuture f = b.bind(80).sync();    
			f.channel().closeFuture().sync();  //相當於在這裡阻塞,直到serverchannel關閉
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
	
	public static void main(String args[]) throws Exception {
		new Fjs().run();
	}
}

這裡可以看看在channel上面安排的handler的情況,主要是就是http部分的decode以及encode的handler,另外這裡有一個比較重要的handler,WebSocketServerProtocolHandler它對整個websocket的通訊進行了初始化,包括握手,以及以後的一些通訊控制。。。

然後再來看看我們自己定義的handler的處理吧:

	@Override
	public void channelRead(final ChannelHandlerContext ctx, Object msg)
			throws Exception {
		// TODO Auto-generated method stub
		
		WebSocketFrame frame = (WebSocketFrame)msg;
		ByteBuf buf = frame.content();  //真正的資料是放在buf裡面的
		
		
		String aa = buf.toString(Charset.forName("utf-8"));  //將資料按照utf-8的方式轉化為字串
		System.out.println(aa);
		WebSocketFrame out = new TextWebSocketFrame(aa);  //建立一個websocket幀,將其傳送給客戶端
		ctx.pipeline().writeAndFlush(out).addListener(new ChannelFutureListener(){

			@Override
			public void operationComplete(ChannelFuture future)
					throws Exception {
				// TODO Auto-generated method stub
				ctx.pipeline().close();  //從pipeline上面關閉的時候,會關閉底層的chanel,而且會從eventloop上面取消註冊
			}
			
		});
		
	}


其實這裡也就主要是channelRead方法,將從客戶端傳送過來的資料讀出來輸出,然後在返回一個字串就好了。。。還是比較的簡單。。

上面的兩段程式碼就基本上能夠使用netty來建立websocket規範的服務端了,可以看到整個用法還是很簡單的,這裡還有必要去看看WebSocketServerProtocolHandler做了些什麼事情。。。

    //噹噹前這個handler被新增到pipeline之後會呼叫這個方法
    public void handlerAdded(ChannelHandlerContext ctx) {
        ChannelPipeline cp = ctx.pipeline();
        if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
            // 這裡相當於要在當前的handler之前新增用於處理websocket建立連線時候的握手handler
            ctx.pipeline().addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
                    new WebSocketServerProtocolHandshakeHandler(websocketPath, subprotocols, allowExtensions));
        }
    }

首先可以看到它覆蓋了這個方法,那麼在當前這個handler加入pipeline之後會在這個pipeline之前新增另外一個handler,這個handler是用於websocket的握手的。。。

好吧,那麼我們來看看這個用於websocket建立連線的握手的handler是怎麼個用法吧:

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest req = (FullHttpRequest) msg;
        if (req.getMethod() != GET) {  //如果不是get請求,那麼就出錯了
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
            return;
        }
        //建立愛你websocket進行連線握手的工廠類,因為不同版本的連線握手不太一樣
        final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(ctx.pipeline(), req, websocketPath), subprotocols, allowExtensions);
        final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);  //這裡會根據不同的websocket版本來安排不同的握手handler
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
        	//其實這裡在將用於建立連線的http報文傳送回去之後,會將前面新增的http部分的handler都移除,然後加上用於decode和encode針對websocket幀的handler
            final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);   //這裡說白了就是進行握手,向客戶端返回用於建立連線的報文
            handshakeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        ctx.fireExceptionCaught(future.cause());
                    } else {
                        ctx.fireUserEventTriggered(  //用於啟用握手已經完成的事件,可以讓使用者的程式碼收到通知
                                WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
                    }
                }
            });
            WebSocketServerProtocolHandler.setHandshaker(ctx, handshaker);  //儲存當前的shaker
            ctx.pipeline().replace(this, "WS403Responder",
                    WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());  //將當前這個handler替換,因為只有剛開始使用http協議進行通訊,接下來就沒有了
        }
    }

其實基本還是很簡單的,無非就是根據收到的http請求,獲取當前建立連線的websocket客戶端的版本資訊,然後通過版本獲取相應的用於握手的handler,然後進行相應的處理,將用於建立websocket連線的報文傳送給客戶端就可以了,不過這裡還有一些細節,那就是會在之後將處理http的handler移除,換成處理websocket幀的handler,因為以後的通訊就不是按照http來的了,而且會將當前這個handler也替換掉,畢竟以後就不會再用了嘛。。

那麼接下來回到WebSocketServerProtocolHandler,因為本身它是一個messagetomessagedecoder,那麼來看看它的decode方法:

    protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
        if (frame instanceof CloseWebSocketFrame) {  //如果是用於關閉
            WebSocketServerHandshaker handshaker = getHandshaker(ctx);
            frame.retain();
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame); //傳送用於關閉的websocket幀
            return;
        }
        super.decode(ctx, frame, out);  
    }

上面獲取的shaker是在前面儲存的,畢竟不同版本的websocket對應不同的。。。再來看看它父類的方法吧:

    protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
        if (frame instanceof PingWebSocketFrame) {
            frame.content().retain();
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content()));
            return;
        }
        if (frame instanceof PongWebSocketFrame) {
            // Pong frames need to get ignored
            return;
        }

        out.add(frame.retain());
    }

那麼到這裡,整個websocket在netty中建立服務端的流程都已經很清楚了。。。

以後如果有時間的自己寫一個socket.io的吧,因為現在看到的那些都不太好用的樣子。。。