1. 程式人生 > >hbase之createTable完整的netty實現執行流程

hbase之createTable完整的netty實現執行流程

hbase的客戶端程式碼並不想hive一樣用java編寫,shell呼叫,而是使用ruby編寫。 在admin.rb檔案中方法create,其中接受兩個引數,其中第二個引數型別為變長引數。 而在create方法的最後,呼叫了admin.createTable,其中的admin是hbaes.rb初始化時通過呼叫java程式碼ConnectionFactory.createConnection建立的connection呼叫getAdmin而獲得的。   下面簡單分析一下ConnectionFactory.createConnection流程。 預設的hbase.client.connection.impl實現類ConnectionImplementation.class,因此,該方法其實就相當於初始化了ConnectionImplementation。而在ConnectionImplementation中,最主要的構建了型別為NettyRpcClient的rpc客戶端。   接著,根據原始碼我們可以發現,然後呼叫了HBaseAdmin.createTable。在該方法呼叫的時候,有一些hbase通用的架構,我們接下來一一道來。   首先,呼叫createTableAsync方法,其中構建了一個MasterCallable型別的匿名物件,其複寫的rpcCall方法真正的呼叫了客戶端方法。 接下來呼叫executeCallable方法,然後構建RpcRetryingCaller物件,並呼叫該物件的callWithRetries方法。在其唯一實現RpcRetryingCallerImpl中我們可以看到 首先呼叫了傳入的callable.prepare方法。由於此時我們分析的callable型別為MasterCallable,因此,我們可以追蹤到MasterCallable.prepare方法。在這裡,呼叫了ConnectionImplementation.getMaster方法。接著呼叫了ConnectionImplementation.getKeepAliveMasterService。接下來,返回rpc呼叫的本地stub。 然後呼叫了callable.call方法,而這個call方法最後恰恰呼叫了匿名物件複寫的rpcCall方法。也就是說,他呼叫了本地stub的createTable方法。 而接下來的呼叫流程就正如我在上篇博文中所講的。會呼叫BlockingRpcChannelImplementation.callBlockingMethod,AbstractRpcClient.callBlockingMethod,AbstractRpcClient.callMethod,NettyRpcConnection.sendRequest,HBaseRpcControllerImpl.notifyOnCancel等一系列方法。   在NettyRpcConnection.sendRequest方法中,我們將著重進行分析。以糾正之前所犯的錯誤。   在這裡首先執行了connect方法,如下圖所示,我們可以發現,這裡添加了一個ChannelFutureListener。 通過operationComplete裡面的established方法,我們可以看到,通道的pipeline中添加了NettyRpcDuplexHandler。
  然後執行了write方法,ch.writeAndFlush,學過Netty大家都清楚,下一步就會呼叫剛剛加入的NettyRpcDuplexHandler.write方法。然後就呼叫該方法,向服務端傳送資訊。   接著,等待服務端的返回。 服務端接收到客戶端後(具體流程可以參考我的上一篇博文[Hbase之rpc呼叫流程簡介]),將響應返回。 並呼叫下圖所示的readResponse方法。     (而在此之前,在AbstractRpcClient.callBlockingMethod的方法中BlockingRpcCallback.get方法已經開始呼叫this.wait()。 在BlockingRpcConnection.run方法中,會呼叫readResponse。(在客戶端的實現為BlockingRpcConnection,才會呼叫。)。 而我們都知道,在呼叫的實際過程中,hbase的預設客戶端實現是NettyRpcConnection。 而在readResponse方法中,類似hadoop中rpc的阻塞一樣,呼叫in.readInt,也就是說等待到服務端的返回後,該方法會繼續向下執行。一直到call.setResponse,接著就是call.callComplete,)   在readResponse方法的最後,我們可以看到呼叫了call.setResponse,接著就是callComplete, callback.run,而這裡的callback恰恰就是下圖中的匿名物件。
  接著呢,就是呼叫BlockingRpcCallback.run方法。呼叫this.notify.。然後將AbstractRpcClient.callBlockingMethod中的阻塞開啟,獲得server端的返回值。   當然,這只是獲得了createTable的服務端返回值。接下來會建立CreateTableFuture物件,其中封裝了剛剛獲得的服務端返回值。 而接下來會繼續呼叫到ProcedureFuture.get(long timeout, TimeUnit unit)方法。在該方法內部,會繼續呼叫waitProcedureResult,getProcedureResult等等一系列方法。其流程與上面所敘述的大體一致,我們就不在這裡一一介紹了。所不同的是,這裡呼叫的方法是getProcedureResult。   hbase createTable的流程答題時這樣,如果感覺對你的理解有幫助,歡迎你的讚賞,如果解答不了你的疑問,可以傳送郵件至
[email protected]
,期待你的來信。 你的讚賞是我前進的動力。