1. 程式人生 > >Hbase-0.98.6原始碼分析--Put寫操作Client端流程

Hbase-0.98.6原始碼分析--Put寫操作Client端流程

        客戶端程式寫資料通過HTable和Put進行操作,我們從客戶端程式碼開始分析寫資料的流程:


       可以看到,客戶端寫資料最終的呼叫了HTableInterface的put()方法,因為HTableInterface只是一個介面,所以最終呼叫的是它的子類HTable的put()方法。進入HTable.put():


       從上面程式碼可以看出:你既可以一次put一行記錄也可以一次put多行記錄,兩個方法內部都會呼叫doPut方法,最後再來根據autoFlush(預設為true),即自動提交,判斷是否需要flushCommits刷寫提交,在autoFlush為false的時候,如果當前容量超過了緩衝區大小(預設值為:2097152=2M),也會呼叫flushCommits方法。也就是說,在自動提交情況下,你可以手動控制通過一次put多條記錄,然後將這些記錄flush,以提高寫操作吞吐量。

       首先看下flushCommits()方法:


       只是簡單地呼叫了backgroundFlushCommits()方法,該方法會在後面講到。

       進入doPut()方法:

       從上面的程式碼可以看出,backgroundFlushCommits()這個重新整理操作可以是制定非同步提交還是同步提交,從doPut方法中來看預設是以非同步的方式進行,這裡的ap是AsyncProcess類的一個例項,該類使用多執行緒的來實現非同步的請求,也就是說,並非每一次put操作都是直接往HBase裡面寫資料的,而是等到快取區域內的資料多到一定程度(預設設定是2M),再進行一次寫操作。當然這次操作在Server端應當還是要排隊執行的,具體執行機制這裡不作展開。可以確定的是,HConnection在HTable的put操作中,只是起到一個定位RegionServer的作用,在定位到RegionServer之後,操作都是由cilent端通過rpc呼叫完成的。這個結論在插入/查詢/刪除中是一致的。

       writeAsyncBuffer.add(put)就是向一個非同步緩衝區新增該操作,然後當一定條件的時候進行flash,當發生flash操作的時候,才會真正的去執行該操作,這主要是提高系統的吞吐率,接下來我們去看看這個flush的操作內部。


     看下waitUntilDone()方法:

    進入waitForMaximumCurrentTasks()方法:


      由這個waitForMaximumCurrentTasks()方法,可以清晰了了解到waitUntilDone()方法的操作流程,具體要等待到什麼時候呢?等到tasksSent的值減去tasksDone的值等於0,tasksSent表示提交的任務數,tasksDone表示完成的任務數。

       現在就可以重新總結一下backgroundFlushCommits()方法,在第965行,submit()方法傳入的引數是true,表示需要等待rpc呼叫結束。第980行,如果有部分資料提交失敗,同時沒有設定清空失敗的資料時,把資料重新新增到writeAsyncBuffer列表中。最後在finally塊中,清空當前currentWriteBufferSize的大小,如果有資料沒有提交成功,
重新把未提交的資料的大小計算起來新增到currentWriteBufferSize中。

       比較doPut()和flushCommits(),如果在doput的過程中,也就是呼叫htable.put(Put)的時候,如果快取大小超過了客戶端寫快取大小的限制,呼叫backgroundFlushCommits()方法方法是非同步的;而在flushcommit方法中,backgroundFlushCommits()這個方法是同步的。

       接下來就是重要的提交過程,submit()方法:



       進入sendMultiAction()方法,看它是如何傳送put請求的:



       從上面的程式碼可以看出,每個任務都是通過HBase的RPC框架與伺服器進行通訊,並獲取返回的結果。其中最重要的兩個元件我用紅色方框已經圈出,看下他倆的具體實現:


       先構造一個MultiServerCallable,然後再通過rpcCallerFactory將其封裝為RpcRetryingCaller做最後的call操作。

檢視MultiServerCallable:


       註釋裡就說的很明白了,client端通過MultiServerCallable.call()方法呼叫res的rpc的multi()方法,來實現put提交請求。可以想象,根據講過的《Hadoop RPC機制-原理篇》,HRegionServer端必定也有一個multi()方法。

       總結put操作:
  (1)把put操作新增到writeAsyncBuffer佇列裡面,符合條件(自動flush或者超過了閥值writeBufferSize)就通過AsyncProcess非同步批量提交。
  (2)在提交之前,我們要根據每個rowkey找到它們歸屬的region server,這個定位的過程是通過HConnection的locateRegion方法獲得的,然後再把這些rowkey按照HRegionLocation分組。在獲得具體region位置的時候,會對最近使用的region server做快取,如果快取中儲存了相應的region server資訊,就直接使用這個region資訊,連線這個region server,否則會對master進行一次rpc操作,獲得region server資訊,客戶端的操作put、get、delete等操作每次都是封裝在一個Action物件中進行提交操作的,都是一系列的的action一起提交,這就是MultiAction。
  (3)通過多執行緒,一個HRegionLocation構造MultiServerCallable<Row>,然後通過rpcCallerFactory.<MultiResponse> newCaller()執行呼叫,忽略掉失敗重新提交和錯誤處理,客戶端的提交操作到此結束。

      下篇文章將會介紹HRegionServer如何響應客戶端發出的Put請求。