moquette modification notes (2): optimizing the BrokerInterceptor notifyTopicPublished() logic

The problem

Below is part of the source of io.moquette.spi.impl.BrokerInterceptor.java:

    @Override
    public void notifyClientConnected(final MqttConnectMessage msg) {
        for (final InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) {
            LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}",
                    msg.payload().clientIdentifier(), handler.getID());
            executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg)));
        }
    }

    @Override
    public void notifyClientDisconnected(final String clientID, final String username) {
        for (final InterceptHandler handler : this.handlers.get(InterceptDisconnectMessage.class)) {
            LOG.debug("Notifying MQTT client disconnection to interceptor. CId={}, username={}, interceptorId={}",
                clientID, username, handler.getID());
            executor.execute(() -> handler.onDisconnect(new InterceptDisconnectMessage(clientID, username)));
        }
    }

    @Override
    public void notifyClientConnectionLost(final String clientID, final String username) {
        for (final InterceptHandler handler : this.handlers.get(InterceptConnectionLostMessage.class)) {
            LOG.debug("Notifying unexpected MQTT client disconnection to interceptor CId={}, username={}, " +
                "interceptorId={}", clientID, username, handler.getID());
            executor.execute(() -> handler.onConnectionLost(new InterceptConnectionLostMessage(clientID, username)));
        }
    }

    @Override
    public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {
        msg.retain();

        executor.execute(() -> {
                try {
                    int messageId = msg.variableHeader().messageId();
                    String topic = msg.variableHeader().topicName();
                    for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
                        LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
                                + "interceptorId={}", clientID, messageId, topic, handler.getID());
                        handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
                    }
                } finally {
                    ReferenceCountUtil.release(msg);
                }
        });
    }

    @Override
    public void notifyTopicSubscribed(final Subscription sub, final String username) {
        for (final InterceptHandler handler : this.handlers.get(InterceptSubscribeMessage.class)) {
            LOG.debug("Notifying MQTT SUBSCRIBE message to interceptor. CId={}, topicFilter={}, interceptorId={}",
                sub.getClientId(), sub.getTopicFilter(), handler.getID());
            executor.execute(() -> handler.onSubscribe(new InterceptSubscribeMessage(sub, username)));
        }
    }

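In every method except notifyTopicPublished(), each iteration of the for loop submits the InterceptHandler call to the thread pool as a separate task. notifyTopicPublished(), by contrast, submits a single task that calls the handlers one after another inside the loop. This has two drawbacks. First, it makes no use of the pool's multiple threads. Second, if one InterceptHandler's onPublish() blocks, the onPublish() call of every handler after it is blocked as well.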
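The blocking effect can be reproduced without moquette. The following is a minimal sketch, not moquette code: `Handler`, `DispatchDemo`, and the two dispatch methods are hypothetical stand-ins, and the two-thread pool is an assumption. One handler blocks on a latch, and the sketch checks whether the second handler still gets to run.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DispatchDemo {
    /** Hypothetical stand-in for InterceptHandler; not moquette's interface. */
    interface Handler {
        void onPublish(String msg) throws InterruptedException;
    }

    private static void call(Handler handler, String msg) {
        try {
            handler.onPublish(msg);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Original style: a single task loops over every handler in turn. */
    static void dispatchInOneTask(ExecutorService executor, List<Handler> handlers, String msg) {
        executor.execute(() -> {
            for (Handler handler : handlers) {
                call(handler, msg);
            }
        });
    }

    /** Style used by the other notify methods: one task per handler. */
    static void dispatchPerHandler(ExecutorService executor, List<Handler> handlers, String msg) {
        for (Handler handler : handlers) {
            executor.execute(() -> call(handler, msg));
        }
    }

    /** Returns true if the fast handler ran while the slow handler was still blocked. */
    static boolean fastRanWhileSlowBlocked(boolean perHandler) {
        ExecutorService executor = Executors.newFixedThreadPool(2); // assumed pool size
        CountDownLatch releaseSlow = new CountDownLatch(1);
        CountDownLatch fastDone = new CountDownLatch(1);
        Handler slow = msg -> releaseSlow.await();   // blocks until released
        Handler fast = msg -> fastDone.countDown();  // returns immediately
        List<Handler> handlers = Arrays.asList(slow, fast);
        try {
            if (perHandler) {
                dispatchPerHandler(executor, handlers, "payload");
            } else {
                dispatchInOneTask(executor, handlers, "payload");
            }
            // Wait briefly for the fast handler while the slow one is still blocked.
            return fastDone.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            releaseSlow.countDown(); // let the slow handler finish
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("one task:    " + fastRanWhileSlowBlocked(false));
        System.out.println("per handler: " + fastRanWhileSlowBlocked(true));
    }
}
```

With one task for all handlers, the fast handler is stuck behind the slow one. With one task per handler it runs on the second pool thread. Note that this only helps when the pool has more than one thread: on a single-thread executor the per-handler tasks would still run one after another.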

Optimizing the logic

Direction: as in the other methods, every call to an InterceptHandler's onPublish() is submitted to the thread pool as a new task.

The code:

public class PublishTask implements Runnable {
        final MqttPublishMessage msg;
        final InterceptHandler interceptHandler;
        final String clientId;
        final String username;

        PublishTask(MqttPublishMessage msg, InterceptHandler interceptHandler, String clientId, String username) {
            this.msg = msg;
            this.interceptHandler = interceptHandler;
            this.clientId = clientId;
            this.username = username;
        }

        @Override
        public void run() {
            try {
                interceptHandler.onPublish(new InterceptPublishMessage(msg, clientId, username));
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public String toString() {
            return "PublishTask{" +
                    "msg=" + msg +
                    ", interceptHandler=" + interceptHandler +
                    ", clientId='" + clientId + '\'' +
                    ", username='" + username + '\'' +
                    '}';
        }

        public InterceptHandler getInterceptHandler() {
            return interceptHandler;
        }
    }


    @Override
    public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {

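        // Keep msg alive while the tasks are being submitted.
        msg.retain();

        try {
            for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
                // Each task gets its own retained duplicate, which PublishTask.run() releases.
                executor.execute(new PublishTask(msg.retainedDuplicate(), handler, clientID, username));
            }
        } finally {
            // Balances the msg.retain() above.
            ReferenceCountUtil.release(msg);
        }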


//        executor.execute(() -> {
//                try {
//                    int messageId = msg.variableHeader().messageId();
//                    String topic = msg.variableHeader().topicName();
//                    for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
//                        LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
//                                + "interceptorId={}", clientID, messageId, topic, handler.getID());
//                        handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
//                    }
//                } finally {
//                    ReferenceCountUtil.release(msg);
//                }
//        });
    }

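Explanation: (1) The new PublishTask class wraps the call to handler.onPublish(). Pay attention to msg.retainedDuplicate() in new PublishTask(msg.retainedDuplicate(), handler, clientID, username): it increments the reference count once per task, so ReferenceCountUtil.release(msg) must be called in both finally blocks, the one in PublishTask.run() and the one in notifyTopicPublished().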

(2) PublishTask's toString() and getInterceptHandler() can be ignored for now. They are used elsewhere, and the next article covers them.
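The reference-count bookkeeping from point (1) can be traced with a toy model. This is only a sketch under stated assumptions: `ToyMessage` is a hypothetical class that imitates the net effect of retain(), retainedDuplicate(), and release() on one shared counter. It is not Netty's implementation, and it assumes the caller holds exactly one reference when notifyTopicPublished() is entered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RefCountDemo {
    /** Toy stand-in for a reference-counted message; NOT the Netty API. */
    static final class ToyMessage {
        private final AtomicInteger refCnt;

        ToyMessage() {
            this.refCnt = new AtomicInteger(1); // the caller's own reference
        }

        private ToyMessage(AtomicInteger shared) {
            this.refCnt = shared; // a duplicate shares the same counter
        }

        ToyMessage retain() {
            refCnt.incrementAndGet();
            return this;
        }

        ToyMessage retainedDuplicate() {
            refCnt.incrementAndGet();
            return new ToyMessage(refCnt);
        }

        void release() {
            if (refCnt.decrementAndGet() < 0) {
                throw new IllegalStateException("released too many times");
            }
        }

        int refCnt() {
            return refCnt.get();
        }
    }

    /** Returns {count after submitting all tasks, count after all tasks have run}. */
    static int[] trace(int handlerCount) {
        ToyMessage msg = new ToyMessage();           // count = 1
        msg.retain();                                // count = 2
        List<ToyMessage> perTask = new ArrayList<>();
        try {
            for (int i = 0; i < handlerCount; i++) {
                perTask.add(msg.retainedDuplicate()); // +1 per handler
            }
        } finally {
            msg.release();                           // balances the retain() above
        }
        int afterSubmit = msg.refCnt();
        for (ToyMessage duplicate : perTask) {
            duplicate.release();                     // what each task's finally block does
        }
        int afterTasks = msg.refCnt();
        return new int[] {afterSubmit, afterTasks};
    }

    public static void main(String[] args) {
        int[] counts = trace(3);
        System.out.println("after submit: " + counts[0] + ", after tasks: " + counts[1]);
    }
}
```

In this model, omitting either release leaves the final count above the caller's single reference. For a real reference-counted buffer that would mean the memory is never returned.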