1. 程式人生 > >Netty作為服務端的websocket通訊

Netty作為服務端的websocket通訊

http協議是無狀態的,因此導致客戶端每次通訊都需要攜帶標識(session)給服務端,以此來識別是哪個客戶端傳送過來的資訊。但是當服務端主動推送給客戶端時就無法實現了,因為服務端不知道客戶端在哪,此時通常的做法時客戶端輪詢服務端,不停的給服務端傳送訊息,來接受服務端資訊。很明顯這種方式會浪費大量的資源,並且HTTP訊息本身攜帶的資料就比較大,頻繁傳送更會增加網路負擔。websocket就是為了解決這個問題而誕生的。

一、websocket

WebSocket協議是基於TCP的一種新的網路協議。它實現了瀏覽器與伺服器全雙工(full-duplex)通訊——允許伺服器主動傳送資訊給客戶端。websocket的特點是事件驅動、非同步、使用ws或者wss協議的客戶端socket、能夠實現真正意義上的推送功能。通訊流程:


但是websocket本身依賴於tomcat,然而tomcat併發量不大,連線數低,會導致出現斷連的情況,因此對於websocket通訊要求不高的可以直接使用tomcat即可,但是遇到高併發就難以支援了。通常會選用Netty作為通訊的服務端。

二、Netty

Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。Netty是基於Java NIO實現的非同步通訊框架,其主要特點是簡單,要比原生的JavaNIO開發方便很多,同時Netty封裝了大量好用的元件,方便開發。下面基於Netty實現websocket通訊。

三、實現websocket通訊

public class WebSocketServer {
	
	public void run() {
		// 服務端啟動輔助類,用於設定TCP相關引數
		ServerBootstrap bootstrap = new ServerBootstrap();
		// 獲取Reactor執行緒池
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workGroup = new NioEventLoopGroup();
		// 設定為主從執行緒模型
		bootstrap.group(bossGroup, workGroup)
		// 設定服務端NIO通訊型別
		.channel(NioServerSocketChannel.class)
		// 設定ChannelPipeline,也就是業務職責鏈,由處理的Handler串聯而成,由從執行緒池處理
		.childHandler(new ChannelInitializer<Channel>() {
			// 新增處理的Handler,通常包括訊息編解碼、業務處理,也可以是日誌、許可權、過濾等
			@Override
			protected void initChannel(Channel ch) throws Exception {
				// 獲取職責鏈
				ChannelPipeline pipeline = ch.pipeline();
				// 
				pipeline.addLast("http-codec", new HttpServerCodec());
				pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
				pipeline.addLast("http-chunked", new ChunkedWriteHandler());
				pipeline.addLast("handler", new WebSocketHandler());
			}
		})
		// bootstrap 還可以設定TCP引數,根據需要可以分別設定主執行緒池和從執行緒池引數,來優化服務端效能。
		// 其中主執行緒池使用option方法來設定,從執行緒池使用childOption方法設定。
		// backlog表示主執行緒池中在套介面排隊的最大數量,佇列由未連線佇列(三次握手未完成的)和已連線佇列
		.option(ChannelOption.SO_BACKLOG, 5)
		// 表示連線保活,相當於心跳機制,預設為7200s
		.childOption(ChannelOption.SO_KEEPALIVE, true);
		
		try {
			// 繫結埠,啟動select執行緒,輪詢監聽channel事件,監聽到事件之後就會交給從執行緒池處理
			Channel channel = bootstrap.bind(8081).sync().channel();
			// 等待服務埠關閉
			channel.closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			// 優雅退出,釋放執行緒池資源
			bossGroup.shutdownGracefully();
			workGroup.shutdownGracefully();
		}
	}

	public static void main(String[] args) {
		new WebSocketServer().run();
	}

}

public class WebSocketHandler extends ChannelInboundHandlerAdapter{
	//用於websocket握手的處理類
	private WebSocketServerHandshaker handshaker;
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		if (msg instanceof FullHttpRequest) {
			// websocket連線請求
			handleHttpRequest(ctx, (FullHttpRequest)msg);
		} else if (msg instanceof WebSocketFrame) {
			// websocket業務處理
			handleWebSocketRequest(ctx, (WebSocketFrame)msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}
	
	private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
		// Http解碼失敗,向伺服器指定傳輸的協議為Upgrade:websocket
		if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
			sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
			return;
		}
		// 握手相應處理,建立websocket握手的工廠類,
		WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8081/ws", null, false);
		// 根據工廠類和HTTP請求建立握手類
		handshaker = wsFactory.newHandshaker(req);
		if (handshaker == null) {
			// 不支援websocket
			WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
		} else {
			// 通過它構造握手響應訊息返回給客戶端
			handshaker.handshake(ctx.channel(), req);
		}
	}
	
	private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) throws Exception {
		if (req instanceof CloseWebSocketFrame) {
			// 關閉websocket連線
			handshaker.close(ctx.channel(), (CloseWebSocketFrame)req.retain());
			return;
		}
		if (req instanceof PingWebSocketFrame) {
			ctx.channel().write(new PongWebSocketFrame(req.content().retain()));
			return;
		}
		if (!(req instanceof TextWebSocketFrame)) {
			throw new UnsupportedOperationException("當前只支援文字訊息,不支援二進位制訊息");
		}
		if (ctx == null || this.handshaker == null || ctx.isRemoved()) {
			throw new Exception("尚未握手成功,無法向客戶端傳送WebSocket訊息");
		}
		ctx.channel().write(new TextWebSocketFrame(((TextWebSocketFrame)req).text()));
	}
	
	private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
		// BAD_REQUEST(400) 客戶端請求錯誤返回的應答訊息
		if (res.status().code() != 200) {
			// 將返回的狀態碼放入快取中,Unpooled沒有使用快取池
			ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
			res.content().writeBytes(buf);
			buf.release();
			HttpUtil.setContentLength(res, res.content().readableBytes());
		}
		// 傳送應答訊息
		ChannelFuture cf = ctx.channel().writeAndFlush(res);
		// 非法連線直接關閉連線
		if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
			cf.addListener(ChannelFutureListener.CLOSE);
		}
	}

}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8081/ws");
            socket.onopen = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = "連線開啟!";
            };
            socket.onclose = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + "連線被關閉";
            };
            socket.onmessage = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + '\n' + event.data;
            };
        } else {
            alert("你的瀏覽器不支援 WebSocket!");
        }

        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("連線沒有開啟.");
            }
        }
    </script>
    <form onsubmit="return false;">
        <h3>WebSocket 聊天室:</h3>
        <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
        <br> 
        <input type="text" name="message"  style="width: 300px" value="Welcome to www.waylau.com">
        <input type="button" value="傳送訊息" onclick="send(this.form.message.value)">
        <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天記錄">
    </form>
    <br> 
    <br> 
</body>
</html>