1. 程式人生 > >Hadoop中的RPC實現——伺服器端通訊元件

Hadoop中的RPC實現——伺服器端通訊元件

     在前一篇博文中,我已經詳細的講解了Hadoop RPC中客戶端通訊元件的實現,與之對應的就會有一套伺服器端通訊元件的實現。Hadoop RPC的伺服器端採用了多執行緒的設計,即伺服器會開啟多個處理器(後天執行緒)來處理所有客戶端傳送過來的RPC呼叫請求,所以在伺服器端主要包括三個元件:監聽器(Listener)、處理器(多個Handler)、響應傳送器(Responder)。接下來我將主要圍繞這三大元件來介紹伺服器端的工作原理。

     還是先來看看與伺服器端相關的實現類吧!


1.Server類

    從上面的類圖中,我們可以看出 Server類的核心是伺服器端的三大元件,因此Server類的本質工作就是對這三大元件進行管理,如啟動/停止元件等。

private String bindAddress; 			//服務端繫結的地址
private int port;                               //服務端監聽埠
private int handlerCount;                       //處理器的數量
private Class<? extends Writable> paramClass;   //呼叫引數的解析器類
private int maxIdleTime;                        //一個客戶端連線後的最大空閒時間
private int thresholdIdleConnections;           // 可維護的最大連線數量
int maxConnectionsToNuke;                       

private int maxQueueSize;			//處理器Handler例項佇列大小
private int socketSendBufferSize;               //Socket Buffer大小
private final boolean tcpNoDelay;               //TCP連線是否不延遲

volatile private boolean running = true;         //Server是否執行
private BlockingQueue<Call> callQueue; //待處理的rpc呼叫佇列

//維護客戶端連線的列表
private List<Connection> connectionList
//監聽Server Socket的執行緒,為處理器Handler執行緒建立任務
private Listener listener = null;
private Responder responder = null;//響應客戶端RPC呼叫的執行緒,向客戶端呼叫傳送響應資訊
private int numConnections = 0;	//連線數量
private Handler[] handlers = null;//處理器Handler執行緒陣列
    伺服器為了提高RPC的吞吐率,設計了多個處理器執行緒來處理客戶端的RPC呼叫請求,至於伺服器端可以同時開啟多少個處理器,是由屬性決定的handlerCount,該屬性的值可以在建立Server例項時設定,對於Hadoop中各個節點中的Server例項,當然可以通過配置檔案來設定的,NameNode、DataNode、JobTracker節點對應的配置項分別為:dfs.namenode.handler.countdfs.datanode.handler.countmapred.job.tracker.handler.count,而TaskTracker節點中Server內部處理器的數量是由Hadoop叢集中可執行的最大map任務數或者reduce任務數決定的。另外一個方面,受限於系統資源等原因,Server內部的處理器的數量也就是有限的了,同時等待處理器處理的RPC呼叫的佇列大小也是有限制的,因此對於每一個Server處理,待其處理的RPC呼叫請求上限(另一方面也是考慮到處理時間的原因)是:

maxQueueSize=handlerCount*Max_QUEUE_SIZE_PER_HANDLER

目前Max_QUEUE_SIZE_PER_HANDLER的值為100。

2.Listener類

private long lastCleanupRunTime; //上一次清理客戶端連線的時間
private long cleanupInterval; //清理客戶端端連線的時間間隔 
private int backlogLength; //允許客戶端等待連線的佇列長度

    監聽器Listener被用來偵聽伺服器的服務地址,處理客戶端的連線請求。當發現客戶端發起連線請求時,監聽器會建立一個對應的Connection物件,當發現客戶端傳送來RPC呼叫請求時,就呼叫對應的Connection來處理。這裡需要注意的是客戶端在連線成功之後並不會馬上向其傳送RPC呼叫,而是首先發送驗證資訊(這一點在上一篇博文中談到過),而伺服器端也會先驗證這些資訊(協議版本號、使用者許可權等),當然這個驗證在Connection的整個生命週期中只有一次,就是剛開始的時候。同時,Listener還要完成的一個工作就是對客戶端連線Connection進行管理。首先,Listener限制了客戶端等待連線的數量,也就說同時不能超過backlogLength個客戶端等待和伺服器建立連線,這個值的預設大小為128,但也可以通過配置檔案來設定,對應的配置項為:ipc.server.listen.queue.size。當伺服器端連線的客戶端數量超過threshholdIdleConnections時就會對客戶端連線進行每 cleanupInterval ms一次的清理,清理的物件主要是那些空閒時間超過 maxIdleTime ms的客戶端連線,即關閉掉這些客戶端連線Connection,同時每一次清理(關閉)Connection的數量又不能超過maxConnectionsToNuke。這些引數對應的值如下:

3.Connection類

   Connection類主要用來接受客戶端傳送過來的RPC呼叫,即把網路網路二進位制資料進行解析,最後封裝成一個Call物件,這個物件實際上就是表示一次RPC呼叫所需要的所有相關引數。之後把這個Call新增到待處理的佇列callQueue中。當然,Connection每一次在接收到一個Call之後就會更新它的lastContact值,Listener主要根據該值來計算當前Connection的空閒時間。

4.Handler類

   每一個處理器總是不停的從待處理的Call佇列callQueue獲取一個Call來處理,處理完之後就會把該Call交給響應傳送器Responder來處理,對於已經處理過的Call,無論是成功還是失敗,其處理結果都存放在Call的response屬性中了。

5.Responder類

   對於處理器已經處理過的Call,Responder不會馬上將該Call的處理結果傳送回客戶端除非該客戶端只有這一個call呼叫,否則Responder會把這個已經處理好的Call加入到其客戶端連線Connection的內部佇列responseQueue中,也就是說Responder是以Connection為排程單位來處理的。另外,還有一個檢查機制就是對於某一個客戶端的一次批量的RPC呼叫,如果在 PURGE_INTERVAL ms之後不能全部處理完的話,伺服器就會放棄為該客戶端提供服務,斷開與它之前的網路連線。目前,PURGE_INTERVAL的值為900000。

   伺服器端的工作流程如圖所示: