1. 程式人生 > >HBase的put流程原始碼分析

HBase的put流程原始碼分析

hbase是一個nosql型資料庫,本文我們會分析一下客戶的資料是通過什麼樣的路徑寫入到hbase的。

HBase作為一種列族資料庫,其將相關性較高的列聚合成一個列族單元,不同的列族單元物理上儲存在不同的檔案(HFile)內。一個表的資料會水平切割成不同的region分佈在叢集中不同的regionserver上。客戶端訪問叢集時會首先得到該表的region在叢集中的分佈,之後的資料交換由客戶端和regionserver間通過rpc通訊實現,下面我們從hbase原始碼裡探究客戶端put資料的流程。本文參考的原始碼是1.1.2版本的hbase

1)客戶端

put在客戶端的操作主要分為三個步驟,下面分別從三個步驟展開解釋:

(一)、客戶端快取使用者提交的put請求

get/delete/put/append/increment等等等等客戶可用的函式都在客戶端的HTable.java檔案中。 在HTable.java檔案中有如下的兩個變數: private RpcRetryingCallerFactory rpcCallerFactory; private RpcControllerFactory rpcControllerFactory; protected AsyncProcess multiAp; 如上的幾個變數分別定義了rpc呼叫的工廠和一個非同步處理的程序 客戶端的put請求呼叫getBufferedMutator().mutate(put),進入mutate這個函式可以看到它會把使用者提交的此次put操作放入到列表writeAsyncBuffer中,當buffer中的資料超過規定值時,由後臺程序進行提交。

(二)、將writeBuffer中的put操作根據region的不同進行分組,分別放入不同的Map集合程序提交由函式backgroudFlushCommits完成,提交動作包含同步提交和非同步提交兩種情況,由傳入的引數boolean控制。進入上述函式分析。
可見當傳入backgroudFlushCommits的引數為false時執行的是非同步提交,引數為true時執行的是同步提交。與此同時,可以發現無論非同步提交還是同步提交,實際的提交動作是由AsyncProcess ap執行的,呼叫的語句如下: ap.submit(tableName,writeAsyncBuffer,true,null,false) 
需要注意的是多數情況下執行的是非同步提交,只有在非同步提交出錯的情況下執行同步提交。 進入submit函式,可以看到它迴圈遍歷引數writeAsyncBuffer中的每一行,通過connection.locateRegion函式找到其在叢集的位置loc,將該位置與操作action一起繫結在變數actionByServer中。 這裡的region定位是由ClusterConnection型別的變數connection完成的,進入其locateRegion方法可以看出,如果客戶端有快取,則直接從快取讀取,否則從META表中讀出了region所處的位置,並快取此次的讀取結果。返回的結果是RegionLocations型別的變數。actionByServer是一個Map<ServerName,MulteAction<Row>>型別的變數,從該變數的型別定義可以看出,其將使用者的一批寫請求中,寫入regionserver地址相同的動作歸類到一起。(三)、提交服務端RegionServer處理,在回撥函式中與服務端互動。最後呼叫sumitMultiActions函式將所有請求提交給服務端,它接受了上面的actionByServer作為引數,內部例項化一個AsyncRequestFutureImpl類執行非同步的提交動作
從sendMultiAction函式中一步步向裡檢視程式碼,其將使用者的action請求通過getNewMultiActionRunnable、SingleServerRequestRunnable層層呼叫最終落到了hbase的RPC框架中,每個使用者請求包裝成包裝MultiServerCallable物件,其是一個Runnable物件,在該物件中使用者請求與服務端建立起RPC聯絡。所有的runnable物件最終交到AsyncProcess物件的內部執行緒池中處理執行。2)服務端
客戶端MultiServerCallable的call方法中呼叫了服務端的multi函式執行提交動作,進入服務端。multi方法內部會根據請求是否是原子請求,執行不同的操作語句,這裡我們以非原子性提交為例,其執行了doNonAtomicRegionMutation()函式,這個函式中先進行一些rpc請求的編碼,將編碼後的action相關資訊組織到一個List<ClientProtos.Action>型別的變數mutations中,這裡的編碼採用的proto buffer的編碼方案,然後呼叫doBatchOp()語句,其接受了mutations作為引數。 在doBatchOp函式中,可以看到其最終呼叫的batchMutate執行的批量操作,這裡操作的結果會返回到OperationStatus型別的變數codes[]中,包括了以下幾種狀態:BAD_FAMILY;SANITY_CHECK_FAILURE;SUCCESS等狀態。 這些狀態記錄了每個action的執行結果,包括成功啦、失敗啦等等。就一步地這些請求被包裝成一個MutationBatch型別的物件傳入batchMutate,batchMutatue首先判斷一下資源的狀態,然後呼叫doMiniBatchMutation()執行最終的put操作,該操作返回的是寫入資料的大小addedSize,根據addedSize計算此時memstore的size以決定是否flush,如果達到了flush的要求,執行requestFlush()。doMiniBatchMutation接受了MutationBatch型別的物件繼續作為其引數。關鍵程式碼如下所示:
  1. while (!batchOp.isDone()) {   //操作未完成前一直迴圈
  2.   if (!batchOp.isInReplay()) {  
  3.       checkReadOnly();              //判斷是否是隻讀狀態
  4.   }  
  5.   checkResources();               //檢查相關資源
  6.   if (!initialized) {  
  7.       this.writeRequestsCount.add(batchOp.operations.length);   //更新寫請求計數器
  8.       if (!batchOp.isInReplay()) {  
  9.         doPreMutationHook(batchOp);  
  10.       }  
  11.       initialized = true;  
  12.   }  
  13.   long addedSize = doMiniBatchMutation(batchOp);    //最終的put操作是落在這裡的
  14.   long newSize = this.addAndGetGlobalMemstoreSize(addedSize);     //以原子操作的方式增加Region上的MemStore記憶體的大小
  15.   if (isFlushSize(newSize)) {    //判斷memstore的大小是否達到閾值,決定是否flush
  16.       requestFlush();  
  17.   }  
  18. }  
服務端的put主要實現在HRegion.java的doMiniBatchMutation(),該函式主要利用了group commit技術,即多次修改一起寫。 首先對於所有要修改的行,一次性拿住所有行鎖,在2944行實現。 rowLock = getRowLockInternal(mutation.getRow(),shouldBlock) ,注意的是這裡的鎖是寫鎖。put和delete在客戶端都是由這個函式實現的,在2960行鍼對兩者的不同第一次出現了不同的處理,分別將put和delete操作歸類到putsCfSet和deletesCfSet兩個不同的集合中,這兩個集合分別代表了put/delete的列族集合,資料型別為Set<byte[]>。 第二步是修正keyvalue的時間戳,把action裡面的所有kv時間戳修正為最新的時間。時間戳修正之後,在3009行 lock(this.updatesLock.readLock(),numReadyToWrite) 加入了讀鎖。 然後獲得該批寫入memstore資料的批次號mvccNum,mvccNum同時也是此次寫事務的版本號,由this.sequenceId加一獲得的 然後通過w=mvcc.beginMemstoreInsertWithSeqNum(mvccNum),進入函式beginMemstoreInsertWithSeqNum,可以看見,該函式通過傳入的mvccNum new一個新的WriteEntry物件,然後將WriteEntry放入佇列writeQueue中,這一步加佇列的操作是被鎖保護起來的。 writeQueue佇列用於儲存多個併發寫事務的WriteEntry。 
然後,就是將batch中的資料寫入到各個store的memstore中,並根據batch中的資料構建WAL edit。 構造WAL edit之後,將該條資料對應的table name、region info、cluster id等等包裝成一個HLogKey結構的物件,該物件即為walkey,將walKey和WAL edit共同組裝成一個entry之後將之append到記憶體中的ringbuffer資料結構中。 注意的是這次的append操作產生一個HLog範圍內的id,記作txid。txid用於標識這次寫事務寫入的HLog日誌。 寫入buffer後,即釋放所有的行鎖,兩階段鎖過程結束。然後在3153行 syncOrDefer(txid,durability) 將這次事務的日誌持久化到hfs中,一旦持久化完成便提交此次事務,程式碼在3170行,其呼叫了completeMemstoreInsertWithSeqNum(),走進這個函式會發現其在寫入mvccnum之後,呼叫了waitForPreviousTransactoinsComplete()函式,這個函式實際是推進了mvcc的memstoreRead,推進的思路如下: 先鎖上writeQueue佇列,然後一個一個看,找連續的已完成的WriteEntry,最後一個WriteEntry的writeNumber即是最新的點,此時可以賦值給mvcc.memstoreRead,後續讀事務一開始就去拿mvcc.memstoreRead,從而能夠拿到本次寫入的資料。 這裡要補充一句,此時寫入的資料儲存在memstore中,並沒有持久化到hdfs中,記憶體中的key-value是以skip list的資料結構儲存的。
總結上面hbase的寫路徑可以發現在hbase的寫入過程中應用到了如下的一些技術:
首先,客戶端的rpc請求傳遞到服務端時,函式AsyncRequestFutureImpl()是一個Lazy優化,或者說是一個非同步的優化,雖然函式聲明瞭一個對服務端的rpc呼叫,但是它並沒有馬上呼叫服務端,而是在需要時才真正呼叫服務端。
第二,資料提交時採用了group commit技術,理解group commit可以用挖煤做比喻,是一鏟子一鏟子挖比較快,還是一次挖出一車比較省力。
第三,MVCC即多版本併發控制
限於篇幅和本人的知識有限,以上所說的只是簡單描述了hbase的寫事務的主幹路徑,並簡要指出了其中的關鍵技術點,此外還有冪等控制、回滾操作、錯誤處理以及寫入執行緒模型等等等等,即便是提到的mvcc、group commit也只是蜻蜓點水,如果展開還有很多很精彩的內容值得大家研究,如果你也對hbase感興趣,歡迎與我一起討論,共同提高。
參考資料:http://www.cnblogs.com/foxmailed/p/3897884.html