mqtt協議-broker之moqutte源碼研究五之UNSUBSCRIBE與DISCONN報文處理
先說UNSUBSCRIBE,代碼比較簡單
public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) { List<String> topics = msg.payload().topics(); String clientID = NettyUtils.clientID(channel); LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, topics); ClientSession clientSession = m_sessionsStore.sessionForClient(clientID); for (String t : topics) { Topic topic = new Topic(t); boolean validTopic = topic.isValid(); if (!validTopic) { // close the connection, not valid topicFilter is a protocol violation channel.close(); LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", clientID, topics, topic); return; } if(LOG.isDebugEnabled()){ LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic); } subscriptions.removeSubscription(topic, clientID); clientSession.unsubscribeFrom(topic); String username = NettyUtils.userName(channel); m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username); } // ack the client int messageID = msg.variableHeader().messageId(); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0); MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID)); LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", clientID, topics, messageID); channel.writeAndFlush(ackMessage); }
主要分為以下幾步
1.從目錄樹下,移除該client的訂閱,這個移除過程有點復雜,後面單獨一篇專門講解topic樹
2.清除ClientSession裏面的訂閱,包括Set<Subscription> subscriptions,同時還得移除ISubscriptionsStore裏面的Map<Topic, Subscription> subscriptions
3.喚醒攔截器
4.返回UNSUBACK ,這裏註意UNSUBACK 是沒有payload的。
再說DISCONNECT的處理
public void processDisconnect(Channel channel) throws InterruptedException { final String clientID = NettyUtils.clientID(channel); LOG.info("Processing DISCONNECT message. CId={}", clientID); channel.flush(); final ConnectionDescriptor existingDescriptor = this.connectionDescriptors.getConnection(clientID); if (existingDescriptor == null) { // another client with same ID removed the descriptor, we must exit channel.close(); return; } if (existingDescriptor.doesNotUseChannel(channel)) { // another client saved it‘s descriptor, exit LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", clientID); existingDescriptor.abort(); return; } if (!removeSubscriptions(existingDescriptor, clientID)) { LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", clientID); existingDescriptor.abort(); return; } if (!dropStoredMessages(existingDescriptor, clientID)) { LOG.warn("Unable to drop stored messages. Closing connection. CId={}", clientID); existingDescriptor.abort(); return; } if (!cleanWillMessageAndNotifyInterceptor(existingDescriptor, clientID)) { LOG.warn("Unable to drop will message. Closing connection. CId={}", clientID); existingDescriptor.abort(); return; } if (!existingDescriptor.close()) { LOG.info("The connection has been closed. CId={}", clientID); return; } boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor); if (!stillPresent) { // another descriptor was inserted LOG.warn("Another descriptor has been inserted. CId={}", clientID); return; } LOG.info("The DISCONNECT message has been processed. CId={}", clientID); }
1.檢查連接描述符是否還存在,如果不存在,說明之前已經有客戶端刪除了它,直接關閉通道
2.判斷這個client的連接描述符是不是,是不是還是當前使用這個通道的client?作者要先防止這種情況呢?先賣個關子,後面的第6條會說明
3.清除訂閱請求,這裏面好像只清楚了不要求保存會話信息的clientsession裏面的ISessionsStore裏面的Map<Topic, Subscription> subscriptions,而並沒有清除ClientSession裏面的Set<Subscription> subscriptions和topic樹裏面的訂閱,這能夠解釋http://blog.51cto.com/13579730/2073914 這篇文章結尾討論的問題了,只有Map<Topic, Subscription> subscriptions的訂閱才是最準確的。
5.清除遺願消息,對於遺願消息,這裏稍微啰嗦一點,遺願消息是在初次連接的存儲到ProtocolProcessor的ConcurrentMap<String, WillMessage> m_willStore這裏面的,那麽什麽時候發送給訂閱者呢?看下面
io.moquette.server.netty.NettyMQTTHandler#channelInactive
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String clientID = NettyUtils.clientID(ctx.channel());
if (clientID != null && !clientID.isEmpty()) {
LOG.info("Notifying connection lost event. MqttClientId = {}.", clientID);
m_processor.processConnectionLost(clientID, ctx.channel());
}
ctx.close();
}
說明是當netty檢測到通道不活躍的時候通知ProtocolProcessor處理ConnectionLost事件的。
public void processConnectionLost(String clientID, Channel channel) {
LOG.info("Processing connection lost event. CId={}", clientID);
ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true);
connectionDescriptors.removeConnection(oldConnDescr);//移除連接描述符
// publish the Will message (if any) for the clientID
if (m_willStore.containsKey(clientID)) {
WillMessage will = m_willStore.get(clientID);
forwardPublishWill(will, clientID);//發布遺願消息
m_willStore.remove(clientID);//移除遺願消息存儲
}
String username = NettyUtils.userName(channel);
m_interceptor.notifyClientConnectionLost(clientID, username);//喚醒攔截器
}
在以下這種情況下會發布遺願消息
遺囑消息發布的條件,包括但不限於:
服務端檢測到了一個I/O錯誤或者網絡故障。
客戶端在保持連接(Keep Alive)的時間內未能通訊。
客戶端沒有先發送DISCONNECT報文直接關閉了網絡連接。
由於協議錯誤服務端關閉了網絡連接。
另外說明一下,遺願消息是可以設置消息等級的,而且可以被設置成retain消息
6.連接描述符集合裏面清除該通道對應的連接描述符,這裏有一點很容易誤解,強調一下
boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
if (!stillPresent) {
// another descriptor was inserted
LOG.warn("Another descriptor has been inserted. CId={}", clientID);
return;
}
作者調用的是ConcurrentMap裏面的boolean remove(Object key, Object value);這個方法要求key存在,且value 與預期的一樣才會刪除,也就說,是有可能存在的,key一樣而value不一樣的情況的,什麽時候會出現?答案是client在兩個設備上先後登陸,這個時候由於是存在一個map裏面的所以後面的登陸所創建的連接描述符會覆蓋前面的一個。當然這裏面,也可以在覆蓋之前強制斷開之前那個連接,但是moquette並沒有這麽做,具體看源碼io.moquette.server.ConnectionDescriptorStore#addConnection
也就說說moquette是允許存在一個賬號多設備登陸的。將入client先後在A,B兩個設備上建立連接,B連接會覆蓋A連接,這個時候A連接雖然還在,但其實是永遠也收不到消息的,因為發送消息的時候,會以ConnectionDescriptorStore裏面存儲的為準,具體看源碼
io.moquette.server.ConnectionDescriptorStore#sendMessage,也就是說A連接會無謂的占用broker的資源,個人覺得這樣並不好,也非常沒有必要,大家可以自行改進。
現在大家就能夠理解上面的第2步了,因為這個就是為雙登陸的情況下,被覆蓋的那個連接準備的。
mqtt協議-broker之moqutte源碼研究五之UNSUBSCRIBE與DISCONN報文處理