一、RpcChannel簡介:

1、RPC即遠端過程呼叫,它的提出旨在消除通訊細節、遮蔽繁雜且易錯的底層網路通訊操作,像呼叫本地服務一般地呼叫遠端服務,讓業務開發者更多關注業務開發而不必考慮網路、硬體、系統的異構複雜環境。

2、假設一個叢集環境中有node1, node2, node3, node4四個叢集節點,現在node1呼叫RPC介面向node2, node3, node4進行服務請求,其過程如下:

  a)   先將待傳遞的資料放到NIO叢集通訊框架中,由於使用的是NIO模式,執行緒無需阻塞直接返回, 由於與叢集其他節點通訊需要花銷若干時間,為了提高CPU使用率當前執行緒應該放棄CPU的使用權進行等待操作;

  b)   Node2, node3, node4分別收到請求訊息後,根據請求訊息封裝成Response物件返回;

  c)   假如NIO叢集通訊框架首先接收到node2節點的響應訊息,將Response物件儲存至響應陣列;

  d)   由於改RPC訊息響應模式為RpcResponseType.ALL_REPLY,而此時叢集通訊框架只收到了node2的響應訊息,還有node3和node4的響應訊息尚未收到,因此執行緒持續阻塞中;

  e)   此時收到了node4的響應訊息,和node2一樣,將Response物件儲存到響應陣列,最後收到node3的響應訊息時,將Response物件儲存到響應陣列;

  f)   現在所有節點的響應都已經收集完畢,是時候通知剛剛被阻塞的那條執行緒了,原來的執行緒被notify醒後拿到所有節點的響應Response[]進行處理,至此完成了整個叢集RPC過程。

3、在多執行緒環境下,由於多個執行緒都呼叫了RPC介面,此時收到的響應訊息並不知道是哪個執行緒傳送的,因此在傳送之前生成一個UUID標識,此標識要保證同socket中唯一,再把UUID與執行緒物件關係對應起來,可使用Map資料結構實現,UUID的值作為key,執行緒對應的鎖物件為value。

4、RpcCallback介面:

public interface RpcCallback {
public Serializable replyRequest(Serializable msg, Member sender);
public void leftOver(Serializable msg, Member sender);
}

接口裡面的方法是預留提供給上層具體邏輯處理的入口,replyRequest方法用於處理響應邏輯,leftOver方法用於殘留請求的邏輯處理,殘留響應是指有時我們在接收到第一個響應後就喚起執行緒。如果node1呼叫node2的RPC方法,node2收到請求訊息後,會呼叫node2上的replyRequest處理請求訊息,處理完後封裝成Response物件返回給叢集通訊框架,叢集通訊框架根據響應模式決定什麼時候返回Response物件給node1,node1收到Response物件後進行處理;如果響應模式是FIRST_REPLY或者MAJORITY_REPLY時(也就是收到第一條響應或者超過半數的響應時),還有部分響應訊息不在Response裡面,這個時候叢集通訊框架會呼叫node1節點上的leftover方法處理;

5、RPCChannel基於Tribes叢集通訊框架,是整個RPC的抽象,它實現通訊框架的ChannelListener介面,實現了該介面就能在messageReceived方法中處理接收到的訊息。RPCChannel.send方法也是呼叫的Channel.send方法實現的;

二、自定義RpcChannel Demo:

自定義一個RPC,它要實現RpcCallback介面,分別對請求處理和殘留響應處理,這裡請求處理僅僅是簡單返回“hello,response for you!”作為響應訊息,殘留響應處理則是簡單輸出“receive a leftover message!”。程式碼如下:

public class MyRPC implements RpcCallback {

  @Override
public Serializable replyRequest(Serializable msg, Member sender) {
RpcMessage mapmsg = (RpcMessage) msg;
mapmsg.message = "hello,response for you!";
return mapmsg;
} @Override
public void leftOver(Serializable msg, Member sender) {
System.out.println("receive a leftover message!");
} public static void main(String[] args) {
MyRPC myRPC = new MyRPC();
byte[] rpcId = new byte[] {1, 1, 1, 1};
byte[] key = new byte[] {0, 0, 0, 0};
String message = "hello";
int sendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK;
RpcMessage msg = new RpcMessage(rpcId, key, (Serializable) message);
RpcChannel rpcChannel = new RpcChannel(rpcId, channel, myRPC);
RpcResponse[] resp =
rpcChannel.send(channel.getMembers(), msg, RpcResponseType.FIRST_REPLY, sendOptions, 3000);
while (true)
Thread.currentThread().sleep(1000);
}
}

三、UML圖:

1、RpcMessage定義通訊訊息協議,實現Externalizable介面自定義序列化和反序列化;message用於存放響應訊息,uuid標識用於關聯執行緒,rpcId用於標識RPC例項,reply表示是否回覆訊息;其writeExternal方法用於將RpcMessage序列化,readExternal用於將二進位制的位元組流反序列化;

2、NoRpcChannelReply表示響應訊息,繼承於RpcMessage類,其成員與RpcMessage完全相同,只是在建構函式裡面將reply變數指定為True;

3、當呼叫RpcChannel.send方法後,執行緒會阻塞住,喚醒執行緒的條件有四種:接收到第一個響應就喚起執行緒、接收到叢集中大多數節點的響應就喚起執行緒、接收到叢集中所有節點的響應才喚起執行緒、無需等待響應的無響應模式,程式碼中的定義如下:

public class RpcResponseType {
public static final int FIRST_REPLY = 1;
public static final int MAJORITY_REPLY = 2;
public static final int ALL_REPLY = 3;
public static final int NO_REPLY = 4;
}

4、Response表示響應物件,用於封裝接收到的訊息,Member在通訊框架是節點的抽象,這裡用來表示來源節點。Message表示響應的訊息,也就是RpcMessage型別的物件;

5、RpcCollectorKey只是對位元組陣列型別的id的封裝,RpcCollector 表示RPC響應集,用於存放同個UUID的所有響應,其成員變數responses儲存了響應陣列,key與UUID的意義相同,表示該key下的所有的response響應,options表示執行緒喚醒型別(FIRST_REPLY, MAJORITY_REPLY, ALL_REPLY, NO_REPLY);destcnt為整型變數,表示總共需要收到的響應數量,如果執行緒喚醒條件為MAJORITY_REPLY時,response陣列中的數量必須大於destcnt的一半以上,如果執行緒喚醒條件為ALL_REPLY,則response陣列中的數量等於destcnt時才會喚醒執行緒;在isComplete方法裡面就是根據執行緒喚醒條件判斷的:

6、RpcChannel是整個RPC的核心抽象,它實現通訊框架的ChannelListener介面,實現了該介面就能在messageReceived方法中處理接收到的訊息。它還實現了send方法,這兩個方法都是基於成員變數Channel來實現的:

Send方法通過呼叫channel.send將訊息傳送到其他叢集節點上,並阻塞執行緒等到response響應陣列收到的響應數量滿足喚醒時才將Response物件陣列返回給呼叫方;

RPC請求傳送出去後,其他叢集節點收到訊息後觸發messageReceived方法,在這個方法裡面首先呼叫replyRequest對請求訊息進行處理,處理完後將reply設定為true併發送給呼叫方,而呼叫方將其封裝成Response物件並新增到response陣列中:

而呼叫方接收到響應訊息後,將其封裝成Response物件並新增到response陣列中,此時如果滿足執行緒喚醒條件則喚醒執行緒:

通過send方法可以看到在傳送RPC請求Key新增到responseMap中,傳送後阻塞執行緒,執行緒喚醒後將該key從responseMap中刪除,messageReceived中處理響應訊息時,如果在responseMap中找不到該Key,則說明該訊息屬於殘留請求,此時應該呼叫leftOver來處理該訊息;