1. 程式人生 > >Netty5 序列化方式(Jboss Marshalling)

Netty5 序列化方式(Jboss Marshalling)

java netty 序列化


Netty作為很多高性能的底層通訊工具,被很多開發框架應用再底層,今天來說說常用的序列化工具,用Jboss的Marshalling。


直接上代碼,Marshalling的工廠類

package com.netty.serialize.marshalling;

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Created by sdc on 2017/8/28.
 */
public class MarshallingCodeCFactory {

    /**
     * 解碼
     * @return
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先通過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識創建的是java序列化工廠對象。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //創建了MarshallingConfiguration對象,配置了版本號為5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //根據marshallerFactory和configuration創建provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //構建Netty的MarshallingDecoder對象,倆個參數分別為provider和單個消息序列化後的最大長度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 編碼
     * @return
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化為二進制數組
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }

}

這個是Marshalling的序列化方式,Marshalling自帶編解碼,所以不用擔心中途編解碼半包的問題。


服務端的Server實現:

package com.netty.serialize.server;

import com.netty.serialize.coder.MsgDecoder;
import com.netty.serialize.coder.MsgEncoder;
import com.netty.serialize.handler.ServerHandler;
import com.netty.serialize.marshalling.MarshallingCodeCFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgServer {

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
//                    .childHandler(new ChildChannelHandler())
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            channel.pipeline().addLast(new ServerHandler());
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = sb.bind(port).sync();
            System.out.println("服務端已啟動");

            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer {

        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            channel.pipeline().addLast(new ServerHandler());
        }

    }

    public static void main(String[] args){
        try {
            new MsgServer().bind(8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.netty.serialize.handler;

import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * 用於測試服務端實現的
 * Created by sdc on 2017/8/29.
 */
public class ServerHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
//        System.out.println("active");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
//        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message newMsg = (Message)msg;
//        String msgStrClient = (String)msg;
        System.out.println("獲取客戶端裏的內容:" + newMsg);

        Message message = new Message();
        String msgStr = "客戶端接受到通知";
        MsgHeader header = new MsgHeader();
        header.setStartTag(new Byte("0"));
        header.setCmdCode("1234".getBytes());
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes());

        message.setBody(msgStr);
        message.setHeader(header);

        ctx.writeAndFlush(message);
    }

}


客戶端的實現:

package com.netty.serialize.client;

import com.netty.serialize.coder.MsgDecoder;
import com.netty.serialize.coder.MsgEncoder;
import com.netty.serialize.handler.ClientHandler;
import com.netty.serialize.handler.ServerHandler;
import com.netty.serialize.marshalling.MarshallingCodeCFactory;
import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgClient {

    public void connect(String ip, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

//        Message message = new Message();
//        String msgStr = "我想發送一條消息";
//        MsgHeader header = new MsgHeader();
//        header.setStartTag(new Byte("0"));
//        header.setCmdCode("1234".getBytes());
//        header.setLength(msgStr.length());
//        header.setVersion("11".getBytes());
//
//        message.setBody(msgStr);
//        message.setHeader(header);
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)//
                    .handler(new ChildChannelHandler());

            ChannelFuture f = bs.connect(ip,port).sync();

            //寫入消息
//            f.channel().writeAndFlush(message).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer {
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            channel.pipeline().addLast(new ClientHandler());
        }
    }

    public static void main(String[] args){
        try {
            new MsgClient().connect("127.0.0.1", 8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
package com.netty.serialize.handler;

import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.channel.*;
import io.netty.util.ReferenceCountUtil;

/**
 * Created by sdc on 2017/8/29.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Message message = new Message();
        String msgStr = "我想發送一條消息";
        MsgHeader header = new MsgHeader();
        header.setStartTag(new Byte("0"));
        header.setCmdCode("1234".getBytes());
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes());

        message.setBody(msgStr);
        message.setHeader(header);
        ctx.writeAndFlush(message).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    // do sth
                    System.out.println("成功發送到服務端消息");
                } else {
                    // do sth
                    System.out.println("失敗服務端消息失敗");
                }
            }
        });
//        ctx.writeAndFlush(message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Message newMsg = (Message) msg;
            System.out.println("收到服務端的內容" + newMsg);
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }

}


傳輸的POJO的類,是自定義的封裝好的信息。

package com.netty.serialize.message;

import java.io.Serializable;

/**
 * Created by sdc on 2017/8/26.
 */
public class Message implements Serializable{

    /**
     *
     */
    private static final long serialVersionUID = 4923081103118853877L;

    private MsgHeader header;

    private Object body;

    //檢驗和
//    private byte crcCode;

//    public byte getCrcCode() {
//        return crcCode;
//    }
//
//    public void setCrcCode(byte crcCode) {
//        this.crcCode = crcCode;
//    }

    public MsgHeader getHeader() {
        return header;
    }

    public void setHeader(MsgHeader header) {
        this.header = header;
    }

    public Object getBody() {
        return body;
    }

    public void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "Message{" +
                "header=" + header +
                ", body=" + body +
//                ", crcCode=" + crcCode +
                ‘}‘;
    }
}
package com.netty.serialize.message;

import java.io.Serializable;
import java.util.Arrays;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgHeader implements Serializable{

    /**
     *
     */
    private static final long serialVersionUID = 4923081103118853877L;

    //固定頭
    private byte startTag;

    //命令碼,4位
    private byte[] cmdCode;

    //版本 2位
    private byte[] version;

    private int length;

    public byte[] getVersion() {
        return version;
    }

    public void setVersion(byte[] version) {
        this.version = version;
    }

    public byte[] getCmdCode() {
        return cmdCode;
    }

    public void setCmdCode(byte[] cmdCode) {
        this.cmdCode = cmdCode;
    }

    public byte getStartTag() {
        return startTag;
    }

    public void setStartTag(byte startTag) {
        this.startTag = startTag;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    @Override
    public String toString() {
        return "MsgHeader{" +
                "startTag=" + startTag +
                ", cmdCode=" + Arrays.toString(cmdCode) +
                ", version=" + Arrays.toString(version) +
                ", length=" + length +
                ‘}‘;
    }
}


到此就完事了,netty的版本,和marshalling的版本,其他的版本我不清楚會不會有什麽錯誤。

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>

<!--netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

<!--jboss-marshalling -->
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling-serial</artifactId>
    <version>2.0.0.Beta2</version>
</dependency>



Netty5 序列化方式(Jboss Marshalling)