1. 程式人生 > >Netty入門之WebSocket初體驗

Netty入門之WebSocket初體驗

說一說IO通訊

BIO通訊:

BIO即同步阻塞模式一請求一應答的通訊模型,該模型最大的問題就是缺乏彈性伸縮能力,當客戶端併發訪問量增加後,服務端的執行緒個數和客戶端併發訪問數呈1:1的正比關係,由於執行緒是JAVA虛擬機器非常寶貴的系統資源,當執行緒數膨脹之後,系統的效能將急劇下降,隨著併發訪問量的繼續增大,系統會發生執行緒堆疊溢位、建立新執行緒失敗等問題,並最終導致程序宕機或者僵死,不能對外提供服務。

BIO的服務端通訊模型:

  • 採用BIO通訊模型的服務端,通常由一個獨立的Acceptor執行緒負責監聽客戶端的連線
  • 當接收到客戶端的連線請求後,會為每一個客戶端請求建立新的執行緒進行請求的處理
  • 處理完成後通過輸出流返回資訊給客戶端,響應完成後銷燬執行緒
  • 典型的一請求一應答的通訊模型
  • 缺點:缺乏彈性伸縮能力

一個執行緒處理一個Socket連線,因為Java Socket是通過InputStream和OutputStream來進行網路讀寫操作,而這倆個的讀寫都是阻塞模式,所以當某個Socket鏈路的讀寫操作沒有完成時,排在後面的Socket連線是無法得到處理的,長時間的等待可能會導致超時,因此在同步阻塞模式下,通常會採用一個Socket鏈路獨佔一個執行緒的模型。

BIO通訊模型圖:
Netty入門之WebSocket初體驗


偽非同步IO通訊(BIO優化版本):

為了解決同步阻塞IO(BIO)所面臨的一個鏈路需要一個執行緒處理的問題,後來有人對它的執行緒模型進行了優化,後端通過一個執行緒池來處理多個客戶端的請求接入,形成客戶端個數M:執行緒池最大執行緒數N的比例關係,其中M可以遠遠大於N,通過執行緒池可以靈活的調配執行緒資源,設定執行緒的最大值,防止由於海量併發接入導致執行緒耗盡。

偽非同步IO通訊特性:

  • 採用執行緒池和任務佇列實現
  • 執行緒池負責連線
  • M請求N應答
  • 執行緒池阻塞

當有新的客戶端接入的時候,將客戶端的Socket封裝成一個Task(該任務實現java.lang.Runnable介面)投遞到後端的執行緒池中進行處理,JDK的執行緒池維護一個訊息佇列和N個活躍執行緒對訊息佇列中的任務進行處理。由於執行緒池可以設定訊息佇列的大小和最大執行緒數,因此,它的資源佔用是可控的,無論多少個客戶端併發訪問,都不會導致資源的耗盡和宕機。

但是偽非同步IO通訊也有其缺陷,當有大量客戶端請求的時候,隨著併發訪問量的增長,偽非同步IO就會造成執行緒池阻塞。

偽非同步IO通訊模型圖:
Netty入門之WebSocket初體驗


NIO通訊:

NIO是非阻塞IO(Non-block IO),也有人稱之為New IO,因為它相對於之前的IO類庫是新增的,所以被稱為New IO,這是它的官方叫法。它是在 JDK 1.4 中引入的。NIO 彌補了原來同步阻塞I/O 的不足,它在標準 Java 程式碼中提供了高速的、面向塊的 I/O。通過定義包含資料的類,以及通過以塊的形式處理這些資料,NIO 不用使用本機程式碼就可以利用底層優化,這是原來的 I/O 包所無法做到的。

NIO之緩衝區Buffer:

我們首先介紹緩衝區(Buffer)的概念,Buffer 是一個物件, 它包含一些要寫入或者要讀出的資料。 在 NIO類庫 中加入 Buffer 物件,體現了新庫與原 I/O 的一個重要區別。在面向流的 I/O 中,我們將資料直接寫入或者將資料直接讀到 Stream 物件中。

在 NIO 庫中,所有資料都是用緩衝區進行處理的。在讀取資料時,它是直接讀到緩衝區中;在寫入資料時,它也是寫入到緩衝區中。任何時候訪問 NIO 中的資料,我們都是通過緩衝區進行讀寫操作。

緩衝區實質上是一個數組。通常它是一個位元組陣列(ByteBuffer),也可以使用其它種類的陣列。但是一個緩衝區不僅僅是一個數組,緩衝區提供了對資料的結構化訪問,及維護讀寫位置(limit)等資訊。

最常用的緩衝區是ByteBuffer,一個ByteBuffer提供了一組功能用於操作byte陣列。除了ByteBuffer,還有其它的一些緩衝區,事實上,每一種Java基本型別(除了Boolean型別)都對應有一種緩衝區,如下所示:

NIO之通道Channel:

Channel是一個通道,可以通過它讀取和寫入資料,它就像自來水管一樣,網路資料通過Channel讀取和寫入。通道與流的不同之處在於通道是雙向的。而流只是在一個方向上移動(一個流必須是 InputStream 或者 OutputStream 的子類),而通道可以用於讀、寫或者同時用於讀寫。

NIO之多路複用器Selector:

它是JAVA NIO程式設計的基礎,熟練的掌握Selector對於掌握NIO程式設計至關重要。多路複用器提供選擇已經就緒的任務的能力。簡單來講,Selector會不斷的輪詢註冊在其上的Channel,如果某個Channel上面有新的TCP連線接入、讀和寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合進行後續的IO操作。

一個多路複用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll()代替傳統的select實現,所以它並沒有最大連線控制代碼1024/2048的限制。這也就意味著只需要一個執行緒負責Selector的輪詢,就可以接入成千上萬的客戶端,這的確是一個巨大的改進。


AIO通訊:

與NIO不同,aio需要一個連線註冊讀寫事件和回撥方法,當進行讀寫操作時,只須直接呼叫API的read或write方法即可。這兩種方法均為非同步的,對於讀操作而言,當有流可讀取時,作業系統會將可讀的流傳入read方法的緩衝區,並通知應用程式;對於寫操作而言,當作業系統將write方法傳遞的流寫入完畢時,作業系統主動通知應用程式。 即可以理解為,read/write方法都是非同步的,完成後會主動呼叫回撥函式。

AIO非同步通道提供了兩種方式獲取操作結果:

  1. 通過java.util.concurrent.Future類來表示非同步操作的結果
  2. 在執行非同步操作的時候傳入一個java.nio.channels.CompletionHandler介面的實現類作為操作完成的回撥。 AIO的非同步套接字通道是真正的非同步非阻塞IO,對應於UNIX網路程式設計中的事件驅動IO(AIO),它不需要通過多路複用器(Selector)對註冊的通道進行輪詢操作即可實現非同步讀寫,從而簡化了NIO的程式設計模型。

AIO通訊的特性:

  • 連線註冊讀寫事件和回撥函式
  • 讀寫方法非同步
  • 主動通知程式

四種IO對比:

- 同步阻塞I/O(BIO) 偽非同步I/O 非阻塞I/O(NIO) 非同步I/O(AIO)
客戶端個數:IO執行緒 1 : 1 M : N(其中M可以大於N) M : 1(1個IO執行緒處理多個客戶端連線) M : 0(不需要啟動額外的IO執行緒,被動回撥)
IO型別(阻塞) 阻塞IO 阻塞IO 非阻塞IO 非阻塞IO
IO型別(同步) 同步IO 同步IO 同步IO(IO多路複用) 非同步IO
API使用難度 簡單 簡單 非常複雜 複雜
除錯難度 簡單 簡單 複雜 複雜
可靠性 非常差
吞吐量

Netty入門

在開始本節之前,我先講一個親身經歷的故事:曾經有兩個專案組同時用到了NIO程式設計技術,一個專案組選擇自己開發NIO服務端,直接使用JDK原生的API,結果2個多月過去了,他們的NIO服務端始終無法穩定,問題頻出。由於NIO通訊是它們的核心元件之一,因此,專案的進度受到了嚴重的影響,領導對此非常惱火。另一個專案組直接使用Netty作為NIO服務端,業務的定製開發工作量非常小,測試表明,功能和效能都完全達標,專案組幾乎沒有在NIO服務端上花費額外的時間和精力,專案進展也非常順利。

這兩個專案組的不同遭遇提醒我們:開發出高質量的NIO程式並不是一件簡單的事情,除去NIO固有的複雜性和BUG不談,作為一個NIO服務端需要能夠處理網路的閃斷、客戶端的重複接入、客戶端的安全認證、訊息的編解碼、半包讀寫等等,如果你沒有足夠的NIO程式設計經驗積累,一個NIO框架的穩定往往需要半年甚至更長的時間。更為糟糕的是一旦在生產環境中發生問題,往往會導致跨節點的服務呼叫中斷,嚴重的可能會導致整個叢集環境都不可用,需要重啟伺服器,這種非正常停機會帶來巨大的損失。

從可維護性角度看,由於NIO採用了非同步非阻塞程式設計模型,而且是一個IO執行緒處理多條鏈路,它的除錯和跟蹤非常麻煩,特別是生產環境中的問題,我們無法有效除錯和跟蹤,往往只能靠一些日誌來輔助分析,定位難度很大。

不選擇JAVA原生NIO程式設計的原因:

  1. NIO的類庫和API繁雜,使用麻煩,你需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
  2. 需要具備其它的額外技能做鋪墊,例如熟悉Java多執行緒程式設計,因為NIO程式設計涉及到Reactor模式,你必須對多執行緒和網路程式設計非常熟悉,才能編寫出高質量的NIO程式
  3. 可靠效能力補齊,工作量和難度都非常大。例如客戶端面臨斷連重連、網路閃斷、半包讀寫、失敗快取、網路擁塞和異常碼流的處理等等,NIO程式設計的特點是功能開發相對容易,但是可靠效能力補齊工作量和難度都非常大
  4. JDK NIO的BUG,例如臭名昭著的epoll bug,它會導致Selector空輪詢,最終導致CPU佔用100%。官方聲稱在JDK1.6版本的update18修復了該問題,但是直到JDK1.7版本該問題仍舊存在,只不過該bug發生概率降低了一些而已,它並沒有被根本解決。該BUG以及與該BUG相關的問題單如下:

異常堆疊如下:

java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
        - locked <0x0000000750928190> (a sun.nio.ch.Util$2)
        - locked <0x00000007509281a8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x0000000750946098> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
        at net.spy.memcached.MemcachedConnection.handleIO(MemcachedConnection.java:217)
        at net.spy.memcached.MemcachedConnection.run(MemcachedConnection.java:836)

由於上述原因,在大多數場景下,我不建議大家直接使用JDK的NIO類庫,除非你精通NIO程式設計或者有特殊的需求,在絕大多數的業務場景中,我們可以使用NIO框架Netty來進行NIO程式設計,它既可以作為客戶端也可以作為服務端,同時支援UDP和非同步檔案傳輸,功能非常強大。

為什麼選擇Netty:

Netty是業界最流行的NIO框架之一,它的健壯性、功能、效能、可定製性和可擴充套件性在同類框架中都是首屈一指的,它已經得到成百上千的商用專案驗證,例如Hadoop的RPC框架avro使用Netty作為底層通訊框架。很多其它業界主流的RPC框架,也使用Netty來構建高效能的非同步通訊能力。

通過對Netty的分析,我們將它的優點總結如下:

  1. API使用簡單,開發門檻低;
  2. 功能強大,預置了多種編解碼功能,支援多種主流協議;
  3. 定製能力強,可以通過ChannelHandler對通訊框架進行靈活的擴充套件;
  4. 效能高,通過與其它業界主流的NIO框架對比,Netty的綜合性能最優;
  5. 成熟、穩定,Netty修復了已經發現的所有JDK NIO BUG,業務開發人員不需要再為NIO的BUG而煩惱;
  6. 社群活躍,版本迭代週期短,發現的BUG可以被及時修復,同時,更多的新功能會被加入;
  7. 經歷了大規模的商業應用考驗,質量已經得到驗證。在網際網路、大資料、網路遊戲、企業應用、電信軟體等眾多行業得到成功商用,證明了它可以完全滿足不同行業的商業應用。

正是因為這些優點,Netty逐漸成為Java NIO程式設計的首選框架。


WebSocket入門

WebSocket 是什麼?

  • WebSocket 是一種網路通訊協議。RFC6455 定義了它的通訊標準。
  • WebSocket 是 HTML5 開始提供的一種在單個 TCP 連線上進行全雙工通訊的協議。
  • WebSocket 是解決客戶端與服務端實時通訊而產生的技術

為什麼需要 WebSocket ?

瞭解計算機網路協議的人,應該都知道:HTTP 協議是一種無狀態的、無連線的、單向的應用層協議。它採用了請求/響應模型。通訊請求只能由客戶端發起,服務端對請求做出應答處理。這種通訊模型有一個弊端:HTTP 協議無法實現伺服器主動向客戶端發起訊息。

這種單向請求的特點,註定瞭如果伺服器有連續的狀態變化,客戶端要獲知就非常麻煩。大多數 Web 應用程式將通過頻繁的非同步JavaScript和XML(AJAX)請求實現長輪詢。輪詢的效率低,非常浪費資源(因為必須不停連線,或者 HTTP 連線始終開啟)。
Netty入門之WebSocket初體驗

因此,工程師們一直在思考,有沒有更好的方法。WebSocket 就是這樣發明的。WebSocket 連線允許客戶端和伺服器之間進行全雙工通訊,以便任一方都可以通過建立的連線將資料推送到另一端。WebSocket 只需要建立一次連線,就可以一直保持連線狀態。這相比於輪詢方式的不停建立連線顯然效率要大大提高。
Netty入門之WebSocket初體驗

WebSocket建立連線步驟:

  1. 客戶端發起握手請求
  2. 服務端響應請求
  3. 連線建立

WebSocket的優點:

  • 節省通訊開銷
  • 伺服器主動傳送資料給客戶端
  • 實時通訊,適合實現聊天室等功能

WebSocket生命週期:

  1. 開啟事件:@OnOpen 此事件發生在端點上建立新連線時並且在任何其他事件發生之前
  2. 訊息事件:@OnMessage 此事件接收WebSocket對話中另一端傳送的訊息。
  3. 錯誤事件:@OnError 此事件在WebSocket連線或者端點發生錯誤時產生
  4. 關閉事件:@OnClose 此事件表示WebSocket端點的連線目前部分地關閉,它可以由參與連線的任意一個端點發出
  • @OnOpen 指示當此端點建立新的連線時呼叫此方法。此事件伴隨著三部分資訊:WebSocket Session物件,用於表示已經建立好的連線;配置物件(EndpointConfig的例項),包含了用來配置端點的資訊;一組路徑引數,用於開啟階段握手時WebSocket端點入站匹配URI。@OnOpen註解的方法是沒有任何返回值的公有方法,這些方法有一個可選的Session引數、一個可選的EndpointConfig引數,以及任意數量的被@PathParam註解的String引數。

  • @OnMessage 處理入站的訊息。java培訓機構裡面是這樣講解的,連線上的訊息將以3種基本形式抵達:文字訊息、二進位制訊息或者Pong訊息。最基本的形式是選擇使用帶String引數的方法來處理文字訊息;使用ByteBuffer或者是byte[]引數的方法來處理二進位制文字;若你的訊息僅僅是處理Pong訊息,則可以使用Java WebSocket API中的PongMessage介面的一個例項。當然可以使用一個boolean型引數表示對到來的訊息進行分片。當boolean型引數值為false時,表示後續還有整個文字訊息序列中的更多訊息分片的到來,當設定為true時,表示當前訊息是訊息分片中最後一個分片。訊息的處理還有很多選項,比如使用JavaI/O,甚至可以讓WebSocket實現把入站訊息轉換成自己選擇的物件。這個將在訊息通訊基礎中提到,WebSocket應用一般是非同步的雙向訊息。因此通過@OnMessage註解的此類方法上有一個額外選項:方法可以有返回值或者返回為空。當使用@OnMessage註解的方法有返回型別時,WebSocket實現立即將返回值作為訊息返回給剛剛在方法中處理的訊息的傳送者。

  • @OnError 可以處理WebSocket實現處理入站訊息時發生的任何異常。處理入站訊息時,可能會發生3中基本的錯誤型別。首先,WebSocket實現產生的錯誤可能會發生,這些異常屬於SessionException型別,其次,錯誤可能會發生在當WebSocket實現試圖將入站訊息解碼成開發人員所需要的物件時。此類錯誤都是DecodeException型別。最後是由WebSocket端點的其他方法產生的執行時錯誤。WebSocket實現將記錄WebSocket端點操作過程中產生的任何異常。

  • @OnClose 它對於在WebSocket連線關閉時做其他的通用清理工作。@OnClose 可以用來註解多種不同型別的方法來關閉事件。

WebSocket關閉連線的兩種方式:

  1. 伺服器關閉底層TCP連線
  2. 客戶端發起TCP Close

參考:

https://www.cnblogs.com/jingmoxukong/p/7755643.html
http://www.ruanyifeng.com/blog/2017/05/websocket.html
https://www.cnblogs.com/fuqiang88/p/5956363.html


使用Netty實現WebSocket服務端

功能介紹:

  • Netty開發服務端
  • HTML實現客戶端
  • 實現服務端與客戶端的實時互動

pom.xml檔案配置的依賴項如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.zero.netty</groupId>
    <artifactId>websocket</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.31.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>

        <!-- logger -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>
</project>

定義一個全域性配置類:

package org.zero.netty.websocket.config;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * @program: Netty-WebSocket
 * @description: 工程的全域性配置類
 * @author: 01
 * @create: 2018-11-03 17:28
 **/
public class NettyConfig {

    /**
     * 儲存每一個客戶端接入進來時的channel物件
     */
    public final static ChannelGroup GROUP =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

建立一個介面,用作專案中的全域性常量定義:

package org.zero.netty.websocket.config;

/**
 * 專案中的全域性常量定義
 *
 * @author 01
 */
public interface Constants {
    String WEB_SOCKET_URL = "ws://localhost:8080/websocket";
    String WEBSOCKET_STR = "websocket";
    String UPGRADE_STR = "Upgrade";
    int OK_CODE = 200;

    String HTTP_CODEC = "http-codec";
    String AGGREGATOR = "aggregator";
    String HTTP_CHUNKED = "http-chunked";
    String HANDLER = "handler";
    int MAX_CONTENT_LENGTH = 65536;
    int PORT = 8080;
}

編寫接收處理並響應客戶端WebSocket請求的核心業務處理類,程式碼如下:

package org.zero.netty.websocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.zero.netty.websocket.config.Constants;
import org.zero.netty.websocket.config.NettyConfig;

import java.util.Date;

/**
 * @program: Netty-WebSocket
 * @description: 接收處理並響應客戶端WebSocket請求的核心業務處理類
 * @author: 01
 * @create: 2018-11-03 17:34
 **/
@Slf4j
@ChannelHandler.Sharable
public class MyWebsocketHandler extends SimpleChannelInboundHandler<Object> {
    private WebSocketServerHandshaker handshaker;

    /**
     * 服務端處理客戶端WebSocket請求的核心方法
     *
     * @param ctx ctx
     * @param msg msg
     * @throws Exception Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 處理客戶端向服務端發起http握手請求的業務
        if (msg instanceof FullHttpRequest) {
            handHttpRequest(ctx, (FullHttpRequest) msg);
        }
        // 處理websocket連線
        else if (msg instanceof WebSocketFrame) {
            handWebsocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * 處理客戶端與服務端之間的websocket業務
     *
     * @param ctx   ctx
     * @param frame frame
     */
    private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判斷是否是關閉websocket的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
            log.debug("接收到關閉websocket的指令");
        }

        // 判斷是否是ping訊息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            log.debug("接收到ping訊息");
            return;
        }

        // 判斷是否是二進位制訊息,如果是二進位制訊息,則丟擲異常
        if (!(frame instanceof TextWebSocketFrame)) {
            log.error("目前不支援二進位制訊息");
            throw new UnsupportedOperationException("【" + this.getClass().getName() + "】不支援的訊息");
        }

        // 獲取客戶端向服務端傳送的訊息
        String requestStr = ((TextWebSocketFrame) frame).text();
        log.debug("服務端收到客戶端的訊息: {}", requestStr);

        // 返回應答訊息
        String responseStr = new Date().toString()
                + ctx.channel().id() +
                " ===>>> " + requestStr;
        TextWebSocketFrame tws = new TextWebSocketFrame(responseStr);

        // 群發,服務端向每個連線上來的客戶端群發訊息
        NettyConfig.GROUP.writeAndFlush(tws);
        log.debug("群發訊息完成. 群發的訊息為: {}", responseStr);
    }

    /**
     * 處理客戶端向服務端發起http握手請求的業務
     *
     * @param ctx     ctx
     * @param request request
     */
    private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        String upgrade = request.headers().get(Constants.UPGRADE_STR);
        // 非websocket的http握手請求處理
        if (!request.decoderResult().isSuccess() || !Constants.WEBSOCKET_STR.equals(upgrade)) {
            sendHttpResponse(ctx, request,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            log.warn("非websocket的http握手請求");
            return;
        }

        WebSocketServerHandshakerFactory wsFactory =
                new WebSocketServerHandshakerFactory(Constants.WEB_SOCKET_URL, null, false);
        handshaker = wsFactory.newHandshaker(request);
        if (handshaker == null) {
            // 響應不支援的請求
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            log.warn("不支援的請求");
        } else {
            handshaker.handshake(ctx.channel(), request);
            log.debug("正常處理");
        }
    }

    /**
     * 服務端主動向客戶端傳送訊息
     *
     * @param ctx      ctx
     * @param request  request
     * @param response response
     */
    private void sendHttpResponse(ChannelHandlerContext ctx,
                                  FullHttpRequest request,
                                  DefaultFullHttpResponse response) {
        // 不成功的響應
        if (response.status().code() != Constants.OK_CODE) {
            ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
            log.warn("不成功的響應");
        }

        // 服務端向客戶端傳送資料
        ChannelFuture channelFuture = ctx.channel().writeAndFlush(response);
        if (!HttpUtil.isKeepAlive(request) ||
                response.status().code() != Constants.OK_CODE) {
            // 如果是非Keep-Alive,或不成功都關閉連線
            channelFuture.addListener(ChannelFutureListener.CLOSE);
            log.info("websocket連線關閉");
        }
    }

    /**
     * 客戶端與服務端建立連線的時候呼叫
     *
     * @param ctx ctx
     * @throws Exception Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 將channel新增到channel group中
        NettyConfig.GROUP.add(ctx.channel());
        log.info("客戶端與服務端連線開啟...");
    }

    /**
     * 客戶端與服務端斷開連線的時候呼叫
     *
     * @param ctx ctx
     * @throws Exception Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 從channel group中移除這個channel
        NettyConfig.GROUP.remove(ctx.channel());
        log.info("客戶端與服務端關閉連線...");
    }

    /**
     * 服務端接收客戶端傳送過來的資料結束之後呼叫
     *
     * @param ctx ctx
     * @throws Exception Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 清空資料
        ctx.flush();

        log.info("flush資料 {}", ctx.name());
    }

    /**
     * 工程出現異常的時候呼叫
     *
     * @param ctx   ctx
     * @param cause cause
     * @throws Exception Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 列印異常堆疊
        cause.printStackTrace();
        // 主動關閉連線
        ctx.close();
        log.error("WebSocket連線異常");
    }
}

定義一個初始化類,用於初始化連線時的各個元件。程式碼如下:

package org.zero.netty.websocket.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.zero.netty.websocket.config.Constants;

/**
 * @program: Netty-WebSocket
 * @description: 初始化連線時的各個元件
 * @author: 01
 * @create: 2018-11-03 21:53
 **/
public class MyWebsocketChannelHandler extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(Constants.HTTP_CODEC, new HttpServerCodec());
        ch.pipeline().addLast(Constants.AGGREGATOR, new HttpObjectAggregator(Constants.MAX_CONTENT_LENGTH));
        ch.pipeline().addLast(Constants.HTTP_CHUNKED, new ChunkedWriteHandler());
        ch.pipeline().addLast(Constants.HANDLER, new MyWebsocketHandler());
    }
}

最後我們還需要編寫程式的啟動類,負責啟動應用。程式碼如下:

package org.zero.netty.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.zero.netty.websocket.config.Constants;
import org.zero.netty.websocket.core.MyWebsocketChannelHandler;

/**
 * @program: Netty-WebSocket
 * @description: 程式的入口,負責啟動應用
 * @author: 01
 * @create: 2018-11-03 22:06
 **/
@Slf4j
public class Main {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new MyWebsocketChannelHandler());
            log.info("服務端開啟等待客戶端連線...");

            Channel channel = bootstrap.bind(Constants.PORT).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("服務端啟動失敗", e);
        } finally {
            // 退出程式
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
            log.info("服務端已關閉");
        }
    }
}

HTML實現客戶端

在上一小節中,我們完成了服務端的開發,這一小節我們來編寫一個簡單的html網頁作為我們的客戶端。程式碼如下:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8" http-equiv="Content-Type" content="text/html;charset=utf-8">
    <title>WebSocket客戶端</title>
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }

        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8080/websocket");
            socket.onmessage = function (ev) {
                var ta = document.getElementById('responseContent');
                ta.value += ev.data + "\r\n";
            };

            socket.onopen = function (ev) {
                var ta = document.getElementById('responseContent');
                ta.value = "您當前的瀏覽器支援WebSocket, 請進行後續操作\r\n";
            };

            socket.onclose = function (ev) {
                var ta = document.getElementById('responseContent');
                ta.value = "WebSocket連線已經關閉\r\n";
            };

            socket.onerror = function (ev) {
                var ta = document.getElementById('responseContent');
                ta.value = ev.data + "WebSocket連線異常\r\n";
            };
        } else {
            alert("您的瀏覽器不支援WebSocket");
        }

        function send(message) {
            if (!window.WebSocket) {
                return;
            }

            if (socket.readyState === WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("WebSocket連線建立失敗, 請重試");
                console.log(socket.readyState)
            }
        }
    </script>
</head>
<body>
<form onsubmit="return false;">
    <label for="message">
        <input type="text" id="message" name="message" value=""/>
    </label>
    <br><br>
    <input type="button" value="傳送WebSocket請求訊息" onclick="send(this.form.message.value)"/>
    <br><br>
    <hr color="red" size="5">
    <h2>客戶端接收到服務端返回的應答訊息: </h2>
    <label for="responseContent">
        <textarea id="responseContent" style="width: 1024px;height: 300px"></textarea>
    </label>
</form>
</body>
</html>

在瀏覽器中開啟,效果如下:
Netty入門之WebSocket初體驗

啟動服務端,重新整理網頁:
Netty入門之WebSocket初體驗

傳送訊息:
Netty入門之WebSocket初體驗

至此我們就成功使用netty完成了一個websocket通訊的demo,該demo原始碼的GitHub地址如下:

https://github.com/Binary-ZeroOne/netty-websocket-demo