1. 程式人生 > >RocketMQ原理解析-Remoting1. 通訊層實現

RocketMQ原理解析-Remoting1. 通訊層實現

Rocketmq的通訊層是基於通訊框架netty 4.0.21.Final之上做了簡單的協議封裝,是強依賴。


一: NettyRemotingAbstract  Server與Client公用抽象類

ResponseFuture模式:

        invokeSyncImpl和invokeAsyncImpl都使用了

       請求方會new一個ResponseFuture物件快取起來ConcurrentHashMap<Integer/* opaque */, ResponseFuture>,並且設定opaque 值

       Broker接收請求將opaque 直接把這個值設定迴響應物件,客戶端接收到這個響應,通過opaque從快取查詢對應的ResponseFuture物件

       構建ResponseFuture,設定opaque值

        netty傳送請送並且設定監聽器回撥響應

          傳送成功設定ResponseFuture傳送成功,退出監聽器

         傳送失敗設定ResponseFuture傳送失敗,並且從快取中移除ResponseFuture(沒有響應過來,就用不到快取中的ResponseFuturel)

        responseFuture.waitResponse(timeoutMillis)獲取響應

 傳送成功,沒有響應物件說明超時

        非同步一般鏈路耗時比較長, 為了防止本地快取的netty請求過多, 使用訊號量控制上限預設2048個

        獲取是否可以處理請求

        構建一次釋放物件

        構建responseFuture物件,設定opaque, callback, once,超時時間等值,並放入快取集合

        通過netty傳送請求,設定listener, 

                   傳送成功responseFuture.setSendRequestOK(true);

                   傳送失敗responseFuture.setSendRequestOK(false), 訊號量通過once釋放, 刪除快取

         Netty接收server端響應,根據opaque從快取獲取responseFuture,呼叫回撥方法即介面InvokeCallback實現


        標記onewayRpc

        用訊號量控制併發的數 //這是我對在這裡用新號量控制的理解

由定時任務啟動,定時檢視超時的快取請求,有callback的執行callback,讓後從快取中移除再釋放請求

        根據請求code查詢對應的處理器執行緒池pair, 沒有用預設的

        有處理器處理請求返回RemotingCommand物件的響應response

        若不是onewayRpc 給response設定opaque

                       標記響應型別

                       通過netty寫入響應

 當client向server傳送請求的時候,server處理後向client反饋處理結果。

        根據RemotingCommand的opaque,從快取中取出對應的ResponseFuture

        ResponseFuture設定響應物件RemotingCommand

        responseFuture釋放訊號量

        有callback的執行callback(通過執行緒池), 沒有的putResponse(這個方法同步呼叫使用,來countDownLatch, 因為呼叫執行緒在等待呢)

二:NettyRemotingServer Remoting 服務端實現

broker啟動初始化NettyRometingServer

向netty註冊handler

 NettyEncoder協議編碼器,將RemotingCommand轉換為位元組,給netty傳輸

 NettyDecoder協議解碼器, 將netty接收的輸入流,轉換成RemotingCommand

NettyConnetManageHandler 處理register,unregiter, active, inactive, exception

NettyServerHandler  netty處理請求的業

向NettyRemotingServer註冊業務處理器

  server接收client請求根據RequestCode選擇具體的處理器RequestProcessor,就是利用RequestCode進行策略的選擇

  server(broker,namesrv)在啟動的時候會把RequestCode與對應的RequestProcessor和處理執行緒池註冊到NettyRemotingServer中去,程式碼類似如下:

         remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, SendMessageProcessor, sendMessageExecutor)

 

三:NetttyRemotingClient

向netty註冊handler

NettyEncoder協議編碼器,將RemotingCommand轉換為位元組,給netty傳輸

         NettyDecoder協議解碼器, 將netty接收的輸入流,轉換成RemotingCommand

NettyConnetManageHandler 處理register,unregiter, active, inactive, exception

NettyClientHandler   netty 處理請求的業

Client與通訊層的互動封了MQClientAPIImpl統一處理,在MQClientAPIImpl構造的時候註冊了ClientRemotingProcessor來處理server的請求