1. 程式人生 > >mqtt協議-broker之moqutte源碼研究二之SUBSCRIBE報文處理

mqtt協議-broker之moqutte源碼研究二之SUBSCRIBE報文處理

mqtt moquette broker 源碼

這一篇開始講解moqutte對SUBSCRIBE報文的處理

代碼不復雜
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
String clientID = NettyUtils.clientID(channel);//從channel裏面獲取clientId,具體原理看下文
int messageID = messageId(msg);
LOG.info("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);

    RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
    SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
    if (currentStatus != null) {
        LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
            clientID, messageID);
        return;
    }
    String username = NettyUtils.userName(channel);
    List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
    MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
    if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
        LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
            "messageId={}", clientID, messageID);
        return;
    }

    LOG.info("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);

    List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);

    // save session, persist subscriptions from session
    for (Subscription subscription : newSubscriptions) {
        subscriptions.add(subscription.asClientTopicCouple());
    }

    LOG.info("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
    channel.writeAndFlush(ackMessage);

    // fire the persisted messages in session
    for (Subscription subscription : newSubscriptions) {
        publishRetainedMessagesInSession(subscription, username);
    }

    boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
    if (!success) {
        LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
    }
}

1.channel裏面為什麽會存在clientid呢?這個問題也可以這樣描述,當連接建立之後,client發布消息的時候,netty接收到socket裏面的數據之後,他怎麽知道是哪個client的數據呢?這裏面就需要確定client與channel的映射關系。moquette是這麽做的,
在處理CONNECT的第5步,詳見http://blog.51cto.com/13579730/2073630的時候會做如下處理
private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, final String clientId) {
    int keepAlive = msg.variableHeader().keepAliveTimeSeconds();
    LOG.info("Configuring connection. CId={}", clientId);
    NettyUtils.keepAlive(channel, keepAlive);
    // session.attr(NettyUtils.ATTR_KEY_CLEANSESSION).set(msg.variableHeader().isCleanSession());
    NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
    // used to track the client in the subscription and publishing phases.
    // session.attr(NettyUtils.ATTR_KEY_CLIENTID).set(msg.getClientID());
    NettyUtils.clientID(channel, clientId);
    int idleTime = Math.round(keepAlive * 1.5f);
    setIdleTime(channel.pipeline(), idleTime);

    if(LOG.isDebugEnabled()){
        LOG.debug("The connection has been configured CId={}, keepAlive={}, cleanSession={}, idleTime={}",
                clientId, keepAlive, msg.variableHeader().isCleanSession(), idleTime);
    }
}

    這裏面有一步NettyUtils.clientID(channel, clientId);這個不起眼的方法做了將channel與clientId映射的動作,接著跟蹤
     public static void clientID(Channel channel, String clientID) {
    channel.attr(NettyUtils.ATTR_KEY_CLIENTID).set(clientID);
}
    原來是把clientId作為一個屬性存到了channel裏面,因為channel是集成AttributeMap的,所以可以這麽做。

只要有channel與clientId的映射關系,就好說了,這也就是為什麽moquette的NettyMQTTHandler是這樣處理的

    @Override
public void channelRead(ChannelHandlerContext ctx, Object message) {
    MqttMessage msg = (MqttMessage) message;
    MqttMessageType messageType = msg.fixedHeader().messageType();
    if(LOG.isDebugEnabled())
        LOG.debug("Processing MQTT message, type={}", messageType);
    try {
        switch (messageType) {
            case CONNECT:
                m_processor.processConnect(ctx.channel(), (MqttConnectMessage) msg);
                break;
            case SUBSCRIBE:
                m_processor.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                m_processor.processUnsubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
                break;
            case PUBLISH:
                m_processor.processPublish(ctx.channel(), (MqttPublishMessage) msg);
                break;
            case PUBREC:
                m_processor.processPubRec(ctx.channel(), msg);
                break;
            case PUBCOMP:
                m_processor.processPubComp(ctx.channel(), msg);
                break;
            case PUBREL:
                m_processor.processPubRel(ctx.channel(), msg);
                break;
            case DISCONNECT:
                m_processor.processDisconnect(ctx.channel());
                break;
            case PUBACK:
                m_processor.processPubAck(ctx.channel(), (MqttPubAckMessage) msg);
                break;
            case PINGREQ:
                MqttFixedHeader pingHeader = new MqttFixedHeader(
                        MqttMessageType.PINGRESP,
                        false,
                        AT_MOST_ONCE,
                        false,
                        0);
                MqttMessage pingResp = new MqttMessage(pingHeader);
                ctx.writeAndFlush(pingResp);
                break;
            default:
                LOG.error("Unkonwn MessageType:{}", messageType);
                break;

哪個tcp-socket對應哪個channel由netty負責處理,當client發送數據的時候,netty負責從ChannelHandlerContext取出channel傳給相應的業務自定義的handler進行處理。

2.創建一個正在運行中的RunningSubscription對象,之所以要創建這個對象,是為了防止重復訂閱,同時到存儲了所有的RunningSubscription的ConcurrentMap裏面查詢所有已經存在這個對象,如果存在,說明是重復的訂閱包,則不處理,這裏面調用了putIfAbsent方法,同時重寫了RunningSubscription的equals方法。packetId和clientID相同時代表是相同的RunningSubscription
3.從channel裏面取出用戶名,驗證該client下的該username是否有權利讀取該topic(訂閱該topic)的權限,這裏貼一下相關的代碼進行講解

    rivate List<MqttTopicSubscription> doVerify(String clientID, String username, MqttSubscribeMessage msg) {
    ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
    List<MqttTopicSubscription> ackTopics = new ArrayList<>();

    final int messageId = messageId(msg);
    for (MqttTopicSubscription req : msg.payload().topicSubscriptions()) {
        Topic topic = new Topic(req.topicName());
        if (!m_authorizator.canRead(topic, username, clientSession.clientID)) {
            // send SUBACK with 0x80, the user hasn‘t credentials to read the topic
            LOG.error("Client does not have read permissions on the topic CId={}, username={}, messageId={}, " +
                "topic={}", clientID, username, messageId, topic);
            ackTopics.add(new MqttTopicSubscription(topic.toString(), FAILURE));
        } else {
            MqttQoS qos;
            if (topic.isValid()) {
                LOG.info("Client will be subscribed to the topic CId={}, username={}, messageId={}, topic={}",
                    clientID, username, messageId, topic);
                qos = req.qualityOfService();
            } else {
                LOG.error("Topic filter is not valid CId={}, username={}, messageId={}, topic={}", clientID,
                    username, messageId, topic);
                qos = FAILURE;
            }
            ackTopics.add(new MqttTopicSubscription(topic.toString(), qos));
        }
    }
    return ackTopics;
}

從報文的payload裏面取出所有的訂閱請求,遍歷,然後驗證是否有權限,這個權限是在配置文件裏面配置的,詳見http://blog.51cto.com/13579730/2072467
如果沒有權限,返回SUBACK報文中標記該訂閱狀態為失敗,如果有權限,檢查topic是否有效如果有效,獲取qos,如果無效標記為失敗。
校驗之後得到一個List<MqttTopicSubscription>,再根據這個list生成SUBACK

4.將RunningSubscription的狀態從VERIFIED修改成STORED,這裏面用到了ConcurrentHashMap.replace(key,oldvalue,newvlaue)這個原子操作,如果修改失敗表面,這個訂閱請求已經存在。
5.開始存儲訂閱請求,這裏存儲訂閱請求

private List<Subscription> doStoreSubscription(List<MqttTopicSubscription> ackTopics, String clientID) {
    ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);

    List<Subscription> newSubscriptions = new ArrayList<>();
    for (MqttTopicSubscription req : ackTopics) {
        // TODO this is SUPER UGLY
        if (req.qualityOfService() == FAILURE) {
            continue;
        }
        Subscription newSubscription =
                new Subscription(clientID, new Topic(req.topicName()), req.qualityOfService());

        clientSession.subscribe(newSubscription);//存儲到用戶的session裏面,用以表明該client訂閱了哪些請求
        newSubscriptions.add(newSubscription);
    }
    return newSubscriptions;
}

    我們先看存儲到用戶的session這一步
    public boolean subscribe(Subscription newSubscription) {
    LOG.info("Adding new subscription. ClientId={}, topics={}, qos={}", newSubscription.getClientId(),
        newSubscription.getTopicFilter(), newSubscription.getRequestedQos());
    boolean validTopic = newSubscription.getTopicFilter().isValid();
    if (!validTopic) {
        LOG.warn("The topic filter is not valid. ClientId={}, topics={}", newSubscription.getClientId(),
            newSubscription.getTopicFilter());
        // send SUBACK with 0x80 for this topic filter
        return false;
    }
    ClientTopicCouple matchingCouple = new ClientTopicCouple(this.clientID, newSubscription.getTopicFilter());
    Subscription existingSub = subscriptionsStore.getSubscription(matchingCouple);
    // update the selected subscriptions if not present or if has a greater qos
    if (existingSub == null || existingSub.getRequestedQos().value() < newSubscription.getRequestedQos().value()) {
        if (existingSub != null) {
            LOG.info("Subscription already existed with a lower QoS value. It will be updated. ClientId={}, " +
                "topics={}, existingQos={}, newQos={}", newSubscription.getClientId(),
                newSubscription.getTopicFilter(), existingSub.getRequestedQos(), newSubscription.getRequestedQos());
            subscriptions.remove(newSubscription);
        }
        subscriptions.add(newSubscription);//存儲到內存的session
        subscriptionsStore.addNewSubscription(newSubscription);//存儲到別的地方
    }
    return true;
}
            這裏面先創建了一個ClientTopicCouple對,然後從訂閱集合裏面查詢是否已經存在這個訂閱,如果不存在或者新的訂閱的qos要高於就的訂閱的qos,則會把訂閱添加到訂閱集合裏面,這裏有兩個存儲,一個是Set<Subscription>,一個是Map<Topic, Subscription> subscriptions(這個在ISessionsStore的具體實現裏面)moquette在這裏面做了冗余,即內存裏面會存一分,同時允許通過ISessionsStore存儲到外部。

6.我們接著看processSubscribe,這個方法會返回一個新的list
接著會遍歷這個返回的list,存儲到SubscriptionsDirectory裏面,這個維護所有的client直接的發布訂閱關系,是moquette裏面一個非常重要的對象了,裏面維護者一顆topic樹,這個後面單獨講
7.發送SUBACK
8.發布retain消息,這裏面也講解一下,這一步的作用在於,如果一個client發布了新的訂閱,那麽必須遍歷那些retain消息,如果這些新的訂閱,確實能夠匹配這些retain消息,必須將這些retain消息發送給他們。//這裏moquette的處理是遍歷map,這樣的話,當retain消息特別大的時候,效率是非常低的,會很容易拖垮那些對吞吐率和性能要求比較高的系統的。

    private void publishRetainedMessagesInSession(final Subscription newSubscription, String username) {
    LOG.info("Retrieving retained messages CId={}, topics={}", newSubscription.getClientId(),
            newSubscription.getTopicFilter());

    // scans retained messages to be published to the new subscription
    // TODO this is ugly, it does a linear scan on potential big dataset
    Collection<IMessagesStore.StoredMessage> messages = m_messagesStore.searchMatching(new IMatchingCondition() {

        @Override
        public boolean match(Topic key) {
            return key.match(newSubscription.getTopicFilter());
        }
    });

    if (!messages.isEmpty()) {
        LOG.info("Publishing retained messages CId={}, topics={}, messagesNo={}",
            newSubscription.getClientId(), newSubscription.getTopicFilter(), messages.size());
    }
    ClientSession targetSession = m_sessionsStore.sessionForClient(newSubscription.getClientId());
    this.internalRepublisher.publishRetained(targetSession, messages);

    // notify the Observables
    m_interceptor.notifyTopicSubscribed(newSubscription, username);
}

另外,用以匹配訂閱的topic與retain消息的topic是否匹配的方法也非常不完善。具體的原因大家可以看一下這裏
io.moquette.spi.impl.subscriptions.Topic#match

9.從ConcurrentMap<RunningSubscription, SubscriptionState>移除該訂閱請求。

整個RunningSubscription的狀態會從VERIFIED到STORED,這代表了整個處理過程的最重要的兩個步驟。
下一篇會講解moquette對PUBLISH報文的處理

mqtt協議-broker之moqutte源碼研究二之SUBSCRIBE報文處理