1. 程式人生 > >HBase的Scan實現原始碼分析

HBase的Scan實現原始碼分析

public Cell peek() {
   if (this.current == null) {
      return null;
   }
   return this.current.peek();
}

講完了上述三個重要的資料結構,迴歸到hbase系統,HBase的表資料分為多個層次,分別是HRegion->HStore->[HFile,HFile,....,MemStoreFile]。一個表首先會水平分片形成多個HRegion,一個HRegion內不同的Column Family對應著不同的HStore,一個HStore下包含多個HFile和一個Memstore,資料寫入時先寫入MemstoreFile,MemStoreFile會不斷重新整理形成新的HFile。
複雜的資料結構形成了複雜的Scanner,在一個scan流程中,會形成如下描述的scanner物件:每個region的資料讀取由一個RegionScanner物件負責,RegionScanner有一個scanner的優先佇列,裡面放的是storeScanner(這個優先佇列在實現上是由多個StoreScanner組成的堆,使用RegionScanner的成員變數KeyValueHeap storeHeap來表示)。 每個StoreScanner物件對應著一個Column Family內的資料讀取,其也有一個KeyValueHeap型別的成員變數heap,儲存的是隸屬於該store的MemStoreScanner和StoreFileScanner。 storeHeap&StoreScanner的構造程式碼如下所示,在RegionScannerImpl中,會遍歷該Region下的所有store,並針對每個store建立對應的StoreScanner。
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
          scan.getFamilyMap().entrySet()) {         //遍歷該region下的各store
        Store store = stores.get(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);  //new一個該store的StoreScanner
        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
          scanners.add(scanner);        //將不同的StoreScanner歸入不同的scanner list中
        } else {
          joinedScanners.add(scanner);
        }
}
initializeKVHeap(scanners, joinedScanners, region);   //然後用這些StoreScanner初始化一個KeyValueHeap

store.getScanner就是針對每個store建立一個StoreScanner,最後一步的initializeKVHeap則將這些StoreScanner構建成一個堆儲存在RegionScanner的成員變數storeHeap中,用於遍歷取該region下所有store中的資料,而storeScanner同樣是一個由FileScanner組成的heap,其在store.getScanner中完成對每個store的storeScanner構造,建構函式中的關鍵兩步如下所示:
// Pass columns to try to filter out unnecessary StoreFiles.
    List<KeyValueScanner> scanners = getScannersNoCompaction();  //返回該Store下對應的MemStore/StoreFile Scanner

    // Seek all scanners to the start of the Row (or if the exact matching row
    // key does not exist, then to the start of the next matching Row).
    // Always check bloom filter to optimize the top row seek for delete
    // family marker.
    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
        && lazySeekEnabledGlobally, isParallelSeekEnabled);     //對這些StoreFileScanner和MemStoreScanner分別進行seek
                                                                //seekKey是matcher.getStartKey()

getScannersNoCompaction()返回這個Store下包含的HFileScanner和memstoreScanner,儲存在scanners中。 seekScanners就是在memstore或者hfile中定位到指定的keyValue(通常是scan時startKey&endKey指定的keyValue),如果指定的keyValue不存在,則seek到指定keyValue的下一個元素。實際實現時這裡採用了lazy seek優化,優化的目的是為了不需要對所有的HFile進行seek尋找目標keyValue,而只需對keyValue真實存在的HFile進行seek。 典型的客戶端發起scan請求的程式碼如下所示:
Scan scan = new Scan();
scan.setStartRow(........);
scan.setStopRow(........);
Result result;
try (ResultScanner rs = table.getScanner(scan)) {
   while ((result=rs.next()) != null) {
       //your code here
   }
}

進入上述程式碼的getScanner方法,會發現其new一個ClientScanner物件,該物件包含了使用者傳入的Scan物件以及快取、連線、重試次數、表名、region資訊等引數。ClientScanner建構函式的最後呼叫initializeScannerInConstruction(),這個函式實際上包裝了一個如下的呼叫: nextScanner(this.caching, false); 其中,this.caching是一個int型變數,表示一次scan的rpc請求返回的結果數量,返回結果儲存在客戶端的cache中。 進入nextScanner函式,其首先檢查是否已scan至表尾,如果已scan至表尾則關閉scan並返回false給客戶端,否則更新localStartKey作為本次scan的開始位置,並將輸入引數this.caching賦值給nbRows以表示本次scan返回的資料量,以上述兩個變數作為引數呼叫getScannerCallable方法,該方法會返回一個ScannerCallableWithReplicas型別的物件callable,接著呼叫callable物件中的call方法向服務端發起一次rpc呼叫,呼叫路徑如下:
ScannerCallableWithReplicas.call->ScannerCallable.call
服務呼叫是通過構造一個ScanRequest型別的物件request,並將其發往服務端來實現的,核心程式碼如下:
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
controller = controllerFactory.newController();
response = getStub().scan(controller, request);
這裡需要注意的是構造request時包裝了一個long型的變數nextCallSeq。 至此客戶端發起scan請求的流程結束,下面介紹服務端是如何處理這些scan請求,並與前面的知識建立起聯絡。 客戶端呼叫的getStub().scan向服務端發起了一次scan的rpc請求,服務端scan的實現在RSRpcServices中,首先其申請一個租約Lease.lease,用於客戶端和服務端之間的心跳連線,然後對照客戶端和服務端的nextCallSeq欄位(目的是保證客戶端順序得到所有資料而不漏),程式碼如下:
if (request.hasNextCallSeq()) {
          if (rsh == null) {
            rsh = scanners.get(scannerName);
          }
          if (rsh != null) {
            if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
              throw new OutOfOrderScannerNextException(
                "Expected nextCallSeq: " + rsh.getNextCallSeq()
                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
                "; request=" + TextFormat.shortDebugString(request));
            }
            // Increment the nextCallSeq value which is the next expected from client.
            rsh.incNextCallSeq();  //保證客戶端順序得到所有資料不漏,Client和RS都維護一個nextCallSeq欄位
          }
}

RSRpcServices中包含一個ConcurrentHashMap<String, RegionScannerHolder>型別的變數scanners,String是region的名字,也就是說每一個region都租用一個RegionScanner。回到scan函式,引數request中可以獲得scannerName,憑藉scannerName從scanners中獲取對應的RegionScanner物件scanner。 接著從request中提取本次scan的資訊,如是否是small scan、reverse scan等等,根據這些資訊構造ScannerContext型別的物件scannerContext,以此為引數呼叫RegionScanner的nextRaw方法,這樣就與前面介紹的RegionScanner建立起聯絡,返回結果存放在List<KeyValue>型別的變數values中:
moreRows = scanner.nextRaw(values, scannerContext)

newRaw方法的呼叫路徑如下: nextRaw()  ->  RegionScanner.nextInternal() -> populateResult() 其中,在RegionScanner.nextInternal()會進行一些對stopRow/filterRow的檢查,populateResult函式開始迭代取資料,呼叫語句如下: populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length); this.storeHeap是前面我們說的RegionScanner中維護的一個由StoreScanner組成的堆。populateResult的主要邏輯簡化如下:
do{
      heap.next(results, scannerContext);
                    
      nextKv= heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
} while (moreCellsInRow)

populateResult中真正返回資料呼叫的是heap的next方法,這裡還記得前面說的heap是由storescanner組成的堆,在next中用current變數記住當前正在處理的storescanner,然後呼叫next()函式返回了該storescanner中可能存在的結果。 到StoreScanner的next方法,StoreScanner維護著一個由StoreFileScanner/memstoreScanner構造的堆,next實際是從它的scanner堆中peek出一個StoreFileScanner或者是MemStoreScanner,然後呼叫next()取得資料,再將該scanner添加回佇列中。 在StoreScanner的next方法有下面一段程式碼需要注意:
LOOP: while((cell = this.heap.peek()) != null) {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }
      。。。。。。。。
}
這段程式碼維護了服務端和客戶端的一個心跳,是為了防止服務端scan到較大的資料時長時間沒有給客戶端返回響應,而造成客戶端誤以為服務端掛掉而產生超時錯誤。其中cellsPerHeartbeatCheck定義了心跳傳送的週期,該值由"hbase.cells.scanned.per.heartbeat.check"配置,預設是10000,表示的是每scan出10000個cell,則服務端向客戶端傳送一條心跳。

參考資料: