1. 程式人生 > >Netty 整合 MessagePack 序列化框架 + LengthFieldBasedFrameDecoder 自定義解碼器

Netty 整合 MessagePack 序列化框架 + LengthFieldBasedFrameDecoder 自定義解碼器

環境準備及說明

 如果是匯入二進位制開發包,則如下所示:

需要開發包的可以參考《 MessagePack 開發入門詳解》。

如果是 Maven 專案,則新增如下依賴:

		<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.30.Final</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.msgpack/msgpack -->
		<dependency>
			<groupId>org.msgpack</groupId>
			<artifactId>msgpack</artifactId>
			<version>0.6.12</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.javassist/javassist -->
		<dependency>
			<groupId>org.javassist</groupId>
			<artifactId>javassist</artifactId>
			<version>3.24.0-GA</version>
		</dependency>

1)netty-all:是 Netty 開發包

2)msgpack:是 Messagepck 序列化開發包

3)javassist:是 msgpack 自己的依賴包

    本文示例專案結構如下:

1)User:網路傳輸的 POJO 物件,注意:序列化 POJO 必須加 org.msgpack.annotation.Message 註解:@Message

2)echo:包中為 netty 通訊的客戶端與服務端

3)messagepack:包中為 MessagePack 編解碼器

特別提醒:

1)雖然 MessagePack 用於序列化物件,但是普通 String、Integer 等等同樣也是物件,所以照樣可以傳輸普通的字串等訊息
2)需要序列化的 POJO 物件上必須加上 org.msgpack.annotation.Message 註解:@Message,否則傳輸會失敗,而且也不報錯,很難排查
3)MessagePack 序列化物件後的訊息,經過傳送後,接收端 channelRead(ChannelHandlerContext ctx, Object msg)
    3.1)即使傳送的是 User 物件,接收端的 msg 也不能進行 User user = (User)msg 強轉,否則客戶端會被強制斷開連線
    3.2)如果傳送的是 User 物件,接收端可以轉為 List<Object> objects = (List<Object>) msg,list 中的元素對應 User 的屬性值
    3.3)如果傳送的不是 POJO 物件,而是簡單的 String 物件,則不能轉為 List<Object>,否則客戶端也會被強制斷開

MessagePack 編解碼器

     利用 Netty 的編解碼框架可以非常方便的整合第三方序列化框架,Netty 預集成了幾種常用的編解碼框架,使用者也可以根據自己專案的實際情況整合其它編解碼框架,或者進行自定義。

MessagePack 編碼器

package com.example.messagepack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;

/**
 * Created by Administrator on 2018/11/25 0025.
 * MessagePack 編碼器 —— 繼承 Netty 的 MessageToByteEncoder,比重寫方法
 */
public class MsgpackEncoder extends MessageToByteEncoder<Object> {

    /**
     * 重寫方法,負責將 Object 型別的 POJO 物件編碼為 byte 陣列,然後寫入 ByteBuf 中
     *
     * @param channelHandlerContext
     * @param o
     * @param byteBuf
     * @throws Exception
     */

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        MessagePack messagePack = new MessagePack();

        /** 序列化物件*/
        byte[] raw = messagePack.write(o);
        byteBuf.writeBytes(raw);
    }
}

MessagePack 解碼器

package com.example.messagepack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;

import java.util.List;

/**
 * Created by Administrator on 2018/11/25 0025.
 * MessagePack 解碼器 - 繼承 Netty 的 MessageToMessageDecoder,並重寫方法
 */
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
    /**
     * 重寫方法,首先從資料報 byteBuf 中獲取需要解碼的 byte 陣列,
     * 然後呼叫 MessagePack 的 read 方法將其反序列化為 Object 物件,將解碼後的物件加入到解碼列表 list 中,
     * 這樣就完成了 MessagePack 的解碼操作
     *
     * @param channelHandlerContext
     * @param byteBuf
     * @param list
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int length = byteBuf.readableBytes();
        byte[] array = new byte[length];
        byteBuf.getBytes(byteBuf.readerIndex(), array, 0, length);

        MessagePack messagePack = new MessagePack();
        list.add(messagePack.read(array));
    }
}

POJO  User

package com.example.domain;

import org.msgpack.annotation.Message;

import java.util.Date;

/**
 * Created by Administrator on 2018/11/25 0025.
 * 使用者 實體
 * 需要序列化的 POJO 物件上必須加上 org.msgpack.annotation.Message 註解:@Message
 */
@Message
public class User {
    private Integer pId;
    private String pName;
    private Date birthday;
    private Boolean isMarry;

    public Date getBirthday() {
        return birthday;
    }

    public void setBirthday(Date birthday) {
        this.birthday = birthday;
    }

    public Integer getpId() {
        return pId;
    }

    public void setpId(Integer pId) {
        this.pId = pId;
    }

    public String getpName() {
        return pName;
    }

    public void setpName(String pName) {
        this.pName = pName;
    }

    public Boolean getIsMarry() {
        return isMarry;
    }

    public void setIsMarry(Boolean isMarry) {
        this.isMarry = isMarry;
    }

    @Override
    public String toString() {
        return "User{" +
                "birthday=" + birthday +
                ", pId=" + pId +
                ", pName='" + pName + '\'' +
                ", isMarry=" + isMarry +
                '}';
    }
}

Netty 網路通訊

     首先模擬的情況是:客戶端連線上伺服器後,給伺服器連發訊息,伺服器接收後會將原資訊進回覆,同時會解決 TCP 粘包與拆包。會使用 Netty 的 LengthFieldPrepender、LengthFieldBasedFrameDecoder 編解碼器處理半包訊息,不會出現 TCP 粘包/拆包。

服務端

EchoServer 內容如下:

package com.example.echo;

import com.example.messagepack.MsgpackDecoder;
import com.example.messagepack.MsgpackEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Created by Administrator on 2018/11/11 0011.
 * Echo 伺服器
 */
public class EchoServer {
    public static void main(String[] args) {
        int port = 9898;
        new EchoServer().bind(port);
    }

    public void bind(int port) {
        /**
         * interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService
         * 配置服務端的 NIO 執行緒池,用於網路事件處理,實質上他們就是 Reactor 執行緒組
         * bossGroup 用於服務端接受客戶端連線,workerGroup 用於進行 SocketChannel 網路讀寫*/
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            /** ServerBootstrap 是 Netty 用於啟動 NIO 服務端的輔助啟動類,用於降低開發難度
             * */
            final ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                            // 設定TCP連線超時時間
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println(Thread.currentThread().getName() + ",伺服器初始化通道...");
                            /**
                             * 為了處理半包訊息,新增如下兩個 Netty 內建的編解碼器
                             * LengthFieldPrepender:前置長度域編碼器——放在MsgpackEncoder編碼器前面
                             * LengthFieldBasedFrameDecoder:長度域解碼器——放在MsgpackDecoder解碼器前面
                             * 關於 長度域編解碼器處理半包訊息,本文不做詳細講解,會有專門篇章進行說明
                             */
                            ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                            ch.pipeline().addLast("MessagePack encoder", new MsgpackEncoder());
                            ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                            ch.pipeline().addLast("MessagePack Decoder", new MsgpackDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });

            /**伺服器啟動輔助類配置完成後,呼叫 bind 方法繫結監聽埠,呼叫 sync 方法同步等待繫結操作完成*/
            ChannelFuture f = b.bind(port).sync();

            System.out.println(Thread.currentThread().getName() + ",伺服器開始監聽埠,等待客戶端連線.........");

            /**下面會進行阻塞,等待伺服器連線關閉之後 main 方法退出,程式結束* */
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            /**優雅退出,釋放執行緒池資源*/
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

EchoServerHandler 內容如下:

package com.example.echo;

import com.example.domain.User;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/5/16.
 * ChannelInboundHandlerAdapter extends ChannelHandlerAdapter 用於對網路事件進行讀寫操作
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 因為多執行緒,所以使用原子操作類來進行計數
     */
    private static AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 收到客戶端訊息,自動觸發
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println((atomicInteger.addAndGet(1)) + "--->" + Thread.currentThread().getName() + ",The server receive  order : " + msg);

        /**
         * 如果傳輸的是 POJO 物件,則可以轉成 List<Object>
         * list 中的每一個元素都是傳送來的 POJO 物件的屬性值
         * 注意:如果對方傳輸只是簡單的 String 物件,則不能強轉為 List<Object>
         */
       
        /* List<Object> objects = (List<Object>) msg;
        for (Object obj : objects) {
            System.out.println("屬性:" + obj);
        }*/

        /**
         * 服務端接收到客戶端傳送來的資料後,再回發給客戶端
         */
        ctx.writeAndFlush(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("-----客戶端關閉:" + ctx.channel().remoteAddress());
        /**當發生異常時,關閉 ChannelHandlerContext,釋放和它相關聯的控制代碼等資源 */
        ctx.close();
    }
}

客戶端

EchoClient 內容如下:

package com.example.echo;

import com.example.messagepack.MsgpackDecoder;
import com.example.messagepack.MsgpackEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Created by Administrator on 2017/5/16.
 * Echo 客戶端
 */
public class EchoClient {

    /**
     * 使用 2 個執行緒模擬 2 個客戶端
     *
     * @param args
     */
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            new Thread(new MyThread()).start();
        }
    }

    static class MyThread implements Runnable {

        @Override
        public void run() {
            connect("192.168.1.20", 9898);
        }

        public void connect(String host, int port) {
            /**配置客戶端 NIO 執行緒組/池*/
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                /**Bootstrap 與 ServerBootstrap 都繼承(extends)於 AbstractBootstrap
                 * 建立客戶端輔助啟動類,並對其配置,與伺服器稍微不同,這裡的 Channel 設定為 NioSocketChannel
                 * 然後為其新增 Handler,這裡直接使用匿名內部類,實現 initChannel 方法
                 * 作用是當建立 NioSocketChannel 成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路I/O事件*/
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                                // 設定TCP連線超時時間
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                System.out.println(Thread.currentThread().getName() + ",客戶端初始化管道...");
                                /**
                                 * 為了處理半包訊息,新增如下兩個 Netty 內建的編解碼器
                                 * LengthFieldPrepender:前置長度域編碼器——放在MsgpackEncoder編碼器前面
                                 * LengthFieldBasedFrameDecoder:長度域解碼器——放在MsgpackDecoder解碼器前面
                                 * 關於 長度域編解碼器處理半包訊息,本文不做詳細講解,會有專門篇章進行說明
                                 */
                                ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                                ch.pipeline().addLast("MessagePack encoder", new MsgpackEncoder());
                                ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                                ch.pipeline().addLast("MessagePack Decoder", new MsgpackDecoder());
                                ch.pipeline().addLast(new EchoClientHandler());
                            }
                        });

                /**connect:發起非同步連線操作,呼叫同步方法 sync 等待連線成功*/
                ChannelFuture channelFuture = b.connect(host, port).sync();
                System.out.println(Thread.currentThread().getName() + ",客戶端發起非同步連線..........");

                /**等待客戶端鏈路關閉*/
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                /**優雅退出,釋放NIO執行緒組*/
                group.shutdownGracefully();
            }
        }
    }
}

EchoClientHandler 內容如下:

package com.example.echo;

import com.example.domain.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/5/17.
 * 用於對網路事件進行讀寫操作
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 因為 Netty 採用執行緒池,所以這裡使用原子操作類來進行計數
     */
    private static AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 當客戶端和服務端 TCP 鏈路建立成功之後,Netty 的 NIO 執行緒會呼叫 channelActive 方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        /**
         * 多餘 陣列、List、Set、Map 等,對立面的元素逐個進行傳送,則對方也是逐個接收
         * 否則如果直接傳送 陣列、List、Set、Map 等,則對方會統一接收
         * 注意:因為使用LengthFieldPrepender、LengthFieldBasedFrameDecoder編解碼器處理半包訊息
         * 所以這裡連續傳送也不會出現 TCP 粘包/拆包
         */
        List<User> users = getUserArrayData();
        for (User user : users) {
            ctx.writeAndFlush(user);
        }
        ctx.writeAndFlush("我是普通的字串訊息" + Thread.currentThread().getName());
    }

    /**
     * 當服務端返回應答訊息時,channelRead 方法被呼叫,從 Netty 的 ByteBuf 中讀取並列印應答訊息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println((atomicInteger.addAndGet(1)) + "---" + Thread.currentThread().getName() + ",Server return Message:" + msg);
    }

    /**
     * 當發生異常時,列印異常 日誌,釋放客戶端資源
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**釋放資源*/
        ctx.close();
    }

    /**
     * 設定網路傳輸的 POJO 物件陣列/列表
     *
     * @return
     */
    public List<User> getUserArrayData() {
        User[] users = new User[5];
        User loopUser = null;
        for (int i = 0; i < 5; i++) {
            loopUser = new User();
            loopUser.setpId(i + 1);
            loopUser.setpName("華安" + Thread.currentThread().getName());
            loopUser.setIsMarry(true);
            loopUser.setBirthday(new Date());
            users[i] = loopUser;
        }
        return Arrays.asList(users);
    }
}

執行測試

     先執行伺服器,再執行客戶端。