Netty作為服務端的websocket通訊
阿新 • • 發佈:2019-01-22
http協議是無狀態的,因此導致客戶端每次通訊都需要攜帶標識(session)給服務端,以此來識別是哪個客戶端傳送過來的資訊。但是當服務端主動推送給客戶端時就無法實現了,因為服務端不知道客戶端在哪,此時通常的做法時客戶端輪詢服務端,不停的給服務端傳送訊息,來接受服務端資訊。很明顯這種方式會浪費大量的資源,並且HTTP訊息本身攜帶的資料就比較大,頻繁傳送更會增加網路負擔。websocket就是為了解決這個問題而誕生的。
一、websocket
WebSocket協議是基於TCP的一種新的網路協議。它實現了瀏覽器與伺服器全雙工(full-duplex)通訊——允許伺服器主動傳送資訊給客戶端。websocket的特點是事件驅動、非同步、使用ws或者wss協議的客戶端socket、能夠實現真正意義上的推送功能。通訊流程:
但是websocket本身依賴於tomcat,然而tomcat併發量不大,連線數低,會導致出現斷連的情況,因此對於websocket通訊要求不高的可以直接使用tomcat即可,但是遇到高併發就難以支援了。通常會選用Netty作為通訊的服務端。
二、Netty
Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。Netty是基於Java
NIO實現的非同步通訊框架,其主要特點是簡單,要比原生的JavaNIO開發方便很多,同時Netty封裝了大量好用的元件,方便開發。下面基於Netty實現websocket通訊。
三、實現websocket通訊
public class WebSocketServer { public void run() { // 服務端啟動輔助類,用於設定TCP相關引數 ServerBootstrap bootstrap = new ServerBootstrap(); // 獲取Reactor執行緒池 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); // 設定為主從執行緒模型 bootstrap.group(bossGroup, workGroup) // 設定服務端NIO通訊型別 .channel(NioServerSocketChannel.class) // 設定ChannelPipeline,也就是業務職責鏈,由處理的Handler串聯而成,由從執行緒池處理 .childHandler(new ChannelInitializer<Channel>() { // 新增處理的Handler,通常包括訊息編解碼、業務處理,也可以是日誌、許可權、過濾等 @Override protected void initChannel(Channel ch) throws Exception { // 獲取職責鏈 ChannelPipeline pipeline = ch.pipeline(); // pipeline.addLast("http-codec", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65535)); pipeline.addLast("http-chunked", new ChunkedWriteHandler()); pipeline.addLast("handler", new WebSocketHandler()); } }) // bootstrap 還可以設定TCP引數,根據需要可以分別設定主執行緒池和從執行緒池引數,來優化服務端效能。 // 其中主執行緒池使用option方法來設定,從執行緒池使用childOption方法設定。 // backlog表示主執行緒池中在套介面排隊的最大數量,佇列由未連線佇列(三次握手未完成的)和已連線佇列 .option(ChannelOption.SO_BACKLOG, 5) // 表示連線保活,相當於心跳機制,預設為7200s .childOption(ChannelOption.SO_KEEPALIVE, true); try { // 繫結埠,啟動select執行緒,輪詢監聽channel事件,監聽到事件之後就會交給從執行緒池處理 Channel channel = bootstrap.bind(8081).sync().channel(); // 等待服務埠關閉 channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 優雅退出,釋放執行緒池資源 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { new WebSocketServer().run(); } }
public class WebSocketHandler extends ChannelInboundHandlerAdapter{
//用於websocket握手的處理類
private WebSocketServerHandshaker handshaker;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
if (msg instanceof FullHttpRequest) {
// websocket連線請求
handleHttpRequest(ctx, (FullHttpRequest)msg);
} else if (msg instanceof WebSocketFrame) {
// websocket業務處理
handleWebSocketRequest(ctx, (WebSocketFrame)msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
// Http解碼失敗,向伺服器指定傳輸的協議為Upgrade:websocket
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
// 握手相應處理,建立websocket握手的工廠類,
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8081/ws", null, false);
// 根據工廠類和HTTP請求建立握手類
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
// 不支援websocket
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
// 通過它構造握手響應訊息返回給客戶端
handshaker.handshake(ctx.channel(), req);
}
}
private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) throws Exception {
if (req instanceof CloseWebSocketFrame) {
// 關閉websocket連線
handshaker.close(ctx.channel(), (CloseWebSocketFrame)req.retain());
return;
}
if (req instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(req.content().retain()));
return;
}
if (!(req instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("當前只支援文字訊息,不支援二進位制訊息");
}
if (ctx == null || this.handshaker == null || ctx.isRemoved()) {
throw new Exception("尚未握手成功,無法向客戶端傳送WebSocket訊息");
}
ctx.channel().write(new TextWebSocketFrame(((TextWebSocketFrame)req).text()));
}
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
// BAD_REQUEST(400) 客戶端請求錯誤返回的應答訊息
if (res.status().code() != 200) {
// 將返回的狀態碼放入快取中,Unpooled沒有使用快取池
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpUtil.setContentLength(res, res.content().readableBytes());
}
// 傳送應答訊息
ChannelFuture cf = ctx.channel().writeAndFlush(res);
// 非法連線直接關閉連線
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
cf.addListener(ChannelFutureListener.CLOSE);
}
}
}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8081/ws");
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "連線開啟!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "連線被關閉";
};
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data;
};
} else {
alert("你的瀏覽器不支援 WebSocket!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("連線沒有開啟.");
}
}
</script>
<form onsubmit="return false;">
<h3>WebSocket 聊天室:</h3>
<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
<br>
<input type="text" name="message" style="width: 300px" value="Welcome to www.waylau.com">
<input type="button" value="傳送訊息" onclick="send(this.form.message.value)">
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天記錄">
</form>
<br>
<br>
</body>
</html>