1. 程式人生 > >netty使用指南

netty使用指南

down eth 而是 客戶端和服務器 mail lose nco tcp、udp 。net

Netty用戶指南

一、前言

1.問題

當今世界我們需要使用通用的軟件或庫與其他組件進行通信,例如使用HTTP客戶端從服務器中獲取信息,或通過網絡服務調用一個遠程的方法。然而通用的協議及其實現通常不具備較好的伸縮性。所以問題看起來是我們怎麽不使用通用的HTTP服務器去傳輸大文件、e-mail、實事數據、多媒體數據等。我們需要的是針對特定問題而進行優化的協議實現。例如我們可能需要重新實現一個HTTP服務器來與AJAX的客戶端進行通信。另外一種情況是需要處理歷史遺留的協議保證與舊的系統兼容。這些例子的關鍵在於怎樣快速的實現協議而不損失目標系統的穩定性和性能。

2.解決方案

Netty是一個異步事件驅動的網絡應用框架,可以用來快速開發可維護的、高性能、可擴展的協議服務器和客戶端。

換句話說,Netty是一個基於NIO的客戶端和服務器框架,可以簡單快速的開發網絡應用程序,如協議的客戶端和服務器。它極大的簡化了TCP、UDP服務器之類的網絡編程。

二、開始

1.編寫DiscardServer

最簡單的協議並不是“hello world”,而是丟棄。丟棄協議會丟棄任何接受到的數據不做任何的響應。

要實現丟棄協議,需要做的就是丟棄任何接收到的數據。首先從handler的實現開始,handler會處理由Netty產生的I/O事件。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandler繼承了ChannelInboundHandlerAdapter,而他又實現了ChannelInboundHandlerChannelInboundHandler提供了不同的事件處理方法,你可以根據需要去覆寫相應的方法。ChannelInboundHandlerAdapter提供了一些默認的實現,所以在這個例子中只需要去繼承它就可以了。
  2. 覆寫了channelRead方法,Netty從客戶端收到數據時就會調用該方法。消息的類型是ByteBuf
  3. ByteBuf是一個引用計數對象,需要進行手動的釋放。需要註意的是,handler需要釋放任何傳遞給他的引用計數對象。通常情況下channelRead()
    方法通常的實現方式如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  1. 由於IO錯誤Netty拋出異常或handle處理事件拋出異常,都會使exceptionCaught()方法被調用。在大多數情況下,都需要對異常記日誌,並且關閉相關連的channel

到目前為止實現了DISCARD服務的一般,接下來需要實現main()方法來啟動服務。

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是一個多線程的事件循環,用來處理I/O操作。Netty為不同的通信方式提供了多種EventLoopGroup實現。在本例中,我們只需要實現服務器端的應用,所以需要兩個NioEventLoopGroup 。第一個通常稱為boss,用來接收客戶端的鏈接請求。第二個稱為worker,用來處理boss已接收連接的I/O請求和把接收的連接註冊到worker
  2. ServerBootstrap是用來創建服務器的輔助類。
  3. 使用NioServerSocketChannel類來實例化channel,用來接收連接請求。
  4. 在這裏設置的handler會被每一個新channel調用,ChannelInitializer是一個特殊的handler用來配置一個新的channel。在本例中,我們將DiscardServerHandler添加到新channel 的管道中。隨著應用程序的復雜度增加,可能會向管道中加入更多的handler。
  5. 可以通過option()方法給channel設置一些參數。
  6. option()方法是用來設置NioServerSocketChannel參數的,而childOption()是給接收的連接設置參數的。
  7. 剩下的就是綁定端口然後啟動服務了。

2. 測試DiscardServer是否成功

最簡單的方法是使用telnet命令。例如輸入telnet localhost 8080。DiscarServer丟棄了任何接受的數據,我們可以把DiscardServer的接收的數據打印出來。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 循環可以等價於System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 等價於in.release()

3.寫一個Echo Server

一個服務器通常需要對請求作出響應,而一個Echo服務僅僅需要做的是把請求的內容返回給客戶端。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.write(msg); // (1)
    ctx.flush(); // (2)
}
  1. ChannelHandlerContext對象提供了各種出發IO時間的操作。通過調用write(Object)方法把數據發給客戶端。在這裏沒有手動的釋放msg,這是因為當把msg寫入時Netty會自動的釋放它。
  2. ctx.write(Object)並不會把數據寫到外部,而是在內部的緩沖區中,通過調用ctx.flush()把數據刷出到外部。可以簡潔的調用ctx.wirteAndFlush(msg)達到同樣的效果。

4. 寫一個Timer Server

TIME協議與前面的例子不同之處在於,它發送一個32位的整數,不接收任何請求,並且只要消息發送了就立刻關閉連接。

因為我們不需要接收任何數據,而且在連接建立時就發送數據,所以不能使用channelRead()方法。需要覆寫channelActive()方法

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 當一個連接建立時,activeChannel()方法會被調用,然後寫一個32位的整數。

  2. 為了發送一個新的信息,需要分配一個緩沖區。通過調用ctx.alloc()獲取ByteBufAllocator來分配緩沖區。

  3. 在Netty中的Buffer不需要像Java NIO一樣調用flip(),這是因為Netty中的Buffer具有兩個指針,分別用於讀寫操作。當進行寫操作時寫指針在移動而讀指針不移動,讀寫指針分別代表數據的開始和結束。

    另外需要指出的是,ctx.write()返回一個ChannelFuture對象,該對象代表著一個還未發生的IO操作。這意味著,任何一個請求操作可能都未發生,這是因為在Netty中,所有操作都是異步的。例如下面的代碼可能在發送信息前關閉連接:

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    所以要在ChannelFuture完成前調用close(),當操作完成時,ChannelFuture會通知他的監聽器。close()可能也不會立即關閉連接。

  4. 本例中添加一個匿名內部類作為監聽器,來關閉連接。也可以使用預定義的監聽器:

    f.addListener(ChannelFutureListener.CLOSE);

5.Time Client

不同於DISCARD和ECHO,TIME協議需要一個客戶端將32位的整數轉為一個日期。Netty中的客戶端和服務器最大的不同在於使用了不同的BootStrapChannel現實。

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. BootStapServerBootStrap很相似,但它是用於客戶端的。
  2. 只需指定一個EventLoopGroup,在客戶端中不需要boss。
  3. 使用NioSocketChannel而不是NioServerSocketChannel
  4. 不需要childOption()
  5. 使用connect()方法而不是bind()

TimeClientHandler中,將整數翻譯成日期格式的類型。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

6.處理基於流的傳輸問題。

TCP/IP協議接收數據並儲存到Socket緩沖區中,但是緩沖區不是數據包的隊列,而是字節的隊列,這意味著你發送了兩條消息,但操作系統會並不認為是兩條消息而是一組字節。所以在讀數據時並不能確定讀到了對方發過來的數據。

在TIME協議中,在調用m.readUnsignedInt()時緩沖區中需要有四個字節,如果緩沖區中還未接收到四個字節時就會拋出異常。

解決方法是,再加一個ChannelHandleChannelPipeline。該handler專門處理編碼問題。

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelInboundHandler的一個實現,專門用於編碼問題。
  2. 當新的數據到達時,Netty會調用decode方法,並且其內部維護著一個累加Buffer。
  3. 當累加Buffer中沒有足夠的數據時,可以不在out中添加任何數據。當新數據到達後Netty又會調用decode方法。
  4. 如果decode()添加一個對象到out中,意味著編碼信息成功了。Netty會丟棄Buffer中已讀取的部分數據。

TimeDecoder添加到ChannelPipeline中:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

另外一種更簡單的方式是使用ReplayingDecoder

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

當調用in.readBytes(4)拋出異常時,ReplayingDecoder會捕捉異常並重復執行decode()

7.使用POJO代替ByteBuf

在之前的TIME服務中,都是直接使用ByteBuf作為協議的數據結構。在Handler中使用POJO對象,可以把從ByteBuf抽取POJO的代碼分離開。

首先定義UnixTime類:

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

TimeDecoder中解碼產生UnixTime對象

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }
    out.add(new UnixTime(in.readUnsignedInt()));
}

TimeClientHandler中不再需要使用ByteBuf了。

在服務器端,首先更改TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

還需要創建一個編碼器,將UnixTime轉為ByteBuf以便網絡傳輸

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

netty使用指南