1. 程式人生 > >漫談Java IO之 Netty與NIO服務器

漫談Java IO之 Netty與NIO服務器

解碼 讀取 重要 char ada 管道通信 ble 發送 mage

前面介紹了基本的網絡模型以及IO與NIO,那麽有了NIO來開發非阻塞服務器,大家就滿足了嗎?有了技術支持,就回去追求效率,因此就產生了很多NIO的框架對NIO進行封裝——這就是大名鼎鼎的Netty。

前幾篇的內容,可以參考:

  1. 網絡IO的基本知識與概念
  2. 普通IO以及BIO服務器
  3. NIO的使用與服務器Hello world
  4. Netty的使用與服務器Hello world

為什麽要使用開源框架?

這個問題幾乎可以當做廢話,框架肯定要比一些原生的API封裝了更多地功能,重復造輪子在追求效率的情況並不是明智之舉。那麽先來說說NIO有什麽缺點吧:

  1. NIO的類庫和API還是有點復雜,比如Buffer的使用
  2. Selector編寫復雜,如果對某個事件註冊後,業務代碼過於耦合
  3. 需要了解很多多線程的知識,熟悉網絡編程
  4. 面對斷連重連、保丟失、粘包等,處理復雜
  5. NIO存在BUG,根據網上言論說是selector空輪訓導致CPU飆升,具體有興趣的可以看看JDK的官網

那麽有了這些問題,就急需一些大牛們開發通用框架來方便勞苦大眾了。最致命的NIO框架就是MINA和Netty了,這裏不得不說個小插曲:

先來看看MINA的主要貢獻者:

技術分享圖片

再來看看NETYY的主要貢獻者:

技術分享圖片

總結起來,有這麽幾點:

  1. MINA和Netty的主要貢獻者都是同一個人——Trustin lee,韓國Line公司的。
  2. MINA於2006年開發,到14、15年左右,基本停止維護
  3. Nety開始於2009年,目前仍由蘋果公司的norman maurer在主要維護。
  4. Norman Maurer是《Netty in Action》一書的作者

因此,如果讓你選擇你應該知道選擇誰了吧。另外,MINA對底層系統要求功底更深,且國內Netty的氛圍更好,有李林峰等人在大力宣傳(《Netty權威指南》的作者)。

講了一大堆的廢話之後,總結來說就是——Netty有前途,學它準沒錯。

Netty介紹

按照定義來說,Netty是一個異步、事件驅動的用來做高性能、高可靠性的網絡應用框架。主要的優點有:

  1. 框架設計優雅,底層模型隨意切換適應不同的網絡協議要求
  2. 提供很多標準的協議、安全、編碼解碼的支持
  3. 解決了很多NIO不易用的問題
  4. 社區更為活躍,在很多開源框架中使用,如Dubbo、RocketMQ、Spark等

主要支持的功能或者特性有:
技術分享圖片

  1. 底層核心有:Zero-Copy-Capable Buffer,非常易用的靈拷貝Buffer(這個內容很有意思,稍後專門來說);統一的API;標準可擴展的時間模型
  2. 傳輸方面的支持有:管道通信(具體不知道幹啥的,還請老司機指教);Http隧道;TCP與UDP
  3. 協議方面的支持有:基於原始文本和二進制的協議;解壓縮;大文件傳輸;流媒體傳輸;protobuf編解碼;安全認證;http和websocket

總之提供了很多現成的功能可以直接供開發者使用。

Netty服務器小例子

基於Netty的服務器編程可以看做是Reactor模型:
技術分享圖片
即包含一個接收連接的線程池(也有可能是單個線程,boss線程池)以及一個處理連接的線程池(worker線程池)。boss負責接收連接,並進行IO監聽;worker負責後續的處理。為了便於理解Netty,直接看看代碼:

package cn.xingoo.book.netty.chap04;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

public class NettyNioServer {
    public void serve(int port) throws InterruptedException {
        final ByteBuf buffer = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));
        // 第一步,創建線程池
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            // 第二步,創建啟動類
            ServerBootstrap b = new ServerBootstrap();
            // 第三步,配置各組件
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ctx.writeAndFlush(buffer.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            // 第四步,開啟監聽
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NettyNioServer server = new NettyNioServer();
        server.serve(5555);
    }
}

代碼非常少,而且想要換成阻塞IO,只需要替換Channel裏面的工廠類即可:

public class NettyOioServer {
    public void serve(int port) throws InterruptedException {
        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\b", Charset.forName("UTF-8")));

        EventLoopGroup bossGroup = new OioEventLoopGroup(1);
        EventLoopGroup workerGroup = new OioEventLoopGroup();

        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置boss和worker
                    .channel(OioServerSocketChannel.class) // 使用阻塞的SocketChannel
         ....

概括來說,在Netty中包含下面幾個主要的組件:

  • Bootstrap:netty的組件容器,用於把其他各個部分連接起來;如果是TCP的Server端,則為ServerBootstrap.
  • Channel:代表一個Socket的連接
  • EventLoopGroup:一個Group包含多個EventLoop,可以理解為線程池
  • EventLoop:處理具體的Channel,一個EventLoop可以處理多個Channel
  • ChannelPipeline:每個Channel綁定一個pipeline,在上面註冊處理邏輯handler
  • Handler:具體的對消息或連接的處理,有兩種類型,Inbound和Outbound。分別代表消息接收的處理和消息發送的處理。
  • ChannelFuture:註解回調方法

了解上面的基本組件後,就看一下幾個重要的內容。

Netty的Buffer和零拷貝

在Unix操作系統中,系統底層可以基於mmap實現內核空間和用戶空間的內存映射。但是在Netty中並不是這個意思,它主要來自於下面幾個功能:

技術分享圖片

  1. 通過Composite和slice實現邏輯上的Buffer的組合和拆分,重新維護索引,避免內存拷貝過程。
  2. 通過DirectBuffer申請堆外內存,避免用戶空間的拷貝。不過堆外內存的申請和釋放都很麻煩,推薦小心使用。關於堆外內存的一些研究,還可以參考執勤的分享:Java堆外內存之突破JVM枷鎖 以及 Java直接內存與非直接內存性能測試
  3. 通過FileRegion包裝FileChannel,直接實現channel到channel的傳輸。

另外,Netty自己封裝實現了ByteBuf,相比於Nio原生的ByteBuffer,API上更易用了;同時支持容量的動態擴容;另外還支持Buffer的池化,高效復用Buffer。

public class ByteBufTest {
    public static void main(String[] args) {
        //創建bytebuf
        ByteBuf buf = Unpooled.copiedBuffer("hello".getBytes());
        System.out.println(buf);

        // 讀取一個字節
        buf.readByte();
        System.out.println(buf);

        // 讀取一個字節
        buf.readByte();
        System.out.println(buf);

        // 丟棄無用數據
        buf.discardReadBytes();
        System.out.println(buf);

        // 清空
        buf.clear();
        System.out.println(buf);

        // 寫入
        buf.writeBytes("123".getBytes());
        System.out.println(buf);

        buf.markReaderIndex();
        System.out.println("mark:"+buf);

        buf.readByte();
        buf.readByte();
        System.out.println("read:"+buf);

        buf.resetReaderIndex();
        System.out.println("reset:"+buf);
    }
}

輸出為:

UnpooledHeapByteBuf(ridx: 0, widx: 5, cap: 5/5)
UnpooledHeapByteBuf(ridx: 1, widx: 5, cap: 5/5)
UnpooledHeapByteBuf(ridx: 2, widx: 5, cap: 5/5)
UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
UnpooledHeapByteBuf(ridx: 0, widx: 0, cap: 5/5)
UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
mark:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
read:UnpooledHeapByteBuf(ridx: 2, widx: 3, cap: 5/5)
reset:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)

有興趣的可以看一下上一篇分享的ByteBuffer,對比一下,就能發現在Netty中通過獨立的讀寫索引維護,避免讀寫模式的切換,更加方便了。

Handler的使用

前面介紹了Handler包含了Inbound和Outbound兩種,他們統一放在一個雙向鏈表中:

技術分享圖片

當接收消息的時候,會從鏈表的表頭開始遍歷,如果是inbound就調用對應的方法;如果發送消息則從鏈表的尾巴開始遍歷。那麽上面途中的例子,接收消息就會輸出:

InboundA --> InboundB --> InboundC

輸出消息,則會輸出:

OutboundC --> OutboundB --> OutboundA

這裏有段代碼,可以直接復制下來,試試看:

package cn.xingoo.book.netty.pipeline;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;

/**
 * 註意:
 *
 * 1 ChannelOutboundHandler要在最後一個Inbound之前
 *
 */
public class NettyNioServerHandlerTest {

    final static ByteBuf buffer = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));

    public void serve(int port) throws InterruptedException {


        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("1",new InboundA());
                            pipeline.addLast("2",new OutboundA());
                            pipeline.addLast("3",new InboundB());
                            pipeline.addLast("4",new OutboundB());
                            pipeline.addLast("5",new OutboundC());
                            pipeline.addLast("6",new InboundC());
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NettyNioServerHandlerTest server = new NettyNioServerHandlerTest();
        server.serve(5555);
    }

    private static class InboundA extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
            System.out.println("InboundA read"+buf.toString(Charset.forName("UTF-8")));
            super.channelRead(ctx, msg);
        }
    }

    private static class InboundB extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
            System.out.println("InboundB read"+buf.toString(Charset.forName("UTF-8")));
            super.channelRead(ctx, msg);
            // 從pipeline的尾巴開始找outbound
            ctx.channel().writeAndFlush(buffer);
        }
    }

    private static class InboundC extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf)msg;
            System.out.println("InboundC read"+buf.toString(Charset.forName("UTF-8")));
            super.channelRead(ctx, msg);
            // 這樣會從當前的handler向前找outbound
            //ctx.writeAndFlush(buffer);
        }
    }

    private static class OutboundA extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundA write");
            super.write(ctx, msg, promise);
        }
    }

    private static class OutboundB extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundB write");
            super.write(ctx, msg, promise);
        }
    }

    private static class OutboundC extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundC write");
            super.write(ctx, msg, promise);
        }
    }
}

最後有一個TCP粘包的例子,有興趣的也可以自己試一下,代碼就不貼上來了,可以參考最後面的Github連接。

參考

  1. 《Netty實戰》
  2. 《Netty權威指南》
  3. github代碼鏈接

漫談Java IO之 Netty與NIO服務器