1. 程式人生 > >Netty入門之一:使用者指南

Netty入門之一:使用者指南

  1. 不得不吐槽一下netty,舊版本和新版本竟然不相容。

  2. 《netty權威指南》覺得對於入門來說,並不是一本好書。

  3. netty在行業中用的比較廣泛。

Discard

程式狗可以對Hello World來高潮,因為大部分程式狗的第一段程式碼都是從Hello World開始的。

讓處女座程式狗難受的是netty user guide用了DISCARD。

Handler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public
class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently. ((ByteBuf) msg).release(); // (3) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
  1. DiscardServerHandler繼承自ChannelInboundHandlerAdapter,而ChannelInboundHandlerAdapter實現了ChannelInboundHandler並繼承ChannelHandlerAdapter。目前的做法是最好繼承自類ChannelInboundHandler而非實現ChannelInboundHandler介面。

  2. 可以重寫裡面的方法。上面的程式碼中重寫了 channelRead() 方法。當接受到資料後,這個方法會被呼叫。在這個程式碼中,訊息型別是 ByteBuf 。

  3. 因為ByteBuf是一個引用計數物件(reference-counted object ),它必須通過release()方法顯示釋放
    一種常用的模式如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

4 . 當IO錯誤或者handler實現者處理事件時丟擲異常,exceptionCaught()將會被呼叫。可以具體業務邏輯實現。

DiscardServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是多執行緒事件迴圈(multithreaded event loop)處理IO操作。netty提供了一系列EventLoopGroup的實現。
    這裡用到了2個EventLoopGroup,boss處理連線請求。worker來處理被boss接受的連線的資料互動。
    有多少個執行緒被使用,以及它們如何對映到Channel取決於EventLoopGroup的實現。

  2. ServerBootstrap 是一個啟動伺服器的輔助類。

  3. 這段程式碼指定NioServerSocketChannel類用於例項化一個新的Channel來接受入站連線請求。

  4. The handler specified here will always be evaluated by a newly accepted Channel. The ChannelInitializer is a special handler that is purposed to help a user configure a new Channel. It is most likely that you want to configure the ChannelPipeline of the new Channel by adding some handlers such as DiscardServerHandler to implement your network application. As the application gets complicated, it is likely that you will add more handlers to the pipeline and extract this anonymous class into a top level class eventually.

  5. 設定Socket Option,看過java nio和《java 網路程式設計》的應該很懂吧。

  6. 雖然可以推測出設定Socket Option,但是用到了2個方法:option()childOption()。其中option()是設定NioServerSocketChannel的,而childOption()是設定ChannelInitializer中的SocketChannel。好的程式碼類方法的命名很重要,不是麼?否則為毛而childOption()方法設定的Socket選項是給childHandler中的Channel呢!user guide是這麼解釋的:childOption()設定的是被父ServerChannel接受的Channel。

  7. 綁定了監聽埠。隨後呼叫他的同步阻塞方法sunc()等待繫結操作完成。完成後返回一個ChannelFuture物件,後面發現,好多方法都返回這個物件。

  8. f.channel().closeFuture().sync()是等待伺服器監聽埠關閉。

  9. finally程式碼塊是優雅停機。

測試

為了方便測試,DiscardServer接收到的資料打印出來,然後用telnet進行測試。

稍微修改一下DiscardServerHandler類

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) {
                System.out.println((char)in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        System.out.println("Exception happened.");
        System.out.println(cause.getMessage());
        System.out.flush();
        ctx.close();
    }
}

執行telnet localhost 8080輸入Hello World的結果
這裡寫圖片描述

再改進:伺服器進行響應(Echo)

Echo就是收到什麼,返回什麼。

只需要改DiscardServerHandler類的channelRead()方法

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.write(msg);
    ctx.flush();
}

輸入abcdezQ結果
這裡寫圖片描述

這裡有3點注意:

  1. ChannelHandlerContext提供了一些方法可以觸發IO事件和IO操作。

  2. 和前面的案例不同,這裡並沒有釋放引用計數物件msg,因為只要這個物件被寫出,Netty會對它進行釋放。

  3. 知道緩衝的同學應該清楚,為毛這裡要執行flush()操作。據說這兩行程式碼可以替換成

ctx.writeAndFlush(msg);

親測有用,輸入HelloWorld的結果:
這裡寫圖片描述

TimeServer

時間伺服器,懂網路程式設計的人,對於這個案例應該和HelloWorld類似吧。

伺服器接收到客戶端的請求後,不理會客戶端傳送的任何資訊,而是將當前時間返回給客戶端,然後斷開連線。

時間伺服器

首先,客戶端可能會發送資料給伺服器,那麼伺服器不必理會這些資料;其次,客戶端可能不會發送任何資料給伺服器;再次,伺服器在任何情況下,只要客戶端有入站連線,就要將當前時間傳送給客戶端。

這裡的案例處理方式是:只要客戶端和伺服器建立連線成功,伺服器就將當前時間傳送給客戶端,因此不能再使用channelRead()方法了,而是重寫channelActive()方法。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 當連線被建立並可以進行資料通訊時,channelActive()將會被呼叫。

  2. 建立一個ByteBuf類進行訊息傳遞。通過當前ChannelHandlerContext.alloc()方法獲取到當前的分配器ByteBufAllocator,因為要寫出一個int資料,因此分配4位元組。

  3. 呼叫ChannelHandlerContext的writeAndFlush()方法寫出訊息。

    • 這裡就體現出java.nio.ByteBuffer類和netty的io.netty.buffer.ByteBuf類之間的區別:java.io.ByteBuffer類當資料寫入時,需要執行ByteBuffer.flip()方法來調整一些位置資訊從而可以讀取ByteBuffer中的資料,但io.netty.buffer.ByteBuf不需要這麼做,據說因為它有2個指標分別用於讀取和寫入操作。這確實是一個改進,在使用java.nio.ByteBuffer時,需要時刻警惕位置資訊以防讀取或者寫入操作錯誤。
    • 此外,ChannelInboundHandlerAdapter.writeAndFlush()或write()方法會返回一個ChannelFuture物件,這個物件代表了一個還未發生的IO操作,因為netty是非同步的,比如以下程式碼中,可能訊息還沒有傳送,而連線就被關閉了。因此必須等到ChannelFuture完成後再呼叫close()方法。當寫操作完成後,ChannelFuture會通知它的Listeners。注意:Channel的close()也不立即關閉連線,它同樣返回ChannelFuture物件。
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();

4 . 這裡使用ChannelFutureListener物件新增到ctx.writeAndFlush(time)返回的ChannelFuture物件中,作為Listener,然後在裡面實現連線的關閉。可以簡單的使用f.addListener(ChannelFutureListener.CLOSE);從原始碼中看到,其實現為:

ChannelFutureListener CLOSE = new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) {
        future.channel().close();
    }
};

測試

在Linux下,可以這麼測試:
rdate -o port -p host

返回人可閱讀的字元

伺服器最好能返回人可閱讀的字元。只需要修改TimeServerHandler的channelActive()方法。

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        byte[] msg = ("Now time is: " + new Date().toString()).getBytes();
        final ByteBuf time = ctx.alloc().buffer(msg.length); // (2)
        time.writeBytes(msg);

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(ChannelFutureListener.CLOSE);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

telnet測試結果
這裡寫圖片描述

TimeClient

也可以不修改TimeServer對時間的輸出,但是人可閱讀從客戶端自身來處理。

客戶端程式

客戶端程式和伺服器程式的不同之處在Bootstrap和Channel。

之前的伺服器程式使用ServerBootstrap和NioServerSocketChannel

相對應的,客戶端理論上應該使用Bootstrap和NioSocketChannel

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080;
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. 這裡的輔助類使用了Bootstrap,而非之前伺服器程式中的ServerBootstrap。

  2. 如果只指定了一個EventLoopGroup,它既當作boss,又當作worker。

  3. 客戶端使用NioSocketChannel ,而非之前伺服器程式中的NioServerSocketChannel。

  4. 這裡並沒有使用childOption()方法,因為客戶端的SocketChannel並不是想伺服器端那樣的形式獲取的。
    記得《java網路程式設計》的同學應該知道,java.nio中,伺服器端的ServerSocketChannel方法進行監聽,而客戶端有入站請求後,可以通過accept()方法獲取到SocketChannel,在netty中,這類層次關係也是一樣的:option()方法設定NioServerSocketChannel的Socket選項,而childOption()方法用於伺服器端通過NioServerSocketChannel獲取的NioSocketChannel的Socket選項,但是客戶端的NioSocketChannel獲取方法是new的物件,因此通過option()方法設定Socket選項。

  5. 與伺服器端通過bind()方法繫結地址和埠進行監聽相對應,客戶端使用connect()方法向伺服器請求建立連線。

  6. finally程式碼塊同樣是優雅停機。

測試

前提:將之前的TimeServer改回原來的狀態,即響應一個整型的數字,然後執行起來。

執行TimeClient測試結果
這裡寫圖片描述

對基於流的傳輸的處理方式

引起問題的原因:TCP粘包問題,不懂自行搜尋。

解決方案一

回想之前的TimeClient程式,它接受一個int型別的資料,通常這樣子的資料不是分散的,但是存在分散的可能性,隨著網路流量的增加,這種可能性也隨之增加。

一種解決方案是,客戶端和伺服器之間的協議是接受的資料/傳送的資料必須是4個位元組才算完整的,那麼客戶端可以等待直到接受4位元組資料為止。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler2 extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandler有兩個生命週期監聽方法:handlerAdded()和handlerRemoved()。可以執行任意初始化任務,只要它不阻礙了很長一段時間。==(補充一下這兩個方法合適被呼叫)==

  2. 接受的資料堆積到全域性的ByteBuf中。

  3. 一旦接收到新資料,馬上檢測全域性的ByteBuf中,資料是否充分(至少有4個位元組)。

解決方案二

第一種方案解決了TimeClient的問題,然後面對更為複雜的場景,它的不足之處就更加明顯了。

當一個應用程式更為複雜時,如有多個欄位,那麼第一種方案就很難維護。

可以將單個ChannelHandler分成多個部分減少應用程式的複雜性。

在這個問題中,用TimeDecoder來解決粘包問題,而TimeClientHandler來處理接收到伺服器返回資料的問題。

解碼器Decoder

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        list.add(byteBuf.readBytes(4));
    }
}
  1. 抽象類ByteToMessageDecoder是ChannelInboundHandlerAdapter的一個子類,==public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter==雖然命名方式上看不出來,它方便解決資料分散問題(粘包/拆包)。

  2. 當接收到新資料時,ByteToMessageDecoder呼叫decode()方法,維護內部累積的緩衝區。

  3. 當內部維護的緩衝區還沒有足夠的資料時,decode()方法可以選擇啥事都不做。當下次又有新資料接收時,ByteToMessageDecoder會繼續呼叫decode()方法。

  4. 如果decode()方法通過List寫出一個物件時,意味著解碼器成功解析了一個訊息。可能一次收到多個訊息,這種情況也不需要解碼多個訊息,ByteToMessageDecoder將會一直呼叫decode()方法,直到out沒有東西寫出為止。

現在,TimeClient有2個ChannelHandler了,只需把它加到ChannelPipeline上即可,改變之前程式碼的hannelInitializer的實現:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

使用POJO代替ByteBuf

前面的所有程式使用的都是ByteBuf,這裡將改為使用簡單java物件,這樣可是使程式更容易維護和重用。

自己定義一個時間類。

import java.util.Date;

public class UnixTime {
    private final long value;

    public UnixTime(long value) {
        this.value = value;
    }

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value - 2208988800L) * 1000L).toString();
    }
}

TimeClient使用UnixTime類來替換原始的ByteBuf類

可以使用UnixTime類來替換TimeDecoer類中使用的ByteBuf。

import io.netty.buffer.ByteBuf;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.handler.codec.ByteToMessageDecoder;

        import java.util.List;

public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        list.add(new UnixTime(byteBuf.readUnsignedInt()));
    }
}

而TimeClientHandler類中收到的資料將是UnixTime型別了。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime ut = (UnixTime) msg;
        System.out.println(ut);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

執行結果
這裡寫圖片描述

TimeServer使用UnixTime類來替換原始的ByteBuf類

在這個案例中,伺服器端使用POJO時,只需要修改TimeServerHandler類的channelActive()方法,所有程式碼如下:

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("我收到了資料。");
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

但是之前程式碼中,伺服器端使用的ByteBuf,而這裡使用了POJO,因此需要做出一點改變,即將POJO封裝成ByteBuf,需要增加一個ChannelHandler,聰明的小夥伴看到TimeClient中的解碼器,馬上想到伺服器端是否有編碼器之類的程式碼,bingo,差了一點點。伺服器端也使用ChannelInboundHandlerAdapter的子類。

編碼器Endoer

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. 這一行很重要。
    • 首先通過原始的ChannelPromise按原樣。當編碼的資料確實寫出後,netty會標記它成功或者失敗。
    • 這裡沒有呼叫ChannelHandlerContext的flush()方法。因為有單獨的處理器方法public void flush(ChannelHandlerContext ctx)

想客戶端一樣簡化這段程式碼,使用netty的MessageToByteEncoder,它是ChannelOutboundHandlerAdapter的子類。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class TimeEncoder extends MessageToByteEncoder<UnixTime>{
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, UnixTime unixTime, ByteBuf byteBuf) throws Exception {
        byteBuf.writeInt((int)unixTime.value());
    }
}

然後將這個ChannelHandler級聯到伺服器程式碼TimeServer中,注意級聯順序,關鍵程式碼如下圖所示
這裡寫圖片描述