1. 程式人生 > >利用Netty來構建WebSocket後端服務系統的例子程式

利用Netty來構建WebSocket後端服務系統的例子程式

最近在研究Netty來構建SOA架構,其中也包括了前端接入的HTTP/WebSocket方面的接入響應,而WebSocket方面的接入響應對於移動端的訊息推送研發至關重要,這裡就將在這塊研發時的非同步socket響應服務例子程式筆記記錄下來,系統總共分為4個處理類,即:

HttpRequestHandler  --  HTTP請求處理類
TextWebSocketFrameHandler -- 對應Text訊息的處理類
WebSocketServer  -- 系統主類
WebSocketServerInitializer -- 服務主程式的初始化類

WebSocketServer 類程式碼:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public final class WebSocketServer {

 private int port = 0;

 public WebSocketServer(int port) {
  this.port = port;
 }

 public void run() throws Exception {
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  try {
   ServerBootstrap b = new ServerBootstrap();
   b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(new WebSocketServerInitializer())
     .option(ChannelOption.SO_BACKLOG, 128)
     .childOption(ChannelOption.SO_KEEPALIVE, true);
   
   System.out.println("WebsocketChatServer 啟動了");

   // 繫結埠,開始接收進來的連線
   ChannelFuture f = b.bind(port).sync();

   // 等待伺服器 socket 關閉 。在這個例子中,這不會發生,但你可以優雅地關閉你的伺服器。
   f.channel().closeFuture().sync();
  } finally {
   workerGroup.shutdownGracefully();
   bossGroup.shutdownGracefully();

   System.out.println("WebsocketChatServer 關閉了");
  }
 }
 
 public static void main(String[] args) throws Exception {
  int port = 0;
  
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
       
        // start current instance
        new WebSocketServer(port).run();
 }
 
}

HttpRequestHandler類程式碼:

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

 private final String wsUri;
 private static final File INDEX;
 
 // static HTTP request handling operation.
 static {
  URL location = HttpRequestHandler.class.getProtectionDomain()
    .getCodeSource().getLocation();
  try {
   String path = location.toURI() + "WebSocketClient.html";
   path = !path.contains("file:") ? path : path.substring(5);
   INDEX = new File(path);
  } catch (URISyntaxException e) {
   throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
  }
 }
 
 // constructor function call for current class
 public HttpRequestHandler(String wsUri) {
  this.wsUri = wsUri;
 }

 @Override
 public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
   throws Exception {
  if (wsUri.equalsIgnoreCase(request.getUri())) {
   ctx.fireChannelRead(request.retain());
  } else {
   if (HttpHeaders.is100ContinueExpected(request)) {
    send100Continue(ctx);
   }
   
   RandomAccessFile file = new RandomAccessFile(INDEX, "r");
   HttpResponse response = new DefaultHttpResponse(
     request.getProtocolVersion(), HttpResponseStatus.OK);
   response.headers().set(HttpHeaders.Names.CONTENT_TYPE,
     "text/html; charset=UTF-8");

   boolean keepAlive = HttpHeaders.isKeepAlive(request);
   if (keepAlive) {
    response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,  file.length());
    response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
   }
   ctx.write(response);

   if (ctx.pipeline().get(SslHandler.class) == null) {
    ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
   } else {
    ctx.write(new ChunkedNioFile(file.getChannel()));
   }
   ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
   if (!keepAlive) {
    future.addListener(ChannelFutureListener.CLOSE);
   }

   file.close();
  }
 }

 private static void send100Continue(ChannelHandlerContext ctx) {
  FullHttpResponse response = new DefaultFullHttpResponse(
    HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
  ctx.writeAndFlush(response);
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
   throws Exception {
  Channel incoming = ctx.channel();
  System.out.println("Client:" + incoming.remoteAddress() + "異常");
  
  // 當出現異常就關閉連線
  cause.printStackTrace();
  ctx.close();
 }

}

TextWebSocketFrameHandler類程式碼:

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
 
 public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  Channel incoming = ctx.channel();
  for (Channel channel : channels) {
   if (channel != incoming) {
    //channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
   } else {
    channel.writeAndFlush(new TextWebSocketFrame("[伺服器端返回]:" + msg.text()));
    
    //output current message to context.
    StringBuffer sb = new StringBuffer();
    sb.append(incoming.remoteAddress()).append("->").append(msg.text());
    System.out.println(sb.toString());
   }
  }
 }
 
 @Override
 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  Channel incoming = ctx.channel();
  for (Channel channel : channels) {
   channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - "  + incoming.remoteAddress() + " 加入"));
  }
  channels.add(ctx.channel());
  System.out.println("Client:" + incoming.remoteAddress() + "加入");
 }
 
 @Override
 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  Channel incoming = ctx.channel();
  for (Channel channel : channels) {
   channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - "  + incoming.remoteAddress() + " 離開"));
  }
  System.out.println("Client:" + incoming.remoteAddress() + "離開");
  channels.remove(ctx.channel());
 }
 
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
  Channel incoming = ctx.channel();
  System.out.println("Client:" + incoming.remoteAddress() + "線上");
 }
 
 @Override
 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  Channel incoming = ctx.channel();
  System.out.println("Client:" + incoming.remoteAddress() + "掉線");
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
   throws Exception {
  Channel incoming = ctx.channel();
  System.out.println("Client:" + incoming.remoteAddress() + "異常");
  
  // 當出現異常就關閉連線
  cause.printStackTrace();
  ctx.close();
 }

}

WebSocketServerInitializer類程式碼:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
 
 @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
       
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64*1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler());
       
    }
 
}

客戶端將通過HTML5的WebSocket來撰寫,特別注意WebSocket需要Firefox/Chrome/IE 比較高的版本才能夠支援,譬如IE11.

<!DOCTYPE html>
<html>
<head>
<meta charset="GBK">
<title>WebSocket聊天程式</title>
</head>
<body>
    <script type="text/javascript">
        var socket;
       
        function connectServer() {
         if (!window.WebSocket) {
             window.WebSocket = window.MozWebSocket;
         }
         if (window.WebSocket) {
             socket = new WebSocket("ws://localhost:8080/ws");
             socket.onmessage = function(event) {
                 var ta = document.getElementById('responseText');
                 ta.value = ta.value + '\n' + event.data
             };
             socket.onopen = function(event) {
                 var ta = document.getElementById('responseText');
                 ta.value = "連線開啟!";
             };
             socket.onclose = function(event) {
                 var ta = document.getElementById('responseText');
                 ta.value = ta.value + "連線被關閉";
             };
         } else {
             alert("你的瀏覽器不支援 WebSocket!");
         }
        }
       
        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("連線沒有開啟.");
            }
        }
    </script>
    <form onsubmit="return false;">
     <input type="button" onclick="javascript:connectServer()" value="連線伺服器">
        <h3>WebSocket 聊天室:</h3>
        <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
        <br>
        <input type="text" name="message"  style="width: 300px" value="聊天文字在此....">
        <input type="button" value="傳送訊息" onclick="send(this.form.message.value)">
        <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天記錄">
    </form>
    <br>
    <br>
</body>
</html>



相關推薦

利用Netty構建WebSocket服務系統例子程式

最近在研究Netty來構建SOA架構,其中也包括了前端接入的HTTP/WebSocket方面的接入響應,而WebSocket方面的接入響應對於移動端的訊息推送研發至關重要,這裡就將在這塊研發時的非同步socket響應服務例子程式筆記記錄下來,系統總共分為4個處理類,即:Htt

使用.net core ABP和Angular模板構建部落格管理系統(建立服務

建立實體 如下所示專案下建立blog/notes資料夾,並加入我們的實體檔案 /// <summary> /// 文章資訊 /// </summary> public class Note : En

Apple使用Apache Mesos重建Siri服務

基於 work 多少 高效 初始化 包括 復制 應用 角度 蘋果公司宣布,將使用開源的集群管理軟件Apache Mesos,作為該公司廣受歡迎的、基於iOS的智能個人助理軟件Siri的後端服務。Mesosphere的博客指出,蘋果已經創建了一個命名為J.A.R.V.I.S.

Android服務器的搭建方法

指定 decode abi 搭建服務器 服務器 coder example 轉換成 是我 一直做Android前端,今天突然心血來潮想搭建一個後臺玩玩。平時都是需要什麽樣的接口直接出個接口文檔扔給後臺的兄弟,自己從來不操心他們內部的實現問題。今天懷著好奇的心理去搭建了一個J

nginx_upstream_check_module監控服務器http

mys 使用 健康狀況 註意 bubuko 指定 clas style div nginx_upstream_check_module 是專門提供負載均衡器內節點的健康檢查的外部模塊,由淘寶的姚偉斌大神開發,通過它可以用來檢測後端 realserver 的健康狀態。如果後端

maven+SpringMVC搭建RESTful服務框架

rest 頁面 pen update 靈活性 date cte requests compile 今天來嘗試一下搭建基於SpringMVC的RESTful標準的後端服務。 首先,什麽是MVC?M-model,模型,指業務數據層,具體一點就是業務邏輯與數據庫的交互;V-vi

LVS(Linux Viretual Server) 負載均衡器 + 服務

tput IV 方式 hash AD 通信 raid1 持久連接 ash 定義:   LVS是Linux Virtual Server的簡寫,意即Linux虛擬服務器,是一個虛擬的服務器集群系統。 結構:  一般來說,LVS集群采用三層結構,其主要組成部分為:   A、

高可用haproxy調度服務器實現動靜分離集群架構

eal 文件配置 instance killall col .html 相同 virtual 關閉 項目要求: (1) 動靜分離部署wordpress,動靜都要能實現負載均衡,要註意會話的問題; (2) 設計拓撲; (3) haproxy的設計要求: (a) stats p

Nginx反向代理與服務采用連接池參數分析,長連接減少TIME_WAIT

數據 模型 業務需求 技術 nginx服務器 程序 創建 反向代理 還需 前面已經講過,在使用locust直連後端服務器時,可以通過設置HTTP頭部為keep-alive,並在客戶端斷開連接,減少服務器的連接壓力。因為由客戶端斷開連接,客戶端的連接會變為TIME_WAIT狀

Nginx服務大量TIME-WAIT的解決

原因 在HTTP1.1協議中,有個 Connection 頭,Connection有兩個值,close和keep-alive,這個頭就相當於客戶端告訴服務端,服務端你執行完成請求之後,是關閉連線還是保持連線,保持連線就意味著在保持連線期間,只能由客戶端主動斷開連線。還有一個keep-alive的頭,設定的值

前端服務優化與服務優化

一、前端服務優化 1、首先程式碼上: 建議看這篇:https://www.kancloud.cn/kancloud/web_performance_optimization/80987 2、VUE 建議看這篇:https://juejin.im/post/5b960fcae51d450e9d

vue,vuex的後臺管理專案架子structure-admin,服務nodejs,前端vue頁面

1、vuex來實現狀態管理2、靜態頁面,未引入後端服務3、元件是用的是element-ui4、頁面佈局是上左右,左右佈局使用的彈性和佈局flex,左邊定寬,右邊計算寬度5、左右的滾動條是相互獨立的,去掉body上的滾動條6、沒有業務程式碼,僅僅是一個靜態的vuex的架子說明:之前使用左側menu的fixe

快速新建簡單的koa2服務

既然前端工程化是基於NodeJS,那麼選擇NodeJs做前後端分離部署也是理所應當的。其實只需要實現靜態資源和代理的話,用nginx才是最好的選擇,用NodeJS是為了日後能進一步在服務端上實現自動構建或服務端渲染。 同步到交流學習社群:https://www.mwcxs.top/page/43

websocket 推送訊息實現

引入jar包 <dependency>     <groupId>org.springframework</groupId>     <artifactId>spring-websocket</artifactId>

微信服務架構及其過載控制系統DAGOR

微信架構介紹   眼下的微信後端包含3000多個移動服務,包括即時訊息、社交網路、移動支付和第三方授權。該平臺每天收到的外部請求在10 ^10個至10^11個。每個這樣的請求都會觸發多得多的內部微服務請求,因而微信後端整體每秒需要處理數億個請求。   微信的微服務系統容納在微信業務

nginx反向代理-服務器組設置

web服務器 正向 local 請求方法 否則 字段 對組 網站 page nginx服務器的反向代理時其最常用的重要功能之一,在實際工作中應用廣泛,涉及的配置指令也比較多。下面會盡量詳細地介紹對應的指令,及其使用狀態。 反向代理一般是互聯網需要向內網拉取資源,比如訪問一個

mui 之ajax遇到的坑 後臺接受資料為空 服務為php

mui.ajax的官方模板為             mui.ajax(Host+'/app/index/newsinfo',{                 data:{id:newsId},                 dataType:'json',//伺服器返回

如何健壯你的服務

對每一個程式設計師而言,故障都是懸在頭上的達摩克利斯之劍,都唯恐避之不及,如何避免故障是每一個程式設計師都在苦苦追尋希望解決的問題。對於這一問題,大家都可以從需求分析、架構設計、程式碼編寫、測試、code review、上線、線上服務運維等各個視角給出自己的答案。 我們

spring rest 容易被忽視的服務 chunked 效能問題

spring boot 容易被忽視的後端服務 chunked 效能問題 標籤(空格分隔): springboot springmvc chunked 作者:王清培(Plen wang) 滬江Java資深架構師 背景 spring boot 建立的預設 spring mvc 專案 整合 JAX-RS 規範框

nginx反向代理配置多個服務時的session問題

今天在部署nginx的時候出現了一個問題: 當nginx中中只部署一個後端服務的時候可以正常登入,但是當部署兩個後端服務的時候,就會出現登入失敗的問題。 一開始查詢,從控制檯檢視,登入使用者名稱傳了個亂碼過去,從而設定了charset屬性,但是還是登入不上去。但將負載均衡