1. 程式人生 > >MQTT---HiveMQ原始碼詳解(十二)Netty-MQTT訊息、事件處理(流程)

MQTT---HiveMQ原始碼詳解(十二)Netty-MQTT訊息、事件處理(流程)

MQTT交流群:221405150

簡介

前面這些章節,講的基本上都是屬於netty對MQTT周邊的一些處理,由於MQTT協議總共目前可用的訊息型別有14個,如果再加上對應的事件處理載入一起那就估計大概有14*3個handler,如果每個來講一遍,難免有些枯燥,而且知識點會很分散,思考再三,想把整體的MQTT訊息以及對應的事件處理作為一節來介紹,我們只講它整體的實現思路、處理流程即可,這樣對需要自己寫broker的朋友的幫助應該是非常大的,這也符合最初寫此係列部落格的初衷。

熱身

一、Callback

callback

1、分類

HiveMQ的Callback總體分為同步、非同步兩種callback,其中部分非同步callback被標記為lowlevel。

2、同步

可以看出同步的callback主要分為broker的callback、安全相關的callback、OnConnectCallback、OnPublishReceivedCallback、OnSubscribeCallback,這些回撥都是使用非同步執行緒呼叫。

  1. broker在啟動和關閉時,會觸發OnBrokerStart和OnBrokerStop,使用者可在broker啟動的時候做一些自己的處理,例如資料庫連線池的建立,spring context的建立等等;在broker關閉時,可以關閉資料庫連線池等操作。

  2. 安全相關的主要包括Authentication、Authorization,主要是做連線認證和授權;可以寫第三方plugin去做Authentication和Authorization。

  3. OnConnectCallback、OnPublishReceivedCallback、OnSubscribeCallback,使用者可以在client連線、client publish、client subscribe的時候做一些處理。

3、非同步

  1. 非同步callback主要包括一些mqtt訊息回撥、認證完成回撥等等,使用者可以根據自己的需求開發一些個性化外掛定製屬於自己的broker業務。

4、lowlevel

  1. lowlevel屬於非同步callback的一部分,都是mqtt訊息的回撥。

5、CallbackExecutor

  1. CallbackExecutor就是所有非同步呼叫callback處理的Executor,由hivemq統一調配;使用者可使用配置內部引數來控制其執行緒數;來保證broker的效能;CallbackExecutor由CallbackExecutorProvider建立提供。

三、Plugin*Handler

在netty handlers一覽中介紹了很多plugin*handler;這些handler都是監聽netty的使用者自定義event來對callback進行回撥

正戲

下來就通過mqtt的connect訊息的整個呼叫處理流程來示例一下mqtt訊息和事件處理。

Created with Raphaël 2.1.0MqttConnectHandlerMqttConnectHandlerMqttConnectHandlerMqttConnectHandlerPluginOnAuthenticationCallbackHandlerPluginOnAuthenticationCallbackHandlerPluginOnAuthenticationCallbackHandlerPluginOnAuthenticationCallbackHandlerPluginAfterLoginCallbackHandlerPluginAfterLoginCallbackHandlerPluginRestrictionsCallbackHandlerPluginRestrictionsCallbackHandlerPluginRestrictionsCallbackHandlerPluginRestrictionsCallbackHandlerPluginOnConnectCallbackHandlerPluginOnConnectCallbackHandlerChannelPersistenceChannelPersistenceMqttConnectPersistenceHandlerMqttConnectPersistenceHandler當接受到connect訊息時為pipeline新增MqttDisallowSecondConnect(請檢視協議 MQTT-3.1.0-2)驗證clientid長度是否符合配置,否則傳送ConnAck(REFUSED_IDENTIFIER_REJECTED)到client端刪除IdleStateHandler和NoConnectIdleEventHandler(連線建立後,必須在使用者配置時間內傳送connect訊息)觸發PluginOnAuthentication事件,讓其呼叫callback進行認證非同步遍歷所有OnAuthenticationCallback讓其認證,每一個callback認證完成會觸發一個PluginOnAuthenticationCallbackCompleted事件接收到PluginOnAuthenticationCallbackCompleted,根據使用者的外掛認證配置決定下一步處理當認證完成後會觸發PluginOnAuthenticationCompleted根據client端是否存在LWT,做LWT處理(此處不做過多描述,主要目的是描述流程)為pipeline新增:PluginAfterLoginCallbackHandler,做認證完成回撥處理觸發PluginAfterLogin事件,讓其呼叫callback進行認證完成結果的通知若認證不通過則傳送ConAck(OnAuthenticationCallback返回的return code)到客戶端新增PluginRestrictionsCallbackHandler,為客戶端進行授權。觸發PluginRestrictionsAfterLogin事件,讓其遍歷呼叫RestrictionsAfterLoginCallback,讓每個callback對客戶端進行授權當每個授權都完成後,觸發PluginRestrictionsAfterLoginCompleted,將授權資訊進行回傳為pipeline新增:PluginOnConnectCallbackHandler,讓其遍歷所有callback進行連線通知當所有OnConnectCallback回撥完成,觸發PluginOnConnectCompleted事件處理掉與當前clientid一樣的連線儲存連線新增closeFuture,處理客戶端斷線若客戶端持久session,則觸發持久session事件,讓MqttConnectPersistenceHandler處理持久session處理採集(統計)當前線上連線數增加新增closeFuture,採集(統計)當前線上連線數減少

流程較多,部分細節處理做了減免描述,為了讓大家更清晰地瞭解訊息的處理、事件的處理、以及其相互之間的觸發方式,大家如果自己寫broker,沒必要生搬硬套按照這樣的步驟去處理(只要按照mqtt標準/建議的處理流程去處理即可),目的是為了大家瞭解其處理機制提供一個思路而已。

總結

1、所有的mqtt訊息的handler與callbackhandler都是通過netty的自定義event來實現互動的,callbackhandler幾乎都是動態加入到pipeline中以減少記憶體的消耗。

2、所由callbackhandler都使用CallbackExecutor去非同步呼叫callback,並監聽對應完成的event來進行互動。

由上可以看出,所有流程都是採用非同步處理,同時限制Executor執行緒數來限制非同步同時處理過多,使用netty自定義event來達到相互的互動、以及客戶端(plugin、mqtt client)感知的同步。