1. 程式人生 > >HBase原始碼分析2 – RPC機制:客戶端

HBase原始碼分析2 – RPC機制:客戶端

先澄清一些本文中術語的涵意

客戶端 – 指的是HBase client API.提供了從使用者程式連線到HBase後臺伺服器即Master server及Region server的功能

服務端 – 即指的是HBase的Master server 及 Region server

使用者端 – 指使用者程式.即對HBase client API的呼叫方.

本篇的主要目的是說明RPC的客戶端實現.解決客戶端RPC的最後兩個問題

1)      傳輸,併發及會話控制

2)      其它的保障,如出錯,重試等.

首先是RPC傳輸,併發.及會話

前一篇基礎,已經說明HBase client可以得到HMasterInterface 和HRegionInterface的介面例項.具體參看org.apache.hadoop.hbase.client.HConnectionManager.TableServers的兩個方法getMaster()和getHRegionConnection(). 從這點說,HBase的RPC與RMI是相似的,即通本地的樁(stub)物件,來傳輸呼叫,但對使用者來說,這是透明的.

這個實現主要分兩大步驟

  1. 生成介面的例項.這是通過代理類來實現的.
  2. 方法呼叫轉化為序列化的socket傳輸.代理類將方法,引數序列化後,用socket傳給服務端

第一步,這兩個介面的例項,是如何例項化出來?

毫無疑問的是這兩個例項是樁(stub)或稱為代理,樁的建立是從org.apache.hadoop.hbase.ipc.HBaseRPC.getProxy()這個靜態方法中得到的.

下面就詳細的說明一下getProxy() ,它的核心是利用的java反射中的動態代理框架.呼叫java.lang.reflect.Proxy的靜態方法newProxyInstance,它需要三個引數:

1)ClassLoader,

2)要實現的介面類(可以有多個),例如HRegionInterface,

3)最後是函式呼叫代理類java..lang.reflect.InvocationHandler的例項.

也就是說Proxy.newProxyInstantce的功能是將產生一個Object,這個Object實現了指定的介面,其實就是將介面與InvocationHandler的例項綁定了. 對介面中方法的呼叫將被轉發到InvocationHandler例項上.getProxy()將生成的物件轉化成VersionProtocol物件然後返回.然後再根據外部的呼叫將VersionProtocol轉化為具體的HRegionInterface或HMasterInterface

在這步,InvocationHandler是由HBaseRPC的內部類Invoker實現的.所有的RPC呼叫就落實到了它的invoke方法上.invoke又利用org.apache.hadoop.hbase.HBaseClient來完成呼叫的傳輸.

第二步中,主要是

1)      序列化.

2)      Socket傳輸.

序列化在第一篇中,已談到了一點,這裡講一下具體的流程

1) 呼叫的方法和引數被封裝成Invocation(HBaseRPC內部類)物件. Invocation本身就是一個Writable物件.函式名被傳化成一個編碼code.可以看到Invocation內部有兩個表,即方法名到一個位元組值的雙向的對映.首先方法名被排序了,然後它們對應的位元組code從0依次增一.過載的方法編碼相同,但可以根據引數不同來區分的.引數依次被序列化成HbaseObjectWritable物件.這個通過個Invocation物件,呼叫就可以輸出成資料流或從資料流中輸入.

2)這個協議的下層封裝是由HBaseClient的內部類Call實現的,Call裝入前述的Invocation物件,從當前的連線類HBaseClient取得一個id,這是一個自增量,然後將這個id及流的長度及Invocation通過socket連線傳送出去.並同步等待(Call.Wait())返回結果.Call物件例項被放入了另一個內部類Connection的一個表calls中,是id(integer)到Call例項的對映.如果Call例項的完成標誌被置,則說明結果被儲存在了它的value欄位中.後續反序列化的就不詳述了.

以後就Socket傳輸的機制工作了.

Connection主要負責傳輸. 主要成員socket(java.net.Socket)是一條到HBase Server連線. 在這條連線上, Connection是傳輸呼叫並接收結果.它也支援併發.前面提到的calls表存放了所有正在進行傳輸的Call.每個Call都是從一特定的呼叫者執行緒中發起.但資料接收卻不在呼叫者執行緒.

Connection本身是java.lang.Thread的子類,在它本身的執行緒會在socket連線建立時啟動,功能是迴圈檢查當前有沒有呼叫Call存在,如果有就試著從socket上讀取結果.並分析出結果是對應到哪個(查calls表)Call,將結果放入Call,並設定Call的完成標誌,通知(call.notify())呼叫者執行緒.

最後介紹客戶端的RPC容錯和重試機制.

從使用者端看到API並不是直接的HRegionInterface或HMasterInterface,而是HTable之類已經包裝的比較高階的API.

HTable這層的功能封裝是較複雜的,因為這一層的操作會分配到不同HBase伺服器上,比如說,從上層看只是對一個表的查詢,但在HTable這層被分解成為了對不同HRegionInterface的呼叫.因為一個表是有多個Region的,而不同的Region被分配到了不同的Region Server上了,而一個HRegionInterface又代表了一個Region Server.另外,為了獲得Region在伺服器上的分佈,還要掃描Root表和Meta表等等.

本文的重點在是在HTable的業務邏輯與RPC底層機制之間部分—容錯機制.

HTable 所有引起RPC呼叫的方法,一般都會呼叫到HConnectionManager.TableServers的getRegionServerWithRetries()這個方法,

以簡單的HTable.delete(Delete)這個函式為例,你可以看到一個對HRegionInterface.delete的呼叫被封裝成了一個org.apache.hadoop.hbase.client.ServerCallable物件.這個物件然後被作為引數傳入了getRegionServerWithRetires方法呼叫.

在這個方法內部,這個ServerCallable物件被呼叫到,成功了就返回,否則嘗試呼叫多次,預設10次.每次呼叫時的間隔還逐漸拉長.其中還將出錯丟擲的異常記到一個列表中,以便最終失敗的時候分析原因.

考慮以下情況,當一個Region正被遷移,最初時Delete中的row key物件可能由Region Server 1來管理,但當呼叫發生後客戶端收到了出錯,因為此時這個row key從屬的Region由Region Server 2來服務了.在這種重試機制下,都會經過兩個流程

1)      呼叫ServerCallable.instantiateServer(). 這步會重置,這個row key所屬的location及對應的Region server.這樣就可以獲得region server 2了.

2)      呼叫ServerCallable.call().重新發起呼叫.

這個機制也兼顧了一些特別情況,比如客戶端可以收到一條稱為DoNotRetryIOException的異常,這樣客戶端就不會再試了.舉例來說,當執行Scan操作時,當Region遷移時,服務端會要求客戶端在其它Region server上重啟Scan,而不要繼續.