moquette改造筆記(二):優化BrokerInterceptor notifyTopicPublished()邏輯
阿新 • • 發佈:2018-12-10
發現問題
下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分原始碼
@Override public void notifyClientConnected(final MqttConnectMessage msg) { for (final InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) { LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}", msg.payload().clientIdentifier(), handler.getID()); executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg))); } } @Override public void notifyClientDisconnected(final String clientID, final String username) { for (final InterceptHandler handler : this.handlers.get(InterceptDisconnectMessage.class)) { LOG.debug("Notifying MQTT client disconnection to interceptor. CId={}, username={}, interceptorId={}", clientID, username, handler.getID()); executor.execute(() -> handler.onDisconnect(new InterceptDisconnectMessage(clientID, username))); } } @Override public void notifyClientConnectionLost(final String clientID, final String username) { for (final InterceptHandler handler : this.handlers.get(InterceptConnectionLostMessage.class)) { LOG.debug("Notifying unexpected MQTT client disconnection to interceptor CId={}, username={}, " + "interceptorId={}", clientID, username, handler.getID()); executor.execute(() -> handler.onConnectionLost(new InterceptConnectionLostMessage(clientID, username))); } } @Override public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) { msg.retain(); executor.execute(() -> { try { int messageId = msg.variableHeader().messageId(); String topic = msg.variableHeader().topicName(); for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) { LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, " + "interceptorId={}", clientID, messageId, topic, handler.getID()); handler.onPublish(new InterceptPublishMessage(msg, clientID, username)); } } finally { ReferenceCountUtil.release(msg); } }); } @Override public void notifyTopicSubscribed(final Subscription sub, final String username) { for (final InterceptHandler handler : this.handlers.get(InterceptSubscribeMessage.class)) { LOG.debug("Notifying MQTT SUBSCRIBE message to interceptor. CId={}, topicFilter={}, interceptorId={}", sub.getClientId(), sub.getTopicFilter(), handler.getID()); executor.execute(() -> handler.onSubscribe(new InterceptSubscribeMessage(sub, username))); } }
可以發現在除了notifyTopicPublished(),其它方法中,在for迴圈中每次for迴圈,對於InterceptHandler的呼叫都是線上程池中每次都是新執行一個任務,但是在notifyTopicPublished()方法中是在一個執行緒中for迴圈依次呼叫,這樣處理首先沒有用到執行緒池的多執行緒,其次是一旦某個InterceptHandler的notifyTopicPublished方法是阻塞的,那麼後面的InterceptHandler的notifyTopicPublished()都會被阻塞。
優化邏輯
優化方向:向啟動方法一樣,每次呼叫InterceptHandler的notifyTopicPublished方法都是線上程池中新建一個任務
具體程式碼:
public class PublishTask implements Runnable { final MqttPublishMessage msg; final InterceptHandler interceptHandler; final String clientId; final String username; PublishTask(MqttPublishMessage msg, InterceptHandler interceptHandler, String clientId, String username) { this.msg = msg; this.interceptHandler = interceptHandler; this.clientId = clientId; this.username = username; } @Override public void run() { try { interceptHandler.onPublish(new InterceptPublishMessage(msg, clientId, username)); } finally { ReferenceCountUtil.release(msg); } } @Override public String toString() { return "PublishTask{" + "msg=" + msg + ", interceptHandler=" + interceptHandler + ", clientId='" + clientId + '\'' + ", username='" + username + '\'' + '}'; } public InterceptHandler getInterceptHandler() { return interceptHandler; } } @Override public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) { msg.retain(); try { for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) { executor.execute(new PublishTask(msg.retainedDuplicate(), handler, clientID, username)); } } finally { ReferenceCountUtil.release(msg); } // executor.execute(() -> { // try { // int messageId = msg.variableHeader().messageId(); // String topic = msg.variableHeader().topicName(); // for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) { // LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, " // + "interceptorId={}", clientID, messageId, topic, handler.getID()); // handler.onPublish(new InterceptPublishMessage(msg, clientID, username)); // } // } finally { // ReferenceCountUtil.release(msg); // } // }); }
解釋:(1)新建一個PublishTask用來實現呼叫handler.onPublish()方法。其中要注意new PublishTask(msg.retainedDuplicate(), handler, clientID, username)中的msg.retainedDuplicate()還要主要在兩個finally中ReferenceCountUtil.release(msg);
(2)PublishTask的toString()和getInterceptHandler()可以先不用管,會在其它地方用到,下一篇文章會講到。