1. 程式人生 > >Netty4.1.1實現群聊功能的程式碼詳細解析

Netty4.1.1實現群聊功能的程式碼詳細解析

學習Netty已經有一段時間了,其實過程也很坎坷。一開始上手就看文件學習,發現根本看不懂,畢竟中介軟體類別的東西比之前學習WEB框架更具有挑戰性。那怎麼辦呢?當然還是需要先熟悉Java NIO,如果通讀(不要求深入理解)相關API文件,即當對NIO存在一個較為清晰的認識後,回過頭來再次學習Netty就會發現容易理解很多,這可能就是事半功倍吧。

雖然Netty的第一個上手專案本來就是要實現一個客戶端服務端通訊業務,但是當我們可以靠自己敲出一個有群聊功能的小程式加強前面學習過的知識點的話還是相當不錯的。

參考文章:https://waylau.com/netty-chat/

程式碼當然是要上的,我的大部門程式碼都存在註釋,當然需要深入講解的地方我會展開來講以加深記憶。

雖然最新的Netty版本是5.x,但是考慮到Netty4的普及性以及可參考資料的豐富性,此處演示程式碼依賴環境為netty4.1.1

程式結構如下:

服務端業務邏輯處理程式:SimpleChatServerHandler

服務端通道管道初始化:SimpleChatServerInitial

服務端啟動:SimpleChatServer

客戶端業務邏輯處理程式:SimpleChatClientHandler

客戶端通道管道初始化:SimpleChatClientInitial

客戶端啟動類:SimpleChatClient

SimpleChatServerHandler

package netty.in.action.chatRoom;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // 靜態channel組用來存放客戶端連線的channel public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 處理客戶端發來的訊息 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { Channel incoming = channelHandlerContext.channel(); for (Channel channel : channels) { if (channel != incoming) { channel.writeAndFlush("[" + incoming.remoteAddress() + "]:" + s + "\n"); } else { channel.writeAndFlush("you:" + s + "\n"); } } } // 處理使用者連線的方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 獲取客戶端連線的channel Channel incoming = ctx.channel(); // 通知目前已經連線的所有使用者新使用者的連線 for (Channel channel : channels) { channel.writeAndFlush("[USER]-" + incoming.remoteAddress() + "加入\n"); } // 將channel加入channelGroup channels.add(incoming); } // 處理使用者斷開連線的方法 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel leaving = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush("[USER]-" + leaving.remoteAddress() + "離開\n"); } channels.remove(leaving); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("[USER]-" + channel.remoteAddress() + "線上\n"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("[USER]-" + channel.remoteAddress() + "掉線\n"); } // 異常處理 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel channel = ctx.channel(); System.out.println("[USER]-" + channel.remoteAddress() + "異常\n"); cause.printStackTrace(); // 直接關閉連線 ctx.close(); } }

上面的程式碼中業務邏輯程式碼都很好理解,如果不是很懂建議看看文章開頭的參考文章。

需要去理解的是ChannelGroup,這是什麼?業務邏輯中將客戶端的channel存在其中,並且依賴其完成廣播行為。看看DefaultChannelGroup的原始碼,關注**add()**方法:

public boolean add(Channel channel) {
        ConcurrentMap<ChannelId, Channel> map = channel instanceof ServerChannel ? this.serverChannels : this.nonServerChannels;
        boolean added = map.putIfAbsent(channel.id(), channel) == null;
        if (added) {
            channel.closeFuture().addListener(this.remover);
        }
        if (this.stayClosed && this.closed) {
            channel.close();
        }
        return added;
    }

其實一目瞭然,底層是一個ConcurrentMap,這也就解釋了為什麼DefaultChannelGroup能做到執行緒安全。

判斷是否是ServerChannel的子類,然後分開存放。

Map的put與putIfAbsent區別:

put在放入資料時,如果放入資料的key已經存在與Map中,最後放入的資料會覆蓋之前存在的資料,

而putIfAbsent在放入資料時,如果存在重複的key,那麼putIfAbsent不會放入值( 如果傳入key對應的value已經存在,就返回存在的value,不進行替換。如果不存在,就新增key和value,返回null )。

接下來關注GlobalEventExecutor,初始化DefaultChannelGroup為什麼要傳入這樣的例項?

GlobalEventExecutor是一種單執行緒單例執行器,它會自動啟動,並且當任務佇列中沒有任務時掛起1秒鐘後停止,該執行者無法排程大量任務 。

public static final GlobalEventExecutor INSTANCE;
static {
        SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1L);
        INSTANCE = new GlobalEventExecutor();
    }

執行器執行緒在netty是普遍存在的,主要用來處理task。這些task是多種多樣的,比如write或者close。其實觀察netty的原始碼就可以發現,許多方法走到最後總是繞不開一個executor。本來在netty中所有的channel都有一個executor,它是什麼呢?

public EventExecutor executor() {
        return (EventExecutor)(this.executor == null ? this.channel().eventLoop() : this.executor);
    }

回到了起點嘿嘿。每個Channel繫結一個EventLoop不會被改變,很多Channel會共享同一個EventLoop。

回過頭來看GlobalEventExecutor,字面意思是全域性事件執行器。其實舉一個例子就很好理解了,在廣播時我們使用瞭如下程式碼:

for (Channel channel : channels) {
    if (channel != incoming) {
        channel.writeAndFlush("[" + incoming.remoteAddress() + "]:" + s + "\n");
    } else {
        channel.writeAndFlush("you:" + s + "\n");
    }
}

其實一行程式碼就可以搞定:

channels.writeAndFlush(s + "\n"); // 注意末尾的\n不要落了,後面會講為什麼

之所以要向上面那樣寫的複雜,主要是為了區分開發給別人和發給自己的訊息。下面這一行程式碼的writeAndFlush這個task最終就是交給GlobalEventExecutor來執行的。

SimpleChatServerInitial

package netty.in.action.chatRoom;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;


public class SimpleChatServerInitial extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline()
                .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
                .addLast("encoder", new StringEncoder())
                .addLast("decoder", new StringDecoder())
                .addLast("handler", new SimpleChatServerHandler());

        System.out.println("[USER]-" + channel.remoteAddress() + "連線上");
    }
}

關於編碼器解碼器大家可以積極參考相關文件,其實它們就是一類特殊的handlers。

傳送或接收訊息後,Netty必須將訊息資料從一種形式轉化為另一種。接收訊息後,需要將訊息從位元組碼轉成Java物件(由某種解碼器解碼);傳送訊息前,需要將Java物件轉成位元組(由某些型別的編碼器進行編碼)。這種轉換一般發生在網路程式中,因為網路上只能傳輸位元組資料。
有多種基礎型別的編碼器和解碼器,要使用哪種取決於想實現的功能。要弄清楚某種型別的編解碼器,從類名就可以看出,如“ByteToMessageDecoder”、“MessageToByteEncoder”,還有Google的協議“ProtobufEncoder”和“ProtobufDecoder”。
嚴格的說其他handlers可以做編碼器和介面卡,使用不同的Adapter classes取決你想要做什麼。如果是解碼器則有一個ChannelInboundHandlerAdapter或ChannelInboundHandler,所有的解碼器都繼承或實現它們。“channelRead”方法/事件被覆蓋,這個方法從入站(inbound)通道讀取每個訊息。重寫的channelRead方法將呼叫每個解碼器的“decode”方法並通過ChannelHandlerContext.fireChannelRead(Object msg)
傳遞給ChannelPipeline中的下一個ChannelInboundHandler。
類似入站訊息,當你傳送一個訊息出去(出站)時,除編碼器將訊息轉成位元組碼外還會轉發到下一個ChannelOutboundHandler。

真正需要注意的負責處理TCP通訊中的粘包拆包問題的特殊的Decoder。

什麼是粘包拆包?

假設客戶端分別傳送兩個資料包D1,D2個服務端,但是傳送過程中資料是何種形式進行傳播這個並不清楚,分別有下列4種情況

  • 服務端一次接受到了D1和D2兩個資料包,兩個包粘在一起,稱為粘包;
  • 服務端分兩次讀取到資料包D1和D2,沒有發生粘包和拆包;
  • 服務端分兩次讀到了資料包,第一次讀到了D1和D2的部分內容,第二次讀到了D2的剩下部分,這個稱為拆包;
  • 伺服器分三次讀到了資料部分,第一次讀到了D1包,第二次讀到了D2包的部分內容,第三次讀到了D2包的剩下內容。

更新詳細的解釋請前往:https://blog.csdn.net/a953713428/article/details/67100345 (Netty學習(四)-TCP粘包和拆包)。

明白了粘包拆包,總是要找到解決方式:

上層應用協議為了對訊息進行區分,一般採用如下4種方式

  • 訊息長度固定,累計讀取到訊息長度總和為定長Len的報文之後即認為是讀取到了一個完整的訊息。計數器歸位,重新讀取。
  • 將回車換行符作為訊息結束符。
  • 將特殊的分隔符作為訊息分隔符,回車換行符是他的一種。
  • 通過在訊息頭定義長度欄位來標識訊息總長度。

這裡使用的是DelimiterBasedFrameDecoder,它對應的是第三種解決方式。這也就是為什麼前文每條訊息的後面我們都要加上特殊字元**\n**的原因。如果想要指定其他特殊字元作為訊息分割符可以如下:

ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192,delimiter)); 

其他解決方式請參考:https://blog.csdn.net/a953713428/article/details/68231119 (Netty學習(五)-DelimiterBasedFrameDecoder)

SimpleChatServer

package netty.in.action.chatRoom;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SimpleChatServer {

    private int port;

    public SimpleChatServer(int port) {
        this.port = port;
    }

    public void run() {
        // 建立兩個EventLoopGroup,一個用來處理客戶端連線,一個用來處理訊息
        EventLoopGroup connectGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服務端引導
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(connectGroup, workerGroup)
                    // 指定IO方式
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new SimpleChatServerInitial())
                    // 指定最大連線數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 會向兩個小時沒有傳送過過訊息的客戶端傳送一個活動探測客戶端狀態
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            System.out.println("服務端已經啟動");
            ChannelFuture future = bootstrap.bind(port).sync();
            // 因為服務端啟動後一直會阻塞,實際上這一句程式碼是不會執行到的
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            connectGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            System.out.println("服務端已經關閉");
        }
    }

    public static void main(String[] args) {
        new SimpleChatServer(8889).run();
    }
}

上面的程式碼很常規,需要解釋的就是ChannelOption各個引數:

1、ChannelOption.SO_BACKLOG

​ ChannelOption.SO_BACKLOG對應的是tcp/ip協議listen函式中的backlog引數,函式listen(int socketfd,int backlog)用來初始化服務端可連線佇列,服務端處理客戶端連線請求是順序處理的,所以同一時間只能處理一個客戶端連線,多個客戶端來的時候,服務端將不能處理的客戶端連線請求放在佇列中等待處理,backlog引數指定了佇列的大小

2、ChannelOption.SO_REUSEADDR

​ ChanneOption.SO_REUSEADDR對應於套接字選項中的SO_REUSEADDR,這個引數表示允許重複使用本地地址和埠,

​ 比如,某個伺服器程序佔用了TCP的80埠進行監聽,此時再次監聽該埠就會返回錯誤,使用該引數就可以解決問題,該引數允許共用該埠,這個在伺服器程式中比較常使用,

​ 比如某個程序非正常退出,該程式佔用的埠可能要被佔用一段時間才能允許其他程序使用,而且程式死掉以後,核心一需要一定的時間才能夠釋放此埠,不設定SO_REUSEADDR就無法正常使用該埠。

3、ChannelOption.SO_KEEPALIVE

​ Channeloption.SO_KEEPALIVE引數對應於套接字選項中的SO_KEEPALIVE,該引數用於設定TCP連線,當設定該選項以後,連線會測試連結的狀態,這個選項用於可能長時間沒有資料交流的連線。當設定該選項以後,如果在兩小時內沒有資料的通訊時,TCP會自動傳送一個活動探測資料報文。

4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

​ ChannelOption.SO_SNDBUF引數對應於套接字選項中的SO_SNDBUF,ChannelOption.SO_RCVBUF引數對應於套接字選項中的SO_RCVBUF這兩個引數用於操作接收緩衝區和傳送緩衝區的大小,接收緩衝區用於儲存網路協議站內收到的資料,直到應用程式讀取成功,傳送緩衝區用於儲存傳送資料,直到傳送成功。

5、ChannelOption.SO_LINGER

​ ChannelOption.SO_LINGER引數對應於套接字選項中的SO_LINGER,Linux核心預設的處理方式是當用戶呼叫close()方法的時候,函式返回,在可能的情況下,儘量傳送資料,不一定保證會發生剩餘的資料,造成了資料的不確定性,使用SO_LINGER可以阻塞close()的呼叫時間,直到資料完全傳送

6、ChannelOption.TCP_NODELAY

​ ChannelOption.TCP_NODELAY引數對應於套接字選項中的TCP_NODELAY,該引數的使用與Nagle演算法有關,Nagle演算法是將小的資料包組裝為更大的幀然後進行傳送,而不是輸入一次傳送一次,因此在資料包不足的時候會等待其他資料的到了,組裝成大的資料包進行傳送,雖然該方式有效提高網路的有效負載,但是卻造成了延時,而該引數的作用就是禁止使用Nagle演算法,使用於小資料即時傳輸,於TCP_NODELAY相對應的是TCP_CORK,該選項是需要等到傳送的資料量最大的時候,一次性發送資料,適用於檔案傳輸。

7、IP_TOS

IP引數,設定IP頭部的Type-of-Service欄位,用於描述IP包的優先順序和QoS選項。

8、ALLOW_HALF_CLOSURE

Netty引數,一個連線的遠端關閉時本地端是否關閉,預設值為False。值為False時,連線自動關閉;為True時,觸發ChannelInboundHandler的userEventTriggered()方法,事件為ChannelInputShutdownEvent。

以上覆制於:https://www.jianshu.com/p/975b30171352 (Netty ChannelOption引數詳解)客戶端業務邏輯

SimpleChatClientHandler

package netty.in.action.chatRoom;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }
}

SimpleChatClientInitial

package netty.in.action.chatRoom;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;


public class SimpleChatClientInitial extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline()
                .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
                .addLast("encoder", new StringEncoder())
                .addLast("decoder", new StringDecoder())
                .addLast("handler", new SimpleChatClientHandler());
    }
}

SimpleChatClient

package netty.in.action.chatRoom;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class SimpleChatClient {

    private String host;
    private int port;

    public SimpleChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 客戶端引導
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleChatClientInitial());


            Channel channel = bootstrap.connect(host, port).sync().channel();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                channel.writeAndFlush(bufferedReader.readLine() + "\r\n");
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace(