MQTT---HiveMQ原始碼詳解(十二)Netty-MQTT訊息、事件處理(流程)
MQTT交流群:221405150
簡介
前面這些章節,講的基本上都是屬於netty對MQTT周邊的一些處理,由於MQTT協議總共目前可用的訊息型別有14個,如果再加上對應的事件處理載入一起那就估計大概有14*3個handler,如果每個來講一遍,難免有些枯燥,而且知識點會很分散,思考再三,想把整體的MQTT訊息以及對應的事件處理作為一節來介紹,我們只講它整體的實現思路、處理流程即可,這樣對需要自己寫broker的朋友的幫助應該是非常大的,這也符合最初寫此係列部落格的初衷。
熱身
一、Callback
1、分類
HiveMQ的Callback總體分為同步、非同步兩種callback,其中部分非同步callback被標記為lowlevel。
2、同步
可以看出同步的callback主要分為broker的callback、安全相關的callback、OnConnectCallback、OnPublishReceivedCallback、OnSubscribeCallback,這些回撥都是使用非同步執行緒呼叫。
broker在啟動和關閉時,會觸發OnBrokerStart和OnBrokerStop,使用者可在broker啟動的時候做一些自己的處理,例如資料庫連線池的建立,spring context的建立等等;在broker關閉時,可以關閉資料庫連線池等操作。
安全相關的主要包括Authentication、Authorization,主要是做連線認證和授權;可以寫第三方plugin去做Authentication和Authorization。
OnConnectCallback、OnPublishReceivedCallback、OnSubscribeCallback,使用者可以在client連線、client publish、client subscribe的時候做一些處理。
3、非同步
- 非同步callback主要包括一些mqtt訊息回撥、認證完成回撥等等,使用者可以根據自己的需求開發一些個性化外掛定製屬於自己的broker業務。
4、lowlevel
- lowlevel屬於非同步callback的一部分,都是mqtt訊息的回撥。
5、CallbackExecutor
- CallbackExecutor就是所有非同步呼叫callback處理的Executor,由hivemq統一調配;使用者可使用配置內部引數來控制其執行緒數;來保證broker的效能;CallbackExecutor由CallbackExecutorProvider建立提供。
三、Plugin*Handler
在netty handlers一覽中介紹了很多plugin*handler;這些handler都是監聽netty的使用者自定義event來對callback進行回撥
正戲
下來就通過mqtt的connect訊息的整個呼叫處理流程來示例一下mqtt訊息和事件處理。
流程較多,部分細節處理做了減免描述,為了讓大家更清晰地瞭解訊息的處理、事件的處理、以及其相互之間的觸發方式,大家如果自己寫broker,沒必要生搬硬套按照這樣的步驟去處理(只要按照mqtt標準/建議的處理流程去處理即可),目的是為了大家瞭解其處理機制提供一個思路而已。
總結
1、所有的mqtt訊息的handler與callbackhandler都是通過netty的自定義event來實現互動的,callbackhandler幾乎都是動態加入到pipeline中以減少記憶體的消耗。
2、所由callbackhandler都使用CallbackExecutor去非同步呼叫callback,並監聽對應完成的event來進行互動。
由上可以看出,所有流程都是採用非同步處理,同時限制Executor執行緒數來限制非同步同時處理過多,使用netty自定義event來達到相互的互動、以及客戶端(plugin、mqtt client)感知的同步。