1. 程式人生 > >《netty權威指南》之JBoss序列化框架Marshalling

《netty權威指南》之JBoss序列化框架Marshalling

前面講了netty解決拆包粘包的問題
我們發現拆包粘包問題的解決都只是解決netty傳送字串的情況
在企業級開發中很少有直接使用字串的,一般都有定義好的訊息體,這個訊息體一定對應實體類
如果要傳送實體類那麼就一定要對實體類做序列化
(序列化就是把檔案或者記憶體中的資料結構轉換為位元組陣列以便儲存或在網路傳輸)
今天就介紹一下jboss的marshalling序列化框架

下面文章是聽了白老師的Netty教程寫的,發現網上這段程式碼很多,
反而是Marshalling並沒有太多的介紹,有時間研究一下Marshalling並把連結貼在這兒

server端程式碼

package com.lyzx.netty.netty04;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Netty server demonstrating object encoding/decoding with JBoss Marshalling.
 *
 * <p>Each accepted channel gets a Marshalling decoder, a Marshalling encoder and a
 * {@code ServerHandler}, in that order.
 *
 * @author hero.li
 */
public class Server {

    /**
     * Binds to port 9988 and blocks until the listening channel is closed.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     *                              or for the server channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        // Two event-loop groups: one accepts client connections, the other performs
        // the asynchronous network reads/writes for accepted channels.
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Bootstrap helper that wires the groups, channel type, options and handlers together.
            ServerBootstrap b = new ServerBootstrap();

            // SO_BACKLOG is the pending-connection queue length requested for the listening
            // socket (it is not a transfer buffer size); SO_KEEPALIVE applies to accepted channels.
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Fresh codec instances per channel, followed by the business handler.
                            ChannelHandler[] arr = {MarshallingCodeCFactory.marshallingDecoder(),
                                                    MarshallingCodeCFactory.marshallingEncoder(),
                                                    new ServerHandler()};
                            ch.pipeline().addLast(arr);
                        }
                    });

            // Wait synchronously for the port binding to complete.
            ChannelFuture f = b.bind(9988).sync();
            // Block until the server's listening channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Always release the event-loop threads, even when bind fails or the
            // thread is interrupted; otherwise the non-daemon threads keep the JVM alive.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

server Handler程式碼

package com.lyzx.netty.netty04;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


/**
 * Inbound handler for the server side of the Marshalling example.
 *
 * <p>For network read/write handling it is usually enough to care about
 * {@link #channelRead} and {@link #exceptionCaught}.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /** Timestamp format used for the reply; built once instead of on every message. */
    private static final DateTimeFormatter REPLY_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");

    /**
     * Receives a decoded {@code NettyRequest}, replaces its message with the current
     * timestamp and writes the same object back to the peer.
     *
     * @param ctx the handler context of the channel
     * @param msg the inbound message; cast to {@code NettyRequest} without a type check,
     *            so any other type raises {@link ClassCastException}
     * @throws Exception propagated to the pipeline's exception handling
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server:channelRead____通道可讀開始");
        NettyRequest nr = (NettyRequest)msg;
        System.out.println("server:收到的訊息____:"+nr);

        String datetime = LocalDateTime.now().format(REPLY_TIME_FORMAT);
        nr.setMsg(datetime);
        ctx.channel().writeAndFlush(nr);
        System.out.println("server:channelRead____通道可讀結束");
    }

    /**
     * Logs that the current read batch has completed.
     *
     * @param ctx the handler context of the channel
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server:channelReadComplete____通道可讀完成 ");
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the handler context of the channel
     * @param cause the error raised while processing the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server:exceptionCaught____發生異常");
        // Surface the actual error; without it, decode/cast failures are invisible.
        cause.printStackTrace();
        ctx.close();
    }

}

client端程式碼

package com.lyzx.netty.netty04;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


/**
 * Netty client for the Marshalling example.
 *
 * <p>The channel pipeline holds a Marshalling decoder, a Marshalling encoder and a
 * {@code ClientHandler}, in that order.
 */
public class Client {

    /**
     * Connects to 127.0.0.1:9988 and blocks until the channel is closed.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while waiting for the
     *                              connection or for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Fresh codec instances per channel, followed by the business handler.
                            ChannelHandler[] arr = {MarshallingCodeCFactory.marshallingDecoder(),
                                                    MarshallingCodeCFactory.marshallingEncoder(),
                                                    new ClientHandler()};
                            ch.pipeline().addLast(arr);
                        }
                    });
            // Wait for the connection to be established, then for the channel to close.
            ChannelFuture f = b.connect("127.0.0.1", 9988).sync();
            f.channel().closeFuture().sync();
        } finally {
            // Release the event-loop threads even if connect fails (e.g. server not running),
            // so the JVM can exit instead of hanging on non-daemon threads.
            group.shutdownGracefully();
        }
    }
}

clienthandler程式碼

package com.lyzx.netty.netty04;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Inbound handler for the client side of the Marshalling example: sends a batch of
 * {@code NettyRequest} objects once the channel is active and prints every reply.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Sends 20 requests (ids 0..19, message {@code "data_" + i}) as soon as the channel
     * becomes active, flushing after each one.
     *
     * @param ctx the handler context of the channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:channelActive____通道啟用開始");
        for(int i=0;i<20;i++){
            NettyRequest req = new NettyRequest();
            req.setId((long)i);
            req.setMsg("data_"+i);
            ctx.channel().writeAndFlush(req);
            System.out.println("..."+req);
        }
        System.out.println("client:channelActive____通道啟用結束");
    }

    /**
     * Prints a decoded reply.
     *
     * @param ctx the handler context of the channel
     * @param msg the inbound message; cast to {@code NettyRequest} without a type check
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        System.out.println("client____:通道可讀開始");
        NettyRequest nr = (NettyRequest)msg;
        System.out.println("client____response time:"+nr);
        System.out.println("client____:通道可讀結束");
    }

    /**
     * Logs that the current read batch has completed.
     *
     * @param ctx the handler context of the channel
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:通道可讀完成");
    }

    /**
     * Reports the failure and closes the channel, mirroring the server-side handler.
     *
     * @param ctx   the handler context of the channel
     * @param cause the error raised while processing the channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client:發生異常");
        // Surface the actual error instead of discarding it.
        cause.printStackTrace();
        // Close the channel so that a caller waiting on closeFuture() is released
        // rather than blocking forever on a broken connection.
        ctx.close();
    }
}

其他工具類

package com.lyzx.netty.netty04;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Factory for Netty's JBoss Marshalling encoder and decoder.
 *
 * <p>Every call returns a new codec instance, so each channel pipeline gets its own.
 *
 * @author(alienware)
 * @since 2014-12-16
 */
public final class MarshallingCodeCFactory {

    /** Name of the marshalling protocol looked up; "serial" is the Java-serialization factory. */
    private static final String PROTOCOL_NAME = "serial";

    /** Protocol version set on every {@link MarshallingConfiguration}. */
    private static final int PROTOCOL_VERSION = 5;

    /** Maximum size in bytes of a single serialized message accepted by the decoder (1 MiB). */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024 * 1;

    /** Static utility class; not meant to be instantiated. */
    private MarshallingCodeCFactory() {
    }

    /**
     * Creates a JBoss Marshalling decoder.
     *
     * @return a new {@link MarshallingDecoder} limited to {@link #MAX_OBJECT_SIZE} bytes per message
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingDecoder marshallingDecoder() {
        // The provider is built from the marshaller factory and its configuration.
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(serialMarshallerFactory(), newConfiguration());
        // Arguments: the provider and the maximum length of one serialized message.
        return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
    }

    /**
     * Creates a JBoss Marshalling encoder, which serializes {@code Serializable} POJOs to bytes.
     *
     * @return a new {@link MarshallingEncoder}
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingEncoder marshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(serialMarshallerFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /** Looks up the "serial" marshaller factory, failing fast with a clear message if it is absent. */
    private static MarshallerFactory serialMarshallerFactory() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(PROTOCOL_NAME);
        if (marshallerFactory == null) {
            // The lookup yields null when no provider with that name is on the classpath;
            // report it here instead of as an obscure NullPointerException on first use.
            throw new IllegalStateException(
                    "No JBoss Marshalling factory named '" + PROTOCOL_NAME
                            + "' found; is the jboss-marshalling-serial jar on the classpath?");
        }
        return marshallerFactory;
    }

    /** Builds a fresh configuration with the protocol version used by both codec directions. */
    private static MarshallingConfiguration newConfiguration() {
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(PROTOCOL_VERSION);
        return configuration;
    }
}
package com.lyzx.netty.netty04;

import java.io.Serializable;

/**
 * Message object exchanged between client and server.
 *
 * <p>Implements {@link Serializable} so it can be written by the marshalling codec.
 */
public class NettyRequest implements Serializable {

    /**
     * Explicit serialization version, so client and server agree on the class version
     * even when they are compiled separately (the default value is compiler-dependent).
     */
    private static final long serialVersionUID = 1L;

    /** Request identifier; {@code null} until set. */
    private Long id;
    /** Numeric code; defaults to 0. */
    private int code;
    /** Message payload; {@code null} until set. */
    private String msg;

    /** @return the request identifier, or {@code null} if not set */
    public Long getId() {
        return id;
    }

    /** @param id the request identifier */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the numeric code */
    public int getCode() {
        return code;
    }

    /** @param code the numeric code */
    public void setCode(int code) {
        this.code = code;
    }

    /** @return the message payload, or {@code null} if not set */
    public String getMsg() {
        return msg;
    }

    /** @param msg the message payload */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /** @return a string of the form {@code NettyRequest{id=..., code=..., msg='...'}} */
    @Override
    public String toString() {
        return "NettyRequest{" +
                "id=" + id +
                ", code=" + code +
                ", msg='" + msg + '\'' +
                '}';
    }

}