1. 程式人生 > >mqtt協議-broker之moqutte源碼研究五之UNSUBSCRIBE與DISCONN報文處理

mqtt協議-broker之moqutte源碼研究五之UNSUBSCRIBE與DISCONN報文處理

mqtt broker moquette

本文講解moquette對UNSUBSCRIBE和DISCONNECT的處理

先說UNSUBSCRIBE,代碼比較簡單

    public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) {
    List<String> topics = msg.payload().topics();
    String clientID = NettyUtils.clientID(channel);

    LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, topics);

    ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
    for (String t : topics) {
        Topic topic = new Topic(t);
        boolean validTopic = topic.isValid();
        if (!validTopic) {
            // close the connection, not valid topicFilter is a protocol violation
            channel.close();
            LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", clientID, topics, topic);
            return;
        }
        if(LOG.isDebugEnabled()){
            LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic);
        }
        subscriptions.removeSubscription(topic, clientID);
        clientSession.unsubscribeFrom(topic);
        String username = NettyUtils.userName(channel);
        m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username);
    }

    // ack the client
    int messageID = msg.variableHeader().messageId();
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0);
    MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));

    LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", clientID, topics, messageID);
    channel.writeAndFlush(ackMessage);
}

主要分為以下幾步
1.從目錄樹下,移除該client的訂閱,這個移除過程有點復雜,後面單獨一篇專門講解topic樹
2.清除ClientSession裏面的訂閱,包括Set<Subscription> subscriptions,同時還得移除ISubscriptionsStore裏面的Map<Topic, Subscription> subscriptions
3.喚醒攔截器
4.返回UNSUBACK ,這裏註意UNSUBACK 是沒有payload的。

再說DISCONNECT的處理

public void processDisconnect(Channel channel) throws InterruptedException {
    final String clientID = NettyUtils.clientID(channel);
    LOG.info("Processing DISCONNECT message. CId={}", clientID);
    channel.flush();
    final ConnectionDescriptor existingDescriptor = this.connectionDescriptors.getConnection(clientID);
    if (existingDescriptor == null) {
        // another client with same ID removed the descriptor, we must exit
        channel.close();
        return;
    }

    if (existingDescriptor.doesNotUseChannel(channel)) {
        // another client saved it‘s descriptor, exit
        LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!removeSubscriptions(existingDescriptor, clientID)) {
        LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!dropStoredMessages(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop stored messages. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!cleanWillMessageAndNotifyInterceptor(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop will message. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!existingDescriptor.close()) {
        LOG.info("The connection has been closed. CId={}", clientID);
        return;
    }

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    LOG.info("The DISCONNECT message has been processed. CId={}", clientID);
}

1.檢查連接描述符是否還存在,如果不存在,說明之前已經有客戶端刪除了它,直接關閉通道
2.判斷這個client的連接描述符是不是,是不是還是當前使用這個通道的client?作者要先防止這種情況呢?先賣個關子,後面的第6條會說明
3.清除訂閱請求,這裏面好像只清楚了不要求保存會話信息的clientsession裏面的ISessionsStore裏面的Map<Topic, Subscription> subscriptions,而並沒有清除ClientSession裏面的Set<Subscription> subscriptions和topic樹裏面的訂閱,這能夠解釋http://blog.51cto.com/13579730/2073914 這篇文章結尾討論的問題了,只有Map<Topic, Subscription> subscriptions的訂閱才是最準確的。

4.丟棄存儲的消息,這裏面也只是會丟棄不要去保存會話信息的消息
5.清除遺願消息,對於遺願消息,這裏稍微啰嗦一點,遺願消息是在初次連接的存儲到ProtocolProcessor的ConcurrentMap<String, WillMessage> m_willStore這裏面的,那麽什麽時候發送給訂閱者呢?看下面

    io.moquette.server.netty.NettyMQTTHandler#channelInactive
    @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    String clientID = NettyUtils.clientID(ctx.channel());
    if (clientID != null && !clientID.isEmpty()) {
        LOG.info("Notifying connection lost event. MqttClientId = {}.", clientID);
        m_processor.processConnectionLost(clientID, ctx.channel());
    }
    ctx.close();
}
    說明是當netty檢測到通道不活躍的時候通知ProtocolProcessor處理ConnectionLost事件的。
    public void processConnectionLost(String clientID, Channel channel) {
    LOG.info("Processing connection lost event. CId={}", clientID);
    ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true);
    connectionDescriptors.removeConnection(oldConnDescr);//移除連接描述符
    // publish the Will message (if any) for the clientID
    if (m_willStore.containsKey(clientID)) {
        WillMessage will = m_willStore.get(clientID);
        forwardPublishWill(will, clientID);//發布遺願消息
        m_willStore.remove(clientID);//移除遺願消息存儲
    }

    String username = NettyUtils.userName(channel);
    m_interceptor.notifyClientConnectionLost(clientID, username);//喚醒攔截器
}
    在以下這種情況下會發布遺願消息
    遺囑消息發布的條件,包括但不限於:
    服務端檢測到了一個I/O錯誤或者網絡故障。
    客戶端在保持連接(Keep Alive)的時間內未能通訊。
    客戶端沒有先發送DISCONNECT報文直接關閉了網絡連接。
    由於協議錯誤服務端關閉了網絡連接。

    另外說明一下,遺願消息是可以設置消息等級的,而且可以被設置成retain消息

6.連接描述符集合裏面清除該通道對應的連接描述符,這裏有一點很容易誤解,強調一下

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    作者調用的是ConcurrentMap裏面的boolean remove(Object key, Object value);這個方法要求key存在,且value 與預期的一樣才會刪除,也就說,是有可能存在的,key一樣而value不一樣的情況的,什麽時候會出現?答案是client在兩個設備上先後登陸,這個時候由於是存在一個map裏面的所以後面的登陸所創建的連接描述符會覆蓋前面的一個。當然這裏面,也可以在覆蓋之前強制斷開之前那個連接,但是moquette並沒有這麽做,具體看源碼io.moquette.server.ConnectionDescriptorStore#addConnection

也就說說moquette是允許存在一個賬號多設備登陸的。將入client先後在A,B兩個設備上建立連接,B連接會覆蓋A連接,這個時候A連接雖然還在,但其實是永遠也收不到消息的,因為發送消息的時候,會以ConnectionDescriptorStore裏面存儲的為準,具體看源碼
io.moquette.server.ConnectionDescriptorStore#sendMessage,也就是說A連接會無謂的占用broker的資源,個人覺得這樣並不好,也非常沒有必要,大家可以自行改進。
現在大家就能夠理解上面的第2步了,因為這個就是為雙登陸的情況下,被覆蓋的那個連接準備的。

mqtt協議-broker之moqutte源碼研究五之UNSUBSCRIBE與DISCONN報文處理