1. 程式人生 > >moquette源碼分析之七--qos1和qos2消息的處理

moquette源碼分析之七--qos1和qos2消息的處理

mqtt broker moquette 源碼

首先解釋一下mqtt協議的session的概念,因為只有有了session才會存在消息質量保證一說

如果清理會話(CleanSession)標誌被設置為0,服務端必須基於當前會話(使用客戶端標識符識別)的狀態恢復與客戶端的通信。如果沒有與這個客戶端標識符關聯的會話,服務端必須創建一個新的會話。在連接斷開之後,當連接斷開後,客戶端和服務端必須保存會話信息 [MQTT-3.1.2-4]。當清理會話標誌為0的會話連接斷開之後,服務端必須將之後的QoS 1和QoS 2級別的消息保存為會話狀態的一部分,如果這些消息匹配斷開連接時客戶端的任何訂閱 [MQTT-3.1.2-5]。服務端也可以保存滿足相同條件的QoS 0級別的消息。

如果清理會話(CleanSession)標誌被設置為1,客戶端和服務端必須丟棄之前的任何會話並開始一個新的會話。會話僅持續和網絡連接同樣長的時間。與這個會話關聯的狀態數據不能被任何之後的會話重用 [MQTT-3.1.2-6]。
客戶端的會話狀態包括:
● 已經發送給服務端,但是還沒有完成確認的QoS 1和QoS 2級別的消息
● 已從服務端接收,但是還沒有完成確認的QoS 2級別的消息。
服務端的會話狀態包括:
● 會話是否存在,即使會話狀態的其它部分都是空。
● 客戶端的訂閱信息。
● 已經發送給客戶端,但是還沒有完成確認的QoS 1和QoS 2級別的消息。
● 即將傳輸給客戶端的QoS 1和QoS 2級別的消息。
● 已從客戶端接收,但是還沒有完成確認的QoS 2級別的消息。
● 可選,準備發送給客戶端的QoS 0級別的消息。
保留消息不是服務端會話狀態的一部分,會話終止時不能刪除保留消息 [MQTT-3.1.2.7]。
有關狀態存儲的限制和細節見第 4.1節。
當清理會話標誌被設置為1時,客戶端和服務端的狀態刪除不需要是原子操作。
非規範評註
為了確保在發生故障時狀態的一致性,客戶端應該使用會話狀態標誌1重復請求連接,直到連接成功。
非規範評註
一般來說,客戶端連接時總是將清理會話標誌設置為0或1,並且不交替使用兩種值。這個選擇取決於具體的應用。清理會話標誌設置為1的客戶端不會收到舊的應用消息,而且在每次連接成功後都需要重新訂閱任何相關的主題。清理會話標誌設置為0的客戶端會收到所有在它連接斷開期間發布的QoS 1和QoS 2級別的消息。因此,要確保不丟失連接斷開期間的消息,需要使用QoS 1或 QoS 2級別,同時將清理會話標誌設置為0。
非規範評註
清理會話標誌0的客戶端連接時,它請求服務端在連接斷開後保留它的MQTT會話狀態。如果打算在之後的某個時間點重連到這個服務端,客戶端連接應該只使用清理會話標誌0。當客戶端決定之後不再使用這個會話時,應該將清理會話標誌設置為1最後再連接一次,然後斷開連接。
一。mqtt協議支持三種消息等級,分別是:

1.0--AT_MOST_ONCE,至多一次
2.1--AT_LEAST_ONCE,至少一次
3.2--EXACTLY_ONCE,有且僅有一次

二。其實AMQP協議也支持這三種。我們來看一下mqtt對於這三種消息的實現方式,對於理解AMQP的消息等級也有幫助

AT_MOST_ONCE比較簡單,這裏不說了,只說後兩種

先看消息等級為1,moquette是怎麽實現的。
先上一張圖,便於理解,

技術分享圖片

moquette對於session的實現在這裏:這裏以session存在在內存的實現來講解
io.moquette.persistence.MemorySessionStore

    class Session {
    final String clientID;
    final ClientSession clientSession;
    final Map<Topic, Subscription> subscriptions = new ConcurrentHashMap<>();
    final AtomicReference<PersistentSession> persistentSession = new AtomicReference<>(null);
    final BlockingQueue<StoredMessage> queue = new ArrayBlockingQueue<>(Constants.MAX_MESSAGE_QUEUE);
    final Map<Integer, StoredMessage> secondPhaseStore = new ConcurrentHashMap<>();
    final Map<Integer, StoredMessage> outboundFlightMessages =
            Collections.synchronizedMap(new HashMap<Integer, StoredMessage>());
    final Map<Integer, StoredMessage> inboundFlightMessages = new ConcurrentHashMap<>();

    Session(String clientID, ClientSession clientSession) {
        this.clientID = clientID;
        this.clientSession = clientSession;
    }
}

    對於broker分發的qos1消息,在沒有收到ack消息之前是存儲在outboundFlightMessages裏面的。
    結合上面的流程圖,很好理解了
    這裏補充說明一點,moquette的實現並沒有對以發送出去,而沒有收到ack消息做處理,按理說這是session的一部分,如果客戶端要求保持會話,客戶端斷線重連之後,是應該進行重發的,當然大家也可以自己實現。

再看消息等級為2,看下面的圖
技術分享圖片

結合上面的圖就很好理解了,這裏註意,moquette 其實在分發消息之前,是應該先從nflightBound..remove(messageId_1)的,但是它沒有這麽做,這其實是個bug,大家如果沒用到cleanSession和qos2消息還好,用到了就有可能存在內存泄露

三。末尾說一下mqtt的重發機制,

mqtt- broker只會在下面一種情況下重發消息:
必須滿足兩個條件:
1.客戶端要求broker保持會話,就是說qos1和qos2消息是會話的一部分。(上一次連接的時候要求保存會話)
2.客戶端重連成功,就是說連接正常能夠通信,也就是說broker對於qos1和qos2消息只做支持,不做保證(tcp連接正常,四層傳輸正常)
因為它沒法做保證,網絡不通,機器斷電,磁盤損害等。qos1和qos2只在連接正常的前提之下做出保證。

moquette源碼分析之七--qos1和qos2消息的處理