精通併發與 Netty (二)常用的 rpc 框架
Google Protobuf 使用方式分析
對於 RPC 協議來說,最重要的就是物件的傳送與接收,這就要用到序列化與反序列化,也稱為編碼和解碼,序列化與反序列化和網路傳輸一般都在對應的 RPC 框架中完成。
序列化與反序列化的流程如下:
JavaBean-> stub(client) <->skeleton(server)->JavaBean,簡單點說就是編碼和解碼。
相比於 RMI 遠端方法呼叫,很多 RPC 遠端過程呼叫的跨語言的,這就需要序列化於反序列化協議也支援跨語言。Google Procobuf 就是這樣一種跨語言的序列化於反序列化協議,效率非常高(怎麼做到比其他協議效率高那?比其他協議壓縮生成的物件小)。
Netty 對於 ProtoBuf 提供了很好的支援。
先看如何單獨使用 Google ProtoBuf
新建 .proto 結構描述檔案
syntax = "proto2"; package com.paul.protobuf; //加快解析速度 option optimize_for = SPEED; option java_package = "com.paul.protobuf"; option java_outer_classname = "DataInfo"; message Student{ reuqired string name = 1; option int32 = 2; option string address = 3; }
使用對應的編譯檔案生成對應的 Java 類
Proton —java_out src/main/java src/protobuf/Student.proto
這時在我們程式碼的 src/main/java 資料夾下生成了一個新的 pkg com.paul.protobuf,裡面生成了 DataInfo 類。物件會有對應的 builder 方法讓我們來構建。
測試序列化方法
// 構建物件->位元組->物件 public class ProtoBufTest{ public static void main(String[] args) throws Exception{ DataInfo.Student student = DataInfo.Student.newBuilder().setName("張三").setAge(20).setAddress("abc").build(); byte[] student2ByteArray = student.toByteArray(); DataInfo.Student student2 = DataInfo.Student.parseFrom(student2ByteArray); System.out.println(studdent2); } }
在來看 Netty 對 Google ProtoBuf 的支援
還是隻給出不一樣的部分(服務單和客戶端的這部分是一樣的):
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
//解碼器
pipeline.addLast(new ProtobufDecoder(DataInfo.Student.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
//編碼器
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new MyServerHandler());
}
測試方法就是在客戶端組裝一個 DataInfo.Student 然後傳送給服務端,這裡就不演示了。
大家可能會發現上面的程式碼存在一個問題,就是上面的程式只能對 DataInfo.Student 進行編解碼,如果傳遞訊息的型別有多種怎麼辦那?
解決方案一:定義義協議,需要自己實現解碼器,通過前兩位來標識具體的 JavaBean 型別。
解決方案二:定義一個最外層的類,通過列舉的方式來確定傳遞的 JavaBean 型別。
比如我們有兩個 JavaBean
message MyMessage{
enum DataType{
PersonType = 1;
DogType = 2;
CatType = 3;
}
required Datatype data_type = 1;
//oneof 在同一時刻只有一個欄位會被設定,欄位之間會共享記憶體,後面設定會自動清空前面的。
oneof dataBody{
Person person = 2;
Dog dog = 3;
Cat cat = 4;
}
}
message Person{
option string name = 1;
option int32 age = 2;
option string address = 3;
}
message Dog{
option string name = 1;
option int32 age = 2;
}
message Cat{
option string name = 1;
option int32 city = 2;
}
Pipeline 的改動(客戶端和服務端):
pipeline.addLast(new ProtobufDecoder(DataInfo.MyMessage.getDefaultInstance()));
我們自己的 handler 的改動:
@Overrode
public void channelActive(ChannelHandlerContext ctx) throws Exception{
MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder().
setDataType(DataType.PersonType.PersonType).
setPerson(MyDataInfo.Person.newBuilder().
setName("張三").setAge(20).
setAddress("111").build()).
build();
ctx.channel().writeAndFlush(myMessage);
}
服務端 handler 根據 enum 的型別分別進行解析。
在實際的應用環境中,我們客戶端和服務端大概率是兩個分開的應用程式,此時我們使用 Google ProtoBuf 時 .proto 檔案和對應的 class 檔案是不是需要在兩邊都儲存一份,如果有修改會非常麻煩。下面我們介紹一種最佳實踐。
最佳實踐是使用 git 作為版本控制系統為前提的:
不那麼好的方案:git submodule,就相當於 maven 的子模組,客戶端和服務端都依賴這個模組。
比較好的方案:git subtree,將公共的程式碼合併到 server 和 client 端,相當於向 server 和 client 提交程式碼。
Apache Thrift 使用方式與檔案編寫方式分析
Apache Thrift 和 Google ProtoBuf 整體非常相似,適用於可伸縮的跨語言的服務開發。Thrift 相當於 Netty + Google ProtoBuf,是一個高效能 RPC 框架。Thrift 底層是 socket + RPC 的模式。
Thrift 是一個典型的 CS 結構,客戶端和服務端可以使用不同的語言開發,既然客戶端和服務端能使用不同的語言開發,那麼一定有一種中間語言來關聯服務端和客戶端,這就是 IDL(Interface Description Language)。
Thrift 如何實現多語言之間的通訊?
資料傳輸使用 socket (多種語言均支援),資料再以特定的格式(String 等)傳送,接收方語言進行解析。
如何使用?
定義 thrift 的檔案,由 thrift 檔案(IDL) 生成雙方語言的介面,model,在生成的 model 以及介面中會有解碼編碼的程式碼。
Thrift 中的服務
Thrift 定義服務相當於 Java 中建立 Interface 一樣,建立的 service 經過程式碼生成命令之後就會生成客戶端和服務端的框架程式碼,定義形式如下:
service HelloWorldService{
//service 中定義的函式,相當於 java interface 中定義的方法
string doAction(1:string name, 2:i32 age);
}
.thrift 檔案的定義
// java 中的包名
namespace jave thrift.generate
// 定義別名
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String
struct Person{
1: optional String username,
2: optional int age,
3: optional boolean married
}
exception DataException{
1: optional String message,
2: optional String callStack,
3: optional String date
}
service PersonService{
Person getPersonByName(1: required String username) throws (1: DataException dataException),
void savePerson(1:requried Person person) throws (1:DataException dataException)
}
編譯 thrift 檔案
thrift --gen java src/thrift/data.thrift
生成的檔案
Person.java 裡面包含了編解碼的方法,PersonService 裡面包含了 getPersonByName 和 savePerson 的方法。
測試方法:
服務端服務的具體實現方法
public class PersonServiceImpl implements PersonService.Iface{
@Override
public Person getPersonByName(String username) throws DataException,TException{
Person p = new Person();
p.setUserName("paul");
p.setAge(25);
p.setMarried(true);
return p;
}
@Override
public void savePerson(Person person) throws DataException,TException{
System.out.println(person.getUserName());
}
}
Thrift 的服務端:
public class ThriftServer{
public static void main(String[] args){
//非阻塞的 socket server
TNonblockingServerSocket socket = new TNonblockingServerSocket(8899);
// 高可用的 server
THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(4);
PersonService.Processor<PersonServiceImpl> processor = new PersonService.Processor<>(new PersonServiceImpl());
arg.protocolFactory(new TCompactPrococol.Factory());
arg.transportFactory(new TFramedTransport.Facotry());
arg.processorFactory(new TProcessorFactory(processor));
TServer server = new THsHaServer(arg);
System.out.println("Thrift Server Started");
//死迴圈
server.serve();
}
}
Thrift 的客戶端:
public class ThriftClient{
publiuc static void main(String[] args){
TTransport transport = new TFramedTransport(new TSocket
("localhost",8899),600);
TProcotol procotol = new TComapctProcotol(transport);
PersonService.Client client = new PersonService.Client(procotol);
try{
//開啟 socket
transport.open();
//好像呼叫本地方法一樣
Person person = client.getPersonByName("paul");
System.out.println(person.getAge());
}catch(Exception ex){
throw ex;
}finally{
transport.close();
}
}
}
Thrift 的架構:
Thrift 的傳輸格式,協議:
TBinaryProtocol-二進位制格式
TCompactProtocol-壓縮格式
TJSONProtocol-JSON 格式
TSimpleJSONProtocol-提供 JSON 只寫協議,生成的檔案很容易通過指令碼語言解析。很少使用,缺少元資料資訊,接收方不能讀取出來。
TDebugProtocol-使用易懂的可讀的文字格式,以便於 debug。
Thrift 資料傳輸方式,transport:
TSocket-阻塞式 socket。
TFramedTransport-以 frame 為單位進行傳輸,非阻塞式服務中使用。
TFileTransport-以檔案形式進行傳輸。
TMemoryTransport-將記憶體用於 I/O,Java 實現時內部實際使用了簡單的 ByteArrayOutputStream。
支援的服務模型,server:
TSimpleServer-簡單的單執行緒服務模型,常用於測試。
TThreadPoolServer-多執行緒服務模型,標準的阻塞式 IO。
TNonboockingServer-多執行緒服務模型,使用非阻塞式 IO(需要使用 TFramedTransport 資料傳輸方式)。
THsHaServer-THsHa 引入了執行緒池去處理,其模型把讀寫任務放到執行緒池處理。Half-sync/Half-async 的處理模式,Half-sync 是在處理 IO 事件上,Half-async 用於 handler 對 rpc 的同步處理