1. 程式人生 > >精通併發與 Netty (二)常用的 rpc 框架

精通併發與 Netty (二)常用的 rpc 框架

Google Protobuf 使用方式分析

對於 RPC 協議來說,最重要的就是物件的傳送與接收,這就要用到序列化與反序列化,也稱為編碼和解碼,序列化與反序列化和網路傳輸一般都在對應的 RPC 框架中完成。

序列化與反序列化的流程如下:

JavaBean-> stub(client) <->skeleton(server)->JavaBean,簡單點說就是編碼和解碼。

相比於 RMI 遠端方法呼叫,很多 RPC 遠端過程呼叫的跨語言的,這就需要序列化於反序列化協議也支援跨語言。Google Procobuf 就是這樣一種跨語言的序列化於反序列化協議,效率非常高(怎麼做到比其他協議效率高那?比其他協議壓縮生成的物件小)。

Netty 對於 ProtoBuf 提供了很好的支援。

先看如何單獨使用 Google ProtoBuf

  1. 新建 .proto 結構描述檔案

    syntax = "proto2";
    
    package com.paul.protobuf;
    
    //加快解析速度
    option optimize_for = SPEED;
    option java_package = "com.paul.protobuf";
    option java_outer_classname = "DataInfo";
    
    message Student{
      reuqired string name = 1;
      option int32 = 2;
      option string address = 3;
    }
  2. 使用對應的編譯檔案生成對應的 Java 類

    Proton —java_out src/main/java src/protobuf/Student.proto

  3. 這時在我們程式碼的 src/main/java 資料夾下生成了一個新的 pkg com.paul.protobuf,裡面生成了 DataInfo 類。物件會有對應的 builder 方法讓我們來構建。

  4. 測試序列化方法

    // 構建物件->位元組->物件
    public class ProtoBufTest{
      public static void main(String[] args) throws Exception{
        DataInfo.Student student = DataInfo.Student.newBuilder().setName("張三").setAge(20).setAddress("abc").build();
        byte[] student2ByteArray = student.toByteArray();
        DataInfo.Student student2 = DataInfo.Student.parseFrom(student2ByteArray);
        System.out.println(studdent2);
      }
    
    }

在來看 Netty 對 Google ProtoBuf 的支援

還是隻給出不一樣的部分(服務單和客戶端的這部分是一樣的):

@Override
protected void initChannel(SocketChannel ch) throws Exception{
  ChannelPipeline pipeline = ch.pipeline();
  pipeline.addLast(new ProtobufVarint32FrameDecoder());
  //解碼器
  pipeline.addLast(new ProtobufDecoder(DataInfo.Student.getDefaultInstance()));
  pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
  //編碼器
  pipeline.addLast(new ProtobufEncoder());
  pipeline.addLast(new MyServerHandler());
}

測試方法就是在客戶端組裝一個 DataInfo.Student 然後傳送給服務端,這裡就不演示了。

大家可能會發現上面的程式碼存在一個問題,就是上面的程式只能對 DataInfo.Student 進行編解碼,如果傳遞訊息的型別有多種怎麼辦那?

解決方案一:定義義協議,需要自己實現解碼器,通過前兩位來標識具體的 JavaBean 型別。

解決方案二:定義一個最外層的類,通過列舉的方式來確定傳遞的 JavaBean 型別。

比如我們有兩個 JavaBean

message MyMessage{
    enum DataType{
     PersonType = 1;
     DogType = 2;
     CatType = 3;
    }
  required Datatype data_type = 1;
  //oneof 在同一時刻只有一個欄位會被設定,欄位之間會共享記憶體,後面設定會自動清空前面的。
  oneof dataBody{
        Person person = 2;
    Dog dog = 3;
    Cat cat = 4;
  }
}

message Person{
    option string name = 1;
  option int32 age = 2;
  option string address = 3;
}

message Dog{
    option string name = 1;
  option int32 age = 2;
}

message Cat{
    option string name = 1;
  option int32 city = 2;
}

Pipeline 的改動(客戶端和服務端):

pipeline.addLast(new ProtobufDecoder(DataInfo.MyMessage.getDefaultInstance()));

我們自己的 handler 的改動:

@Overrode
public void channelActive(ChannelHandlerContext ctx) throws Exception{
  MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder().
       setDataType(DataType.PersonType.PersonType).
       setPerson(MyDataInfo.Person.newBuilder().
                setName("張三").setAge(20).
                setAddress("111").build()).
       build();
       ctx.channel().writeAndFlush(myMessage);
        
}

服務端 handler 根據 enum 的型別分別進行解析。

在實際的應用環境中,我們客戶端和服務端大概率是兩個分開的應用程式,此時我們使用 Google ProtoBuf 時 .proto 檔案和對應的 class 檔案是不是需要在兩邊都儲存一份,如果有修改會非常麻煩。下面我們介紹一種最佳實踐。

最佳實踐是使用 git 作為版本控制系統為前提的:

不那麼好的方案:git submodule,就相當於 maven 的子模組,客戶端和服務端都依賴這個模組。

比較好的方案:git subtree,將公共的程式碼合併到 server 和 client 端,相當於向 server 和 client 提交程式碼。

Apache Thrift 使用方式與檔案編寫方式分析

Apache Thrift 和 Google ProtoBuf 整體非常相似,適用於可伸縮的跨語言的服務開發。Thrift 相當於 Netty + Google ProtoBuf,是一個高效能 RPC 框架。Thrift 底層是 socket + RPC 的模式。

Thrift 是一個典型的 CS 結構,客戶端和服務端可以使用不同的語言開發,既然客戶端和服務端能使用不同的語言開發,那麼一定有一種中間語言來關聯服務端和客戶端,這就是 IDL(Interface Description Language)。

Thrift 如何實現多語言之間的通訊?

資料傳輸使用 socket (多種語言均支援),資料再以特定的格式(String 等)傳送,接收方語言進行解析。

如何使用?

定義 thrift 的檔案,由 thrift 檔案(IDL) 生成雙方語言的介面,model,在生成的 model 以及介面中會有解碼編碼的程式碼。

Thrift 中的服務

Thrift 定義服務相當於 Java 中建立 Interface 一樣,建立的 service 經過程式碼生成命令之後就會生成客戶端和服務端的框架程式碼,定義形式如下:

service HelloWorldService{
  //service 中定義的函式,相當於 java interface 中定義的方法
  string doAction(1:string name, 2:i32 age);
}

.thrift 檔案的定義

// java 中的包名
namespace jave thrift.generate
// 定義別名
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String

struct Person{
  1: optional String username,
  2: optional int age,
  3: optional boolean married
}

exception DataException{
  1: optional String message,
  2: optional String callStack,
  3: optional String date
}

service PersonService{
  Person getPersonByName(1: required String username) throws (1: DataException dataException),
  void savePerson(1:requried Person person) throws (1:DataException dataException)
}

編譯 thrift 檔案

thrift --gen java src/thrift/data.thrift

生成的檔案

Person.java 裡面包含了編解碼的方法,PersonService 裡面包含了 getPersonByName 和 savePerson 的方法。

測試方法:

服務端服務的具體實現方法

public class PersonServiceImpl implements PersonService.Iface{
  @Override
  public Person getPersonByName(String username) throws DataException,TException{
    Person p = new Person();
    p.setUserName("paul");
    p.setAge(25);
    p.setMarried(true);
    return p;
  }
  
  @Override
  public void savePerson(Person person) throws DataException,TException{
    System.out.println(person.getUserName());
  }
  
}

Thrift 的服務端:

public class ThriftServer{
  public static void main(String[] args){
    //非阻塞的 socket server
    TNonblockingServerSocket socket = new TNonblockingServerSocket(8899);
    // 高可用的 server
    THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(4);
    PersonService.Processor<PersonServiceImpl> processor = new PersonService.Processor<>(new PersonServiceImpl());
    
    arg.protocolFactory(new TCompactPrococol.Factory());
    arg.transportFactory(new TFramedTransport.Facotry());
    arg.processorFactory(new TProcessorFactory(processor));
    
    TServer server = new THsHaServer(arg);
    System.out.println("Thrift Server Started");
    //死迴圈
    server.serve();
  }
}

Thrift 的客戶端:

public class ThriftClient{
  publiuc static void main(String[] args){
    TTransport transport = new TFramedTransport(new TSocket
("localhost",8899),600);
    TProcotol procotol = new TComapctProcotol(transport);
    PersonService.Client client = new PersonService.Client(procotol);
    
    try{
      //開啟 socket
      transport.open();
      //好像呼叫本地方法一樣
      Person person = client.getPersonByName("paul");
      System.out.println(person.getAge());
    }catch(Exception ex){
      throw ex;
    }finally{
      transport.close();
    }
  }
}

Thrift 的架構:

Thrift 的傳輸格式,協議:

TBinaryProtocol-二進位制格式

TCompactProtocol-壓縮格式

TJSONProtocol-JSON 格式

TSimpleJSONProtocol-提供 JSON 只寫協議,生成的檔案很容易通過指令碼語言解析。很少使用,缺少元資料資訊,接收方不能讀取出來。

TDebugProtocol-使用易懂的可讀的文字格式,以便於 debug。

Thrift 資料傳輸方式,transport:

TSocket-阻塞式 socket。

TFramedTransport-以 frame 為單位進行傳輸,非阻塞式服務中使用。

TFileTransport-以檔案形式進行傳輸。

TMemoryTransport-將記憶體用於 I/O,Java 實現時內部實際使用了簡單的 ByteArrayOutputStream。

支援的服務模型,server:

TSimpleServer-簡單的單執行緒服務模型,常用於測試。

TThreadPoolServer-多執行緒服務模型,標準的阻塞式 IO。

TNonboockingServer-多執行緒服務模型,使用非阻塞式 IO(需要使用 TFramedTransport 資料傳輸方式)。

THsHaServer-THsHa 引入了執行緒池去處理,其模型把讀寫任務放到執行緒池處理。Half-sync/Half-async 的處理模式,Half-sync 是在處理 IO 事件上,Half-async 用於 handler 對 rpc 的同步處理