1. 程式人生 > >Netty專欄 (一)——— Netty初步介紹和簡單Demo

Netty專欄 (一)——— Netty初步介紹和簡單Demo

Netty初步介紹和簡單Demo

@author 魯偉林
記錄《Netty 實戰》中各章節學習過程,寫下一些自己的思考和總結,幫助使用Netty框架的開發技術人員們,能夠有所得,避免踩坑。
本部落格目錄結構將嚴格按照書本《Netty 實戰》,省略與Netty無關的內容,可能出現跳小章節。
本部落格中涉及的完整程式碼,GitHub地址: https://github.com/thinkingfioa/netty-learning/tree/master/netty-in-action。
本人部落格地址: https://blog.csdn.net/thinking_fioa

1. Netty - 非同步和事件驅動

1. Netty 能夠幫助搭建允許系統能夠擴充套件到支援150000名併發使用者。

2. Netty 設計關鍵: 非同步 + 事件驅動

1.1 Java網路程式設計(BIO)

典型的BIO服務端:

1. 一個主執行緒在某個port監聽,等待客戶端連線。

2. 當接收到客戶端發起的連線時,建立一個新的執行緒去處理客戶端請求。

3. 主執行緒重新回到port監聽,等待下一個客戶端連線。

缺點:

1. 每個新的客戶端Socket都需要建立一個新的Thread處理,將會導致大量的執行緒處於休眠狀態。

2. 每個執行緒都有呼叫棧的記憶體分配,連線數非常多時,耗費較多記憶體。

3. 連線數比較多時,建立大量執行緒,上下文切換所帶來的開銷較大。

程式碼:

public void serve(int port) throws IOException {
    // 建立Socket
    ServerSocket serverSocket = new ServerSocket(port);
    // 等待客戶端連線
    Socket clientSocket = serverSocket.accept();
    // 建立輸入流
    BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
    PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
    String request, response;
    while((request = in.readLine()) != null) {
        if("Done".equals(request)) {
            break;
        }
        response = processRequest(request);
        out.println(response);
    }
}

1.2 Java NIO

1. 使用Selector來實現Java的非阻塞I/O操作。將多個Socket的讀寫狀態繫結到Selector上,允許在任何時間檢查任意的讀操作/寫操作的完成狀態。

2. 允許單個執行緒處理多個併發的連線。

1.3 Netty的核心元件

Netty的主要構件塊:

1. Channel

2. 回撥

3. Future

4. 事件和ChannelHandler

1.3.1 Channel

Channel是傳入(入站)或者傳出(出站)資料的載體(如一個檔案、一個Socket或一個硬體裝置)。可以被開啟或者被關閉,連線或斷開連線。

1.3.2 回撥

回撥只是:先寫一段程式碼,該段程式碼在將來某個適當的時候會被執行。Netty大量使用了回撥,比如:某ChannelHandler中的channelActive()方法則是一個回撥,表示連線建立時,請執行該段回撥程式碼。

1.3.3 Future

非同步操作佔位符。在操作完成時,提供結果的訪問。

JDK提供的Future和ChannelFuture對比:

1. JDK提供的Future需要手動檢查對應的操作是否完成,或一直阻塞直到它完成

2. ChannelFuture能夠註冊Listener監聽器,監聽器的回撥函式operationComplete()能非同步的在操作完成時被呼叫。

程式碼:

public static void connect() {
    Channel channel = CHANNEL_FROM_SOMEWHERE;

    ChannelFuture future = channel.connect(new InetSocketAddress("127.0.0.1", 9080));
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if(future.isSuccess()) {
                ByteBuf buf = Unpooled.copiedBuffer("hello", Charset.defaultCharset());
                ChannelFuture wf = future.channel().writeAndFlush(buf);
                // ...
            } else {
                    // 失敗後可嘗試重連/切換鏈路
                future.cause().printStackTrace();
            }
        }
    })
}

1.3.4 事件和ChannelHandler

1. 事件:發生某種事件觸發適當的動作。比如入站觸發事件: 鏈路啟用(channelActive)/資料可讀(channelRead)/發生異常(exceptionCaught)/...

2. Channelhandler:一組為了響應特定事件而被執行的回撥函式。如:ChannelInboundHanderAdapter.java是一個入站事件

1.3.5 Channel和EventLoop關係:

Channel和EventLoop都是Netty核心概念,而且有一些約定俗成的規定,能幫助程式設計和理解:

1. 單個Channel只會對映到單個EventLoop

2. 單個EventLoop可以處理多個Channel(1:n關係)

3. 一個EventLoop在其生命週期內只能繫結到一個執行緒上4. 由於單個Channel在其生命週期中只會有一個I/O執行緒,所以ChannelPipeline中多個ChannelHandler無需關心同步互斥問題

2. 第一款Netty應用程式

1. ChannelHandler用於構建應用業務邏輯。往往封裝了為響應特定事件而編寫的回撥函式

2. 本節主要講解一個超級簡單的Netty應用程式,回顯服務: 客戶端建立連線後,傳送一個或多個訊息。服務端收到後,將訊息返回。

2.3 編寫Echo伺服器

Netty服務端至少需要兩個部分: 一個ChannelHandler + 引導(Bootstrap)

2.3.1 ChannelHandler和業務邏輯

繼承ChannelInboundHandlerAdapter類,感興趣的入站方法:

1. channelRead() - 對於每個傳入的訊息都要呼叫

2. channelReadComplete() - 當前批量讀取中的最後一條資料

3. exceptionCaught() - 讀取操作期間,有異常丟擲時呼叫

程式碼:

@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter{

    /**
     * 每次傳入的訊息都要呼叫
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println(
                "Server received: " + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);
    }

    /**
     * 讀完當前批量中的最後一條資料後,觸發channelReadComplete(...)方法
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 異常捕獲
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

解釋:

1. channelRead和channelReadComplete理解:當批量訊息後最後一條資料被channelRead(...)後觸發channelReadComplete事件。

2. ctx.write(...)只是將訊息暫時存放在ChannelOutboundBuffer中,等待flush(...)操作

3. @Sharable註解:本質是宣告該ChannelHandler全域性單例。可被多個Channel安全的共享。標註了@Sharable註解的ChannelHandler請注意不能有對應的狀態

2.3.2 引導伺服器

1. 引導伺服器主要開啟Netty的Channel。並分配對應的EventLoop和ChannelPipeline。

2. 一個Channel只有一個ChannelPipeline。ChannelPipeline是由一組ChannelHandler組成的責任鏈。

程式碼:

EventLoopGroup group = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(group)
            .channel(NioServerSocketChannel.class)
            .localAddress(new InetSocketAddress(port))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new EchoServerHandler());
                }
            });
} finally {
    group.shutdownGracefully().sync();
}

2.4 編寫Echo客戶端

客戶端將會:

1. 建立連線

2. 傳送訊息

3. 關閉連線

2.4.1 ChannelHandler客戶端邏輯

1. Java是通過GC可達性分析來實現垃圾回收。對於Netty傳輸中的ByteBuf,使用的是引用計數演算法。也就是說:如果你使用了Netty,需要你親自考慮是否需要手動釋放物件。判斷方法,後文將會給出

2. 擴充套件SimpleChannelInboundHandler類處理任務的Handler,無需手動釋放物件。SimpleChannelInboundHandler.java中方法channelRead()中會負責釋放引用。

3. 客戶端傳送訊息條數和服務端接收的訊息條數是不對應的。除非處理了TCP的粘包黏包。

程式碼:

// SimpleChannelInboundHandler<T>中channelRead方法負責釋放物件msg引用
public abstract class SimpleChannelInboundHandler<I> ...{
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
        // ...
        } finally {
            if (autoRelease && release) {
                  // 減少物件msg引用計數
                ReferenceCountUtil.release(msg);
            }
        }
    }
}    

問:ChannelHandler中何時需要主動釋放引用?

1. 擴充套件的類不是: SimpleChannelInboundHandler,且該物件msg不會傳給下一個ChannelHandler

2. 擴充套件的類不是: SimpleChannelInboundHandler,且該物件msg不會被ctx.write(...)

2.4.2 引導客戶端

給出引導客戶端關鍵程式碼,完整程式碼請參考地址

程式碼:

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
            .channel(NioSocketChannel.class)
            .remoteAddress(new InetSocketAddress(host, port))
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch)
                        throws Exception {
                    ch.pipeline().addLast(
                            new EchoClientHandler());
                }
            });
    // 下面兩行程式碼可以刪除
    ChannelFuture f = b.connect().sync();
    f.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully().sync();
}

附錄