1. 程式人生 > >利用Netty構建自定義協議的通訊

利用Netty構建自定義協議的通訊

在複雜的網路世界中,各種應用之間通訊需要依賴各種各樣的協議,比如:HTTP,Telnet,FTP,SMTP等等。

在開發過程中,有時候我們需要構建一些適應自己業務的應用層協議,Netty作為一個非常優秀的網路通訊框架,可以幫助我們完成自定義協議的通訊。

一般而言,我們制定的協議需要兩個部分:

  • Header : 協議頭部,放置一些Meta資訊。
  • Content : 應用之間互動的資訊主體。

例如:

| Version | Content-Length | SessionId | Content |

其中Version,Content-Length,SessionId就是Header資訊,Content就是互動的主體。給這個協議起一個名字叫做luck

,依照luck協議,我們構建一個類。

// 訊息的頭部
public class LuckHeader {

    // 協議版本
    private int version;
    // 訊息內容長度
    private int contentLength;
    // 服務名稱
    private String sessionId;

    public LuckHeader(int version, int contentLength, String sessionId) {
        this.version = version;
        this.contentLength = contentLength;
        this
.sessionId = sessionId; } public int getVersion() { return version; } public void setVersion(int version) { this.version = version; } public int getContentLength() { return contentLength; } public void setContentLength(int contentLength) { this
.contentLength = contentLength; } public String getSessionId() { return sessionId; } public void setSessionId(String sessionId) { this.sessionId = sessionId; } } // 訊息的主體 public class LuckMessage { private LuckHeader luckHeader; private String content; public LuckMessage(LuckHeader luckHeader, String content) { this.luckHeader = luckHeader; this.content = content; } public LuckHeader getLuckHeader() { return luckHeader; } public void setLuckHeader(LuckHeader luckHeader) { this.luckHeader = luckHeader; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } @Override public String toString() { return String.format("[version=%d,contentLength=%d,sessionId=%s,content=%s]", luckHeader.getVersion(), luckHeader.getContentLength(), luckHeader.getSessionId(), content); } }

那麼我們在Netty中如何去對這種自定義的協議編碼(Encode)呢?

Netty中對資料進行編碼解碼需要利用Codec元件,Codec元件中分為:

  • Encoder : 編碼器,將出站的資料從一種格式轉換成另外一種格式。
  • Decoder : 解碼器,將入站的資料從一種格式轉換成另外一種格式。

LuckDecoder.java

public class LuckDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        // 獲取協議的版本
        int version = in.readInt();
        // 獲取訊息長度
        int contentLength = in.readInt();
        // 獲取SessionId
        byte[] sessionByte = new byte[36];
        in.readBytes(sessionByte);
        String sessionId = new String(sessionByte);

        // 組裝協議頭
        LuckHeader header = new LuckHeader(version, contentLength, sessionId);

        // 讀取訊息內容
        byte[] content = in.readBytes(in.readableBytes()).array();

        LuckMessage message = new LuckMessage(header, new String(content));

        out.add(message);
    }
}

LuckEncoder.java

@ChannelHandler.Sharable
public class LuckEncoder extends MessageToByteEncoder<LuckMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, LuckMessage message, ByteBuf out) throws Exception {

        // 將Message轉換成二進位制資料
        LuckHeader header = message.getLuckHeader();

        // 這裡寫入的順序就是協議的順序.

        // 寫入Header資訊
        out.writeInt(header.getVersion());
        out.writeInt(message.getContent().length());
        out.writeBytes(header.getSessionId().getBytes());

        // 寫入訊息主體資訊
        out.writeBytes(message.getContent().getBytes());
    }
}

編寫一個邏輯控制層,展現server接收到的協議資訊:

public class NettyLuckHandler extends SimpleChannelInboundHandler<Message> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // 簡單地打印出server接收到的訊息
        System.out.println(msg.toString());
    }
}

編寫完成之後,把編解碼器邏輯控制器放入初始化元件中:

public class NettyLuckInitializer extends ChannelInitializer<SocketChannel> {

    private static final LuckEncoder ENCODER = new LuckEncoder();

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {

        ChannelPipeline pipeline = channel.pipeline();

        // 新增編解碼器, 由於ByteToMessageDecoder的子類無法使用@Sharable註解,
        // 這裡必須給每個Handler都新增一個獨立的Decoder.
        pipeline.addLast(ENCODER);
        pipeline.addLast(new LuckDecoder());

        // 新增邏輯控制層
        pipeline.addLast(new NettyLuckHandler());

    }
}

編寫一個服務端啟動類:

public class NettyLuckServer {

    // 指定埠號
    private static final int PORT = 8888;

    public static void main(String args[]) throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 指定socket的一些屬性
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // 指定是一個NIO連線通道
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new NettyLuckInitializer());

            // 繫結對應的埠號,並啟動開始監聽埠上的連線
            Channel ch = serverBootstrap.bind(PORT).sync().channel();

            System.out.printf("luck協議啟動地址:127.0.0.1:%d/\n", PORT);

            // 等待關閉,同步埠
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

光有服務端並不行,沒法測試我們的server是不是成功了。所以我們還需要編寫一個客戶端程式。

LuckClientInitializer.java

public class LuckClientInitializer extends ChannelInitializer<SocketChannel> {

    private static final LuckEncoder ENCODER = new LuckEncoder();

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {

        ChannelPipeline pipeline = channel.pipeline();

        // 新增編解碼器, 由於ByteToMessageDecoder的子類無法使用@Sharable註解,
        // 這裡必須給每個Handler都新增一個獨立的Decoder.
        pipeline.addLast(ENCODER);
        pipeline.addLast(new LuckDecoder());

        // and then business logic.
        pipeline.addLast(new NettyLuckClientHandler());

    }
}

LuckClientHandler.java

public class LuckClientHandler extends SimpleChannelInboundHandler<LuckMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, LuckMessage message) throws Exception {
        System.out.println(message);
    }
}

LuckClient.java

public class LuckClient {

    public static void main(String args[]) throws InterruptedException {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new NettyLuckInitializer());

            // Start the connection attempt.
            Channel ch = b.connect("127.0.0.1", 8888).sync().channel();

            int version = 1;
            String sessionId = UUID.randomUUID().toString();
            String content = "I'm the luck protocol!";

            LuckHeader header = new LuckHeader(version, content.length(), sessionId);
            LuckMessage message = new LuckMessage(header, content);
            ch.writeAndFlush(message);

            ch.close();

        } finally {
            group.shutdownGracefully();
        }
    }
}

先執行NettyLuckServer.java,然後再去執行LuckClient.java可以看到控制的輸出

四月 15, 2016 11:31:34 下午 io.netty.handler.logging.LoggingHandler channelRegistered
資訊: [id: 0x92534c29] REGISTERED
四月 15, 2016 11:31:34 下午 io.netty.handler.logging.LoggingHandler bind
資訊: [id: 0x92534c29] BIND(0.0.0.0/0.0.0.0:8888)
luck協議啟動地址:127.0.0.1:8888 
四月 15, 2016 11:31:34 下午 io.netty.handler.logging.LoggingHandler channelActive
資訊: [id: 0x92534c29, L:/0:0:0:0:0:0:0:0:8888] ACTIVE
四月 15, 2016 11:31:54 下午 io.netty.handler.logging.LoggingHandler logMessage
資訊: [id: 0x92534c29, L:/0:0:0:0:0:0:0:0:8888] RECEIVED: [id: 0x67a91c6b, L:/127.0.0.1:8888 - R:/127.0.0.1:53585]
[version=1,contentLength=22,sessionId=cff7b3ea-1188-4314-abaa-de04db32d39f,content=I'm the luck protocol!]

服務端順利解析出了我們自定義的luck協議。