1. 程式人生 > >JBoss Marshalling 序列化技術 編解碼器使用 netty 例項

JBoss Marshalling 序列化技術 編解碼器使用 netty 例項

JBoss Marshalling 是一個 Java 物件序列化包,對 JDK 預設的序列化框架進行了優化,但又保持與 Serializable 介面的相容,同時增加了一些可呼叫的引數和附加的屬性,這些引數可通過工廠類進行配置。

環境準備

maven 依賴
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.10.Final</version>
</dependency
>
<dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling</artifactId> <version>1.4.11.Final</version> </dependency> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId
>
<version>1.4.11.Final</version> </dependency>
專案結構

編碼與介紹

功能介紹 & 實現效果
// 1. 客戶端傳送5個Boy例項物件

五個藍孩子組隊去表白:
[name=同學 1, height=170, faceScore=91, isRich=false]
[name=同學 2, height=180, faceScore=89, isRich=true]
[name=同學 3, height=162, faceScore=93, isRich=false]
[name=同學 4
, height=169, faceScore=96, isRich=true] [name=同學 5, height=172, faceScore=84, isRich=false] // 2. 服務端收到依次收到每個Boy例項物件 收到表白:[name=同學 1, height=170, faceScore=91, isRich=false] 收到表白:[name=同學 2, height=180, faceScore=89, isRich=true] 收到表白:[name=同學 3, height=162, faceScore=93, isRich=false] 收到表白:[name=同學 4, height=169, faceScore=96, isRich=true] 收到表白:[name=同學 5, height=172, faceScore=84, isRich=false] // 3. 服務端(女孩)判斷此boy有沒有高富帥屬性,給予回覆 收到女生回覆 :to 同學 1 : 中意你哦! 收到女生回覆 :to 同學 2 : 中意你哦! 收到女生回覆 :to 同學 3 : 中意你哦! 收到女生回覆 :to 同學 4 : 中意你哦! 收到女生回覆 :to 同學 5 : 騷年回家繼續努力吧!
服務端開發
// Server 服務端主程式

public class Server
{
    public void bind(int port) throws InterruptedException
    {
        EventLoopGroup boss=new NioEventLoopGroup();
        EventLoopGroup worker=new NioEventLoopGroup();
        try
        {
            ServerBootstrap b=new ServerBootstrap();
            b.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1280)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>()
                    {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception
                        {
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingDecoder()
                            );
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingEncoder()
                            );
                            ch.pipeline().addLast(
                                    new ServerHandler()
                            );
                        }
                    });

            ChannelFuture f=b.bind(port).sync();

            f.channel().closeFuture().sync();
        } finally
        {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        int port = 8005;
        new Server().bind(port);
    }
}
// ServerHandler 服務端Handler

@ChannelHandler.Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter
{
    @Override
    public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception
    {
        Boy boy= (Boy) msg;
        System.out.println("收到表白:"+boy);
        ctx.writeAndFlush(reply(boy));
    }

    private static GirlResponse reply(Boy boy){
        GirlResponse gr=new GirlResponse();
        if(boy.getFaceScore()>=90||boy.getHeight()>=180||boy.isRich()){
            gr.setMsg("to "+boy.getName()+" : 中意你哦!");
        }else{
            gr.setMsg("to "+boy.getName()+" : 騷年回家繼續努力吧!");
        }
        return gr;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception
    {
        cause.printStackTrace();
        ctx.close();
    }
}
客戶端開發
// 客戶端主程式

public class Client
{
    public void connect(int port, String host) throws InterruptedException
    {
        EventLoopGroup group = new NioEventLoopGroup();
        try
        {
            Bootstrap b=new Bootstrap();
            b.group(group)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>()
                    {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception
                        {
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingDecoder()
                            );
                            ch.pipeline().addLast(
                                    MarshallingCodeCFactory.buildMarshallingEncoder()
                            );
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture f= b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally
        {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        int port = 8005;
        String host="127.0.0.1";
        new Client().connect(port, host);
    }
}
// 客戶端Handler

public class ClientHandler extends ChannelInboundHandlerAdapter
{
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        System.out.println("五個藍孩子組隊去表白:");
        Random r = new Random();
        for (int i = 1; i <= 5; i++)
        {
            Boy boy = new Boy();
            boy.setName("同學 " + i);
            boy.setFaceScore(r.nextInt(20) + 80);
            boy.setHeight(r.nextInt(25) + 160);
            boy.setRich(i % 2 == 0);
            ctx.write(boy);
            System.out.println(boy);
        }
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception
    {
        GirlResponse gr= (GirlResponse) msg;
        System.out.println("收到女生回覆 :"+gr.getMsg());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception
    {
        cause.printStackTrace();
        ctx.close();
    }
}
Marshalling 編解碼器工廠類 & pojo
// Marshalling 編解碼器工廠

public final class MarshallingCodeCFactory
{

    /**
     * 建立Jboss Marshalling解碼器MarshallingDecoder
     *
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder()
    {
        // 首先通過Marshalling工具類的精通方法獲取Marshalling例項物件 引數serial標識建立的是java序列化工廠物件。
        final MarshallerFactory marshallerFactory = Marshalling
                .getProvidedMarshallerFactory("serial");
        // 建立了MarshallingConfiguration物件,配置了版本號為5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        // 根據marshallerFactory和configuration建立provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(
                marshallerFactory , configuration);
        // 構建Netty的MarshallingDecoder物件,倆個引數分別為provider和單個訊息序列化後的最大長度
        return new MarshallingDecoder(provider ,
                1024 * 1024 * 1);
    }

    /**
     * 建立Jboss Marshalling編碼器MarshallingEncoder
     *
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder()
    {
        final MarshallerFactory marshallerFactory = Marshalling
                .getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(
                marshallerFactory , configuration);
        // 構建Netty的MarshallingEncoder物件,MarshallingEncoder用於實現序列化介面的POJO物件序列化為二進位制陣列
        return new MarshallingEncoder(provider);
    }
}

要進行序列化的類一定要實現 Seriaizable介面,並且新增serialVersionUID

// Boy.java

public class Boy implements Serializable
{

    private static final long serialVersionUID = 3333140133888151024L;
    /**
     * 姓名
     */
    private String name;

    /**
     *  身高
     */
    private int height;

    /**
     *  顏值
     */
    private int faceScore;

    /**
     *  是否富有
     */
    private boolean isRich;

    @Override
    public String toString()
    {
        return "[name="+name+", height="+height+", faceScore="+faceScore+", isRich="+isRich+"]";
    }

// GirlResponse.java

public class GirlResponse implements Serializable
{
    private static final long serialVersionUID = -2619994978640439932L;
    private int code;
    private String msg;

粘包拆包思考

測試了下粘包拆包的場景,發現程式執行依舊正常,說明Marshalling 編解碼器本身是支援粘包拆包的處理。

究其原因,來看下原始碼:

// MarshallingEncoder 部分原始碼

@Sharable
public class MarshallingEncoder extends MessageToByteEncoder<Object> {

    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    。。。。
    。。。。
    。。。。
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        Marshaller marshaller = provider.getMarshaller(ctx);
        int lengthPos = out.writerIndex();
        out.writeBytes(LENGTH_PLACEHOLDER);

可以看到Marshalling編碼器在編碼時會在報文頭部加上4個位元組(LENGTH_PLACEHOLDER 用來記錄報文長度)。

// MarshallingDecoder 部分原始碼

public class MarshallingDecoder extends LengthFieldBasedFrameDecoder {
。。。
。。。
。。。
public MarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) {
    // 預設頭部長度為4位元組
        super(maxObjectSize, 0, 4, 0, 4);
        this.provider = provider;
    }

從上面原始碼可以發現Marshalling 繼承自LengthFieldBaseFrameDecoder, 這是netty 提供的一個用於解決粘包拆包問題的解碼器,主要原理就是在報文頭部記錄報文長度。

遇到的坑

Marshalling 的jar 版本小於2
// 在版本大於2的情況下,以下獲取到的MarshallerFactory 為null
final MarshallerFactory marshallerFactory = Marshalling
                .getProvidedMarshallerFactory("serial");
pojo類要實現Serializable介面