HBase之Table.put客戶端流程
首先,讓我們從HTable.put方法開始。由於這一節有很多方法只是簡單的引數傳遞,我就簡單略過,但是,關鍵的方法我還是會截圖講解,所以希望大家儘可能對照原始碼進行流程分析。另外,在這一節,我單單介紹put操作在客戶端的流程,畢竟,這個內容已經很多了。至於具體服務端的流程,我會在後面的章節中介紹到,歡迎大家到時候閱讀。
由於這一節的方法還是比較複雜的,我特地畫了一張思維導圖,大家可以先通過思維導圖來對本節的內容有一個大概的瞭解,置於具體的流程,我在下面將對照原始碼的貼圖一一為大家講解(在這裡宣告一點,我在這一節只介紹單個put操作的流程,至於put批處理,大家有興趣可以自己研究一下)。
首先,讓我們來到HTable.put方法,如下圖所示: 這裡我先講一下這一節的最後呼叫流程,也同時讓大家明確一下在本節我著重要講解的流程是哪塊。在上圖中我已經表示出來了,後面方法的呼叫最後呼叫到了上面新建立的ClientServiceCallable中覆寫的rpcCall方法,也就是呼叫到了ClientServiceCallable.doMutate。關於這個方法中具體與服務端的互動流程在本節我就略過,但是,在後面的內容中,我會談到類似的情況,如果大家感興趣的話,可以繼續後面的內容。
接下來讓我們回到本節的重點。首先是RpcRetryingCallerFactory.newCaller方法的呼叫,該方法使用RpcRetryingCallerFactory的成員引數建立了RpcRetryingCaller,用於後面對於RetryingCallable的呼叫(該方法在後面也會多次呼叫,在後面我就不貼圖了)。
接下來讓我們來到RpcRetryingCallerImpl.callWithRetries。這個方法是本節中最為重要的方法,在後面也會多次用到。方法雖然比較長,但大多是異常的情況的解決,在本節中我們就單單介紹callable.prepare與callable.call兩個方法。至於interceptor.intercept,由於在構造RpcRetryingCallerFactory時預設的interceptor型別為RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,在本節並不會有其它影響,所以我們暫時不需要關注。
上面的方法呼叫的callable具體型別為覆寫了rpcCall方法的ClientServiceCallable,下面讓我們來到ClientServiceCallable類的內部。ClientServiceCallable繼承自RegionServerCallable,因此,上面方法實際呼叫的是RegionServerCallable.prepare與RegionServerCallable.call。
首先讓我們來到RegionServerCallable.prepare方法。這裡比較重要的方法我已經框選出來了。需要大家特別留意的是最後的setStubByServiceName,一是因為他比較重要,二是我在後面的內容才會介紹,大家到時候可能忘記了,所以在這裡特別提醒一下大家。
容易看到,首先呼叫了connection.getRegionLocator獲得一個新構建的HRegionLocator(這裡就不截圖了,因為實在是沒有什麼內容需要講),不過大家需要注意的是,這裡的tableName是我們實際要查詢到tableName,而後面會用到META_TABLE_NAME,容易混淆,我在這裡簡單提一下。接下來呼叫了HRegionLocator.getRegionLocation。 在呼叫HRegionLocator.getRegionLocation時,這裡會有一系列簡單方法的呼叫,由於在上面的導圖中我並沒有畫出,在這裡我就一一貼圖描述。
一系列方法走下來,到這裡就到了比較重要的方法。由於這個是長圖,沒有辦法框選除重點,我就在文字中一一介紹該方法中呼叫的比較好重要的方法。
1.getCachedLocation,該方法簡介呼叫到了metaCache.getCachedLocation,但此時,由於我們是第一次呼叫該表的資訊,並沒有放到快取中,因此,這裡返回的locations = null。
2.然後我們來到RegionInfo.createRegionName,需要注意的是,其入參row就是我們put操作建立的rowKey,也就是我們常說的行鍵。另外,在metaStartKey中傳入的id為HConstants.NINES(NINES = "99999999999999"),而在metaStopKey中傳入的id為空字串。
3.接著構造了Scan。其中withStartRow與withStopRow中的inclusive入參都為true。將reversed設定為true,並且將catalog family設定為"info"(CATALOG_FAMILY_STR = "info")。大家可能注意到了,這裡的info列族在我們的表中並不一定存在。到了這裡,大家可能就猜到我在前面埋的伏筆了。沒錯,這裡構建的Scan是為了後面的查詢後面的META_TABLE_NAME做準備。
4.緊接著來到fro迴圈中,這裡連著呼叫了兩次getCachedLocation,後面的那次呼叫加了鎖,類似我們在單例設計模式中流程,加鎖以確保物件不會重複。
5.然後構建了ReversedClientScanner物件。(鑑於之前經驗,貼太多圖容易擾亂大家的思維,我在這裡儘量用文字來介紹)。ReversedClientScanner是ClientScanner的子類,另外,大家需要注意的是,在構造ReversedClientScanner時傳入的tableName為TableName.META_TABLE_NAME。在ReversedClientScanner的構造過程中,雖然有一些需要注意的地方,不過,我還是放在後面來描述,以便大家能夠更好的理解整個流程。
6.接下來呼叫了ReversedClientScanner.next,大家千萬不要小看這個方法,這個方法裡面的一系列呼叫時非常複雜的,也是本節的另外一個重點,我將在後面詳細介紹。
7.然後呼叫了MetaTableAccessor.getRegionLocations,其入參為ReversedClientScanner.next的返回值。這個方法的詳細流程也比較重要,同樣,我放到後面為大家講解。
8.最後呼叫了cacheLocation,也就是將當前tableName放到快取中。 上面,我將ConnectionImplementation.locateRegionInMeta方法中呼叫的各個流程都簡單介紹了一下,下面,我就選擇其中比較重要的方法來詳細描述。
首先讓我們來到ReversedClientScanner.next。這個方法呼叫了ClientScanner.nextWithSyncCache,如下圖所示: 上圖框選的兩個方法都比較重要,讓我們首先介紹比較複雜的loadCache,如下圖所示。
看到這個方法大家可能比較慌,沒有關係,我會在這裡為大家一一介紹。
1.首先呼叫了moveToNextRegion。該方法首先呼叫closeScanner(其間首先呼叫了成員變數callable.setClose方法,然後呼叫了ClientScanner.call方法,這個方法我在後面也會提到,最後將當前成員變數callable中的值置為null,簡而言之,將成員變數callable.setClose置為null)。
然後構造了ScannerCallableWithReplicas並賦給成員變數callable。在構造ScannerCallableWithReplicas時需要注意的是其中建立了ReversedScannerCallable。也就是說ScannerCallableWithReplicas的成員變數currentScannerCallable為ReversedScannerCallable。順便提一下,ScannerCallableWithReplicas的成員變數scan為我們在上面構造的scan。
2.接著呼叫了ClientScanner.call方法。這裡的呼叫流程比較繁瑣。為了更清楚的解釋清楚loadCache方法,我們先跳過這裡,假設其中已經有了返回值。
3.然後呼叫了scanResultCache.addAndGet。簡單提示一下我們這裡的scanResultCache型別為CompleteScanResultCache。
4.然後將結果集中的內容遍歷放到成員變數cache中。這裡我們可以回過頭來看看上面的圖。上面圖中我框選了cache.poll方法。也就是說cache.poll將在loadCache方法中放入的結果集取出來。
上面我提到過很多次ClientScanner.call方法,但是都沒有詳細描述,下面我就特意來講解該方法。其實這個方法很簡單,只是呼叫了方法RpcRetryingCaller.callWithoutRetries。這裡的caller是在ReversedClientScanner方法構造時建立的(上面只是提到說構造ReversedClientScanner有需要注意的地方,也就是這裡,其截圖我在上面也已經貼出來了)。
接下來讓我們來到RpcRetryingCallerImpl.callWithoutRetries。這裡的入參callable我在上面的方法loadCache已經介紹過了。其型別為ScannerCallableWithReplicas。由於ScannerCallableWithReplicas.prepare方法為空實現,我在這裡就不貼圖了,接下來將重點放在ScannerCallableWithReplicas.call。
讓我們來到ScannerCallableWithReplicas.call,如下圖所示。
1.在ClientScanner.closeScanner方法呼叫時,會走上面的if判斷。由於currentScannerCallable.closed的值為true。
2.由於預設的成員變數regionReplication,因此會呼叫RpcRetryingCallerWithReadReplicas.getRegionLocations。這個方法的呼叫與我們今天的主要流程並沒有什麼太多的聯絡,因此,在這裡簡單略過。該方法我可能會放在後面的章節中講到。
3.構造了ResultBoundedCompletionService。這個方法比較重要,在後面的流程中我會反覆講到。
4.呼叫了addCallsForCurrentReplica,將成員變數currentScannerCallable封裝到ScannerCallableWithReplicas.RetryinRPC/">gRPC中,並交由ResultBoundedCompletionService提交。
5.接著呼叫cs.poll,獲取其提交的任務的返回值。
後面我將詳細講解。 首先來到ScannerCallableWithReplicas.addCallsForCurrentReplica方法。容易看到,將成員變數currentScannerCallable封裝到RetryingRPC中。然後呼叫了ResultBoundedCompletionService.submit。這裡著重提醒一下大家,這裡的currentScannerCallable型別為ReversedScannerCallable。
接著讓我們來到ResultBoundedCompletionService.submit,如下圖所示。
這裡將傳入的RetryingRPC封裝到QueueingFuture,然後呼叫了executor.execute。由於QueueingFuture繼承自java.util.concurrent.RunnableFuture,也就是在呼叫executor.execute時,QueueingFuture.run方法會執行。 接下來讓我們來到QueueingFuture。在下圖中,我框選出了其中比較重要的方法。
首先這裡呼叫了RpcRetryingCallerImpl.callWithRetries方法(由於這個方法我在上面已經提到過了,因此在這裡就不貼圖了)。重要的是其中的入參future型別為ScannerCallableWithReplicas.RetryingRPC。另外後面將當前QueueingFuture新增到ResultBoundedCompletionService成員變數completedTasks中。 讓我們來到ScannerCallableWithReplicas.RetryingRPC.prepare方法。如下圖所示。大家可能對這裡的成員變數callable比較模糊了,大家可以往上翻到方法addCallsForCurrentReplica的描述,沒錯這裡的callable就是ScannerCallableWithReplicas的成員變數currentScannerCallable。而ScannerCallableWithReplicas.currentScannerCallable正是在構造ScannerCallableWithReplicas時傳入的ReversedScannerCallable。
接下來讓我們來到ReversedScannerCallable.prepare。由於這是第一次呼叫prepare方法,因此其成員變數instantiated為false。這裡簡單提一下,這裡的getRow方法獲取的是我們呼叫put時的行鍵,也就是我們對於目標表的rowKey。由於這裡的tableName為TableName.META_TABLE_NAME,其rowKey在後面並沒有用到。
然後呼叫了ReversedScannerCallable.setStub方法。為成員變數stub的賦值。其值為getConnection().getClient(getLocation().getServerName())呼叫的返回值。 讓我們來到ConnectionImplementation.getClient方法。看過我博文《HBase之HRegionServer啟動(含與HMaster互動)》的同學看到這裡可能就比較熟悉。 沒錯,這裡正是通過ClientProtos.ClientService.newBlockingStub構造了協議ClientProtos.ClientService的客戶端stub。關於與服務端互動的流程,我在《HBase之HRegionServer啟動(含與HMaster互動)》中已經具體介紹了,大家感興趣的可以去看一下,我們這裡來描述比較重要一個點。
就是computeIfAbsentEx的最後一個入參IOExceptionSupplier。他類似於java中的Supplier(類似的方法呼叫我在後面講解方法MetaTableAccessor.getRegionLocations)。 在第一次呼叫時,我們的stubs中並沒有到該serverName的客戶端stub,因此呼叫了入參supplier的get方法。也就是我們上面看到的lambda表示式方法內容被呼叫。
到這裡,ReversedScannerCallable.prepare方法就呼叫完成了。這個還有一個需要注意的點就是ReversedScannerCallable.prepare方法的最後將其成員變數instantiated置為true。
接下來讓我們來到ScannerCallableWithReplicas.RetryingRPC.call方法(這裡的callable型別為ReversedScannerCallable)。
這裡再次呼叫了RpcRetryingCallerImpl.callWithRetries,由於ReversedScannerCallable.prepare方法已經呼叫,並且其成員變數instantiated被置為true,所以上面描述的內容並不會再次呼叫(這裡框選的內容作為後面的伏筆)。 也就是說,接下來應該呼叫的是ReversedScannerCallable.call。由於其並沒有call方法,因此,會一直呼叫到其父類RegionServerCallable.call。如下圖所示。這裡的rpcController型別為HBaseRpcControllerImpl。接下來呼叫了rpcCall方法。由於ReversedScannerCallable中並沒有rpcCall方法的實現,而在其父類ScannerCallable有實現rpcCall。
接下來,讓我們來到ScannerCallable.rpcCall。由於預設的成員變數scannerId為-1,因此,會呼叫openScanner。由於openScanner方法僅僅是通過Client協議傳送到服務端。關於rpc流程我在部落格《hbase之RPC呼叫流程簡介》中已經介紹過了,感興趣的同學可以去看一下,那篇博文講的比較淺顯,我會在春節期間將那篇內容更新,大家可以關注我,到時候有更新大家也就收到通知了。
然後呼叫了ResponseConverter.getResults,將服務端的返回的ScanResponse轉換為Result。 讓我們來到ResponseConverter.getResults。這個方法的主要作用是將CellScanner中Cell的或ScanResponse中的PB型別的results轉換為java型別的Result。至於該方法的詳細描述我要放到後面開設的第二章節,也就是HBase中客戶端協議各個操作中來講解,因為這裡流程是比較複雜的,要結合上服務端的流程才能講述清楚。所以這裡暫時略過。
到這裡,一個完整的RpcRetryingCallerImpl.callWithRetries方法呼叫流程可以說是完結了。然後在ResultBoundedCompletionService.QueueingFuture.run方法的後面,將當前QueueingFuture新增到ResultBoundedCompletionService成員變數completedTasks中(雖然我在上面提到過,但這裡還是重述一下,以便我們後面的理解)。
而在我們本節描述的整體流程中,ScannerCallableWithReplicas.addCallsForCurrentReplica方法呼叫完結。
接下來讓我們來到ResultBoundedCompletionService.poll,由於其間接呼叫了ResultBoundedCompletionService.pollForSpecificCompletedTask,如下圖所由於在QueueingFuture.run方法的最後,將自身新增到了completedTasks。因此,上面的方法獲取的正是剛剛新增的QueueingFuture。接著呼叫了ResultBoundedCompletionService.QueueingFuture.get方法。如下圖所示。也就是說,這裡將result返回。這裡result的型別我們需要注意一下,以便後面在型別上面的理解。由於這裡QueueingFuture成員變數future的實際型別為ScannerCallableWithReplicas.RetryingRPC。大家可以往上翻到ScannerCallableWithReplicas.RetryingRPC.call,就可以發現,這裡的result是從ResponseConverter.getResults獲得的Result陣列與成員變數callable封裝後的Pair物件。接著,將r.getFirst(),也就是實際獲得的結果返回。
到這裡,大家可能以為要結束了,很遺憾,這裡只是到了ClientScanner.call方法的返回。
由於接下來的是兩個單獨的流程了。一個是MetaTableAccessor.getRegionLocations,另外一個是ConnectionImplementation.cacheLocation。至於這兩個流程之外的後續流程比較簡單,我就不一一敘述了,相信大家跟著原始碼與我在前面的提示很容易就可以弄清楚了。而前面提到的那兩個單獨的流程我將放在後面的一節《HBase之Table.put客戶端流程(續)》中介紹。到時候歡迎大家閱讀。
大家可以關注我的部落格,或者傳送郵件到我的郵箱[email protected]來溝通交流大資料相關的知識。感謝大家的閱讀,如果覺得不錯,希望您可以點選下面的推薦。