1. 程式人生 > >Netty學習(九)-Netty編解碼技術之Marshalling

Netty學習(九)-Netty編解碼技術之Marshalling

前面我們講過protobuf的使用,主流的編解碼框架其實還有很多種:

①JBoss的Marshalling包

②google的Protobuf

③基於Protobuf的Kyro

④Apache的Thrift

JBoss Marshalling是一個Java物件的序列化API包,修正了JDK自帶的序列化包的很多問題,但又保持跟java.io.Serializable介面的相容;同時增加了一些可調的引數和附加的特性,並且這些引數和特性可通過工廠類進行配置。

相比於傳統的Java序列化機制,它的優點如下:

1) 可插拔的類解析器,提供更加便捷的類載入定製策略,通過一個介面即可實現定製;

2) 可插拔的物件替換技術,不需要通過繼承的方式;

3) 可插拔的預定義類快取表,可以減小序列化的位元組陣列長度,提升常用型別的物件序列化效能;

4) 無須實現java.io.Serializable介面,即可實現Java序列化;

5) 通過快取技術提升物件的序列化效能。

相比於protobuf和thrift的兩種編解碼框架,JBoss Marshalling更多是在JBoss內部使用,應用範圍有限。

Protobuf全稱Google Protocol Buffers,它由谷歌開源而來,在谷歌內部久經考驗。它將資料結構以.proto檔案進行描述,通過程式碼生成工具可以生成對應資料結構的POJO物件和Protobuf相關的方法和屬性。

它的特點如下:

1) 結構化資料儲存格式(XML,JSON等);

2) 高效的編解碼效能;

3) 語言無關、平臺無關、擴充套件性好;

4) 官方支援Java、C++和Python三種語言。

首先我們來看下為什麼不使用XML,儘管XML的可讀性和可擴充套件性非常好,也非常適合描述資料結構,但是XML解析的時間開銷和XML為了可讀性而犧牲的空間開銷都非常大,因此不適合做高效能的通訊協議。Protobuf使用二進位制編碼,在空間和效能上具有更大的優勢。

Protobuf另一個比較吸引人的地方就是它的資料描述檔案和程式碼生成機制,利用資料描述檔案對資料結構進行說明的優點如下:

1) 文字化的資料結構描述語言,可以實現語言和平臺無關,特別適合異構系統間的整合;

2) 通過標識欄位的順序,可以實現協議的前向相容;

3) 自動程式碼生成,不需要手工編寫同樣資料結構的C++和Java版本;

4) 方便後續的管理和維護。相比於程式碼,結構化的文件更容易管理和維護。

Thrift源於Facebook,在2007年Facebook將Thrift作為一個開源專案提交給Apache基金會。對於當時的Facebook來說,創造Thrift是為了解決Facebook各系統間大資料量的傳輸通訊以及系統之間語言環境不同需要跨平臺的特性,因此Thrift可以支援多種程式語言,如C++、C#、Cocoa、Erlang、Haskell、Java、Ocami、Perl、PHP、Python、Ruby和Smalltalk。

在多種不同的語言之間通訊,Thrift可以作為高效能的通訊中介軟體使用,它支援資料(物件)序列化和多種型別的RPC服務。Thrift適用於靜態的資料交換,需要先確定好它的資料結構,當資料結構發生變化時,必須重新編輯IDL檔案,生成程式碼和編譯,這一點跟其他IDL工具相比可以視為是Thrift的弱項。Thrift適用於搭建大型資料交換及儲存的通用工具,對於大型系統中的內部資料傳輸,相對於JSON和XML在效能和傳輸大小上都有明顯的優勢。

Thrift主要由5部分組成:

1) 語言系統以及IDL編譯器:負責由使用者給定的IDL檔案生成相應語言的介面程式碼;

2) TProtocol:RPC的協議層,可以選擇多種不同的物件序列化方式,如JSON和Binary;

3) TTransport:RPC的傳輸層,同樣可以選擇不同的傳輸層實現,如socket、NIO、MemoryBuffer等;

4) TProcessor:作為協議層和使用者提供的服務實現之間的紐帶,負責呼叫服務實現的介面;

5) TServer:聚合TProtocol、TTransport和TProcessor等物件。

我們重點關注的是編解碼框架,與之對應的就是TProtocol。由於Thrift的RPC服務呼叫和編解碼框架繫結在一起,所以,通常我們使用Thrift的時候會採取RPC框架的方式。但是,它的TProtocol編解碼框架還是可以以類庫的方式獨立使用的。

與Protobuf比較類似的是,Thrift通過IDL描述介面和資料結構定義,它支援8種Java基本型別、Map、Set和List,支援可選和必選定義,功能非常強大。因為可以定義資料結構中欄位的順序,所以它也可以支援協議的前向相容。

Thrift支援三種比較典型的編解碼方式:

1) 通用的二進位制編解碼;

2) 壓縮二進位制編解碼;

3) 優化的可選欄位壓縮編解碼。

由於支援二進位制壓縮編解碼,Thrift的編解碼效能表現也相當優異,遠遠超過Java序列化和RMI等。

這一節我們來講解JBoss的Marshalling的使用。

和protobuf的使用不同,netty預設支援protobuf,所以為他預設了一個編解碼器:ProtobufVarint32LengthFieldPrepender,ProtobufVarint32FrameDecoder。那如果採用jboss-marshalling進行編解碼,則沒有這麼好的運氣我們需要自己優先建立一個編解碼的工廠類,供資訊通訊時候對資訊的編解碼。

pom檔案如下,需要新增兩個jar包:jboss-marshalling,jboss-marshalling-serial。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.edu.hust.netty</groupId>
  <artifactId>netty</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>netty Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.5.Final</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.jboss.marshalling</groupId>
      <artifactId>jboss-marshalling-river</artifactId>
      <version>1.4.10.Final</version>
    </dependency>
    <dependency>
      <groupId>org.jboss.marshalling</groupId>
      <artifactId>jboss-marshalling-serial</artifactId>
      <version>1.4.11.Final</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>netty</finalName>
  </build>
</project>

我們先來寫一個工廠類,手動建立編解碼器:

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling工廠
 */
public final class MarshallingCodeCFactory {

    /**
     * 建立Jboss Marshalling解碼器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先通過Marshalling工具類的精通方法獲取Marshalling例項物件 引數serial標識建立的是java序列化工廠物件。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //建立了MarshallingConfiguration物件,配置了版本號為5 
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //根據marshallerFactory和configuration建立provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //構建Netty的MarshallingDecoder物件,倆個引數分別為provider和單個訊息序列化後的最大長度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }

    /**
     * 建立Jboss Marshalling編碼器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //構建Netty的MarshallingEncoder物件,MarshallingEncoder用於實現序列化介面的POJO物件序列化為二進位制陣列
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

下面是服務端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by Administrator on 2017/3/11.
 */
public class HelloWordServer {
    private int port;

    public HelloWordServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                                    .channel(NioServerSocketChannel.class)
                                    .childHandler(new ServerChannelInitializer());

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        HelloWordServer server = new HelloWordServer(7788);
        server.start();
    }
}

服務端Initializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Created by Administrator on 2017/3/11.
 */
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
        pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());

        // 自己的邏輯Handler
        pipeline.addLast("handler", new HelloWordServerHandler());
    }
}

注意我們在這裡加入了剛才我們寫的編解碼器哈,順序沒有關係。

服務端handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by Administrator on 2017/3/11.
 */
public class HelloWordServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof String){
            System.out.println(msg.toString());
        }else{
            ctx.writeAndFlush("received your msg");
            Msg m = (Msg)msg;
            System.out.println("client: "+m.getBody());
            m.setBody("人生苦短,快用python");
            ctx.writeAndFlush(m);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

接下來是客戶端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Created by Administrator on 2017/3/11.
 */
public class HelloWorldClient {
    private  int port;
    private  String address;

    public HelloWorldClient(int port,String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientChannelInitializer());

        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();
            future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        HelloWorldClient client = new HelloWorldClient(7788,"127.0.0.1");
        client.start();
    }
}

客戶端Initializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Created by Administrator on 2017/3/11.
 */
public class ClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();


        pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
        pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());

        // 客戶端的邏輯
        pipeline.addLast("handler", new HelloWorldClientHandler());
    }
}

同樣這裡也加入編解碼器。

客戶端handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by Administrator on 2017/3/11.
 */
public class HelloWorldClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof String){
            System.out.println(msg);
        }else{
            Msg m = (Msg)msg;
            System.out.println("client: "+m.getBody());
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Msg msg = new Msg();
        msg.setHeader((byte)0xa);
        msg.setLength(34);
        msg.setBody("放縱自己,你好兄弟");

        ctx.writeAndFlush(msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client is close");
    }
}

我們注意上面有一個Msg物件,這個就是我們自己定義的一個物件,用於網路傳輸用的:

import java.io.Serializable;

/**
 * 自定義一個物件
 */
public class Msg implements Serializable {
    private byte header;
    private String body;
    private long length;
    private byte type;

    public byte getHeader() {
        return header;
    }

    public void setHeader(byte header) {
        this.header = header;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

    public long getLength() {
        return length;
    }

    public void setLength(long length) {
        this.length = length;
    }

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }
}

下面我們執行客戶端和服務端,可以看到訊息已經發出去了: