1. 程式人生 > >MQTT---HiveMQ原始碼詳解(十三)Netty-MQTT訊息、事件處理(原始碼舉例解讀)

MQTT---HiveMQ原始碼詳解(十三)Netty-MQTT訊息、事件處理(原始碼舉例解讀)

MQTT交流群:221405150

前言

由於上一篇講的只是大致的流程,這一篇我們抽取流程中的一步,介紹Authentication(認證)部分的原始碼,讓大家對上一篇的內容理解得更深。

MqttConnectHandler

MqttConnectHandler是SimpleChannelInboundHandler<Connect>的子類,因此channelRead0收到的訊息型別是Connect。

channelRead0

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Connect msg) throws Exception {
        try
{ //加入MqttDisallowSecondConnect ctx.pipeline().addAfter(Pipelines.MQTT_MESSAGE_DECODER, Pipelines.MQTT_DISALLOW_SECOND_CONNECT, this.disallowSecondConnect); } catch (IllegalArgumentException e) { ctx.pipeline().firstContext().fireChannelRead(msg); return; } //校驗clientid
if (!validIdentifier(ctx, msg)) { return; } //標誌是否接管 ctx.channel().attr(AttributeKeys.MQTT_TAKEN_OVER).set(false); //刪除連線成功,未發connect訊息超時handler removeConnectIdleHandler(ctx); //進入外掛認證階段 pluginOnAuthentication(ctx, msg); }

pluginOnAuthentication

/**
 * Starts plugin authentication for a CONNECT, or skips it when no callback is registered.
 *
 * @param ctx     the handler context of the connecting channel
 * @param connect the CONNECT message being processed
 */
private void pluginOnAuthentication(ChannelHandlerContext ctx, Connect connect) {
    // The client token of the channel serves as the client credentials.
    final ClientToken token = ChannelUtils.clientToken(ctx.channel());

    if (!this.callbackRegistry.isAvailable(OnAuthenticationCallback.class)) {
        // No OnAuthenticationCallback is available, so authentication is treated as accepted
        // and processing continues with the authorization of the LWT (last will) message.
        pluginOnAuthorizationLWT(ctx, null, connect, token, ReturnCode.ACCEPTED, true);
        return;
    }

    // Append the handler that runs the authentication callbacks, then trigger it with a
    // PluginOnAuthentication user event.
    ctx.pipeline().addLast(
            Pipelines.PLUGIN_ON_AUTHENTICATION_CALLBACK_HANDLER,
            this.pluginOnAuthenticationCallbackHandlerProvider.get());
    ctx.fireUserEventTriggered(new PluginOnAuthentication(connect, token));
}

userEventTriggered

/**
 * Dispatches the completion events of the individual connect phases to their handling
 * methods; any other event is passed to the superclass.
 *
 * @param ctx the handler context the event was triggered on
 * @param evt the user event
 * @throws Exception declared by the overridden method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // Plugin authentication has finished.
    if (evt instanceof PluginOnAuthenticationCompleted) {
        pluginOnAuthenticationCompleted(ctx, (PluginOnAuthenticationCompleted) evt);
        return;
    }
    if (evt instanceof PluginRestrictionsAfterLoginCompleted) {
        pluginRestrictionsAfterLoginCompleted(ctx, (PluginRestrictionsAfterLoginCompleted) evt);
        return;
    }
    if (evt instanceof PluginOnConnectCompleted) {
        pluginOnConnectCompleted(ctx, (PluginOnConnectCompleted) evt);
        return;
    }
    if (evt instanceof PluginOnAuthorizationCompleted) {
        pluginOnAuthorizationCompleted(ctx, (PluginOnAuthorizationCompleted) evt);
        return;
    }
    if (evt instanceof MqttConnectPersistenceHandler.OnConnectPersistenceCompleted) {
        final MqttConnectPersistenceHandler.OnConnectPersistenceCompleted persisted =
                (MqttConnectPersistenceHandler.OnConnectPersistenceCompleted) evt;
        onConnectPersistenceCompleted(ctx, persisted.getConnect(), persisted.isSessionPresent());
        return;
    }
    // Not an event this handler knows about.
    super.userEventTriggered(ctx, evt);
}

pluginOnAuthenticationCompleted

    /**
     * Continues with the LWT authorization step once plugin authentication has finished.
     *
     * @param ctx   the handler context of the connecting channel
     * @param event the completion event carrying the return code, the CONNECT, the client
     *              credentials and an optional exception
     */
    private void pluginOnAuthenticationCompleted(ChannelHandlerContext ctx,
                                                 PluginOnAuthenticationCompleted event) {
        final ReturnCode code = event.getReturnCode();
        // The last argument tells the next step whether the return code is ACCEPTED.
        pluginOnAuthorizationLWT(ctx, event.getException(), event.getConnect(),
                event.getClientCredentials(), code, code == ReturnCode.ACCEPTED);
    }

PluginOnAuthenticationCallbackHandler


/**
 * Runs the registered {@link OnAuthenticationCallback}s one after another for a connecting
 * client and publishes the overall outcome as a {@link PluginOnAuthenticationCompleted}
 * user event on the channel pipeline.
 *
 * <p>Flow: a {@link PluginOnAuthentication} event starts the first callback as a task on the
 * {@link CallbackExecutor}. When a task finishes, a
 * {@link PluginOnAuthenticationCallbackCompleted} event is fired; this handler then either
 * finishes (result is decisive or all callbacks have run) or submits the next callback.
 */
@Singleton
@ChannelHandler.Sharable
public class PluginOnAuthenticationCallbackHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(PluginOnAuthenticationCallbackHandler.class);
    private final CallbackRegistry callbackRegistry;
    private final HiveMQConfigurationService hiveMQConfigurationService;
    private final Metrics metrics;
    private final CallbackExecutor callbackExecutor;

    @Inject
    public PluginOnAuthenticationCallbackHandler(CallbackRegistry callbackRegistry,
                                                 HiveMQConfigurationService hiveMQConfigurationService,
                                                 Metrics metrics,
                                                 CallbackExecutor callbackExecutor) {
        this.callbackRegistry = callbackRegistry;
        this.hiveMQConfigurationService = hiveMQConfigurationService;
        this.metrics = metrics;
        this.callbackExecutor = callbackExecutor;
    }

    /**
     * Reacts to the two events of the authentication flow; everything else is passed on.
     *
     * @param ctx the handler context the event was triggered on
     * @param evt the user event
     * @throws Exception declared by the overridden method
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof PluginOnAuthentication) {
            // Plugin authentication was requested: start it.
            onAuthentication(ctx, (PluginOnAuthentication) evt);
        } else if (evt instanceof PluginOnAuthenticationCallbackCompleted) {
            // A single callback has finished: evaluate its result.
            onAuthenticationCallbackCompleted(ctx, (PluginOnAuthenticationCallbackCompleted) evt);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Starts the callback chain, or completes immediately when no callback is available.
     *
     * @param ctx   the handler context of the connecting channel
     * @param event the event carrying the CONNECT and the client credentials
     */
    private void onAuthentication(ChannelHandlerContext ctx, PluginOnAuthentication event) {
        // Availability is checked again here because it may have changed since the caller
        // checked it.
        boolean available = this.callbackRegistry.isAvailable(OnAuthenticationCallback.class);
        if (available) {
            // Copy the registered callbacks into a private queue of callbacks still to run.
            // NOTE(review): the raw ArrayDeque is kept because the declared return type of
            // getCallbacks is not visible here — parameterize once confirmed.
            Deque<OnAuthenticationCallback> leftCallbacks = new ArrayDeque(this.callbackRegistry.getCallbacks(OnAuthenticationCallback.class));
            ClientCredentials clientCredentials = event.getClientCredentials();
            // The number of callbacks is the number of results expected; it is compared
            // against the collected results to detect that all callbacks have run.
            int expectedResultCount = leftCallbacks.size();
            OnAuthenticationCallback callback = leftCallbacks.poll();
            // One result per callback, so presize with the full expected count.
            List<PluginOnAuthenticationResult> results = new ArrayList<PluginOnAuthenticationResult>(expectedResultCount);
            submitTask(ctx, callback, clientCredentials, event.getConnect(), leftCallbacks, results, expectedResultCount);
        } else if (needAllPluginsToReturnTrue()) {
            // All plugins must accept, but there is none to ask: refuse.
            ctx.pipeline().fireUserEventTriggered(new PluginOnAuthenticationCompleted(
                    event.getConnect(), event.getClientCredentials(), ReturnCode.REFUSED_NOT_AUTHORIZED, new AuthenticationException("No OnAuthenticationCallback available", ReturnCode.REFUSED_NOT_AUTHORIZED)));
        } else {
            // Otherwise authentication counts as passed.
            ctx.pipeline().fireUserEventTriggered(new PluginOnAuthenticationCompleted(
                    event.getConnect(), event.getClientCredentials(), ReturnCode.ACCEPTED));
        }
    }

    /**
     * Evaluates the result of the callback that just finished and either completes the
     * authentication or submits the next callback.
     *
     * @param ctx   the handler context of the connecting channel
     * @param event the event carrying the results so far and the callbacks still to run
     */
    private void onAuthenticationCallbackCompleted(ChannelHandlerContext ctx,
                                                   PluginOnAuthenticationCallbackCompleted event) {
        List<PluginOnAuthenticationResult> results = event.getResults();
        PluginOnAuthenticationResult lastResult = results.get(results.size() - 1);
        Connect connect = event.getConnect();
        ClientCredentials clientCredentials = event.getClientCredentials();

        // The last result is decisive when it is a refusal, or when it differs from the
        // configured mode: an accept is enough when not all plugins must accept, and a
        // reject is enough when all plugins must accept.
        if (lastResult.isRefused()
                || lastResult.isAuthenticated() != needAllPluginsToReturnTrue()) {
            completeAndRemove(ctx, new PluginOnAuthenticationCompleted(
                    connect, clientCredentials, lastResult.getReturnCode(), lastResult.getException()));
            return;
        }

        // Every callback has produced a result: decide on the collected results.
        if (results.size() == event.getExpectedResultCount()) {
            ReturnCode returnCode = accepted(results) ? ReturnCode.ACCEPTED : ReturnCode.REFUSED_NOT_AUTHORIZED;
            completeAndRemove(ctx, new PluginOnAuthenticationCompleted(connect, clientCredentials, returnCode));
            return;
        }

        // There are callbacks left: run the next one.
        Queue<OnAuthenticationCallback> leftCallbacks = event.getLeftCallbacks();
        OnAuthenticationCallback callback = leftCallbacks.poll();
        submitTask(ctx, callback, clientCredentials, connect,
                leftCallbacks, results, event.getExpectedResultCount());
    }

    /**
     * Fires the final authentication event and then removes this handler from the pipeline
     * if the pipeline still contains a handler of this class.
     */
    private void completeAndRemove(ChannelHandlerContext ctx, PluginOnAuthenticationCompleted completed) {
        ctx.pipeline().fireUserEventTriggered(completed);
        if (ctx.pipeline().get(getClass()) != null) {
            ctx.pipeline().remove(this);
        }
    }

    /**
     * Submits one callback to the callback executor and registers the result callback that
     * reports the outcome back into the pipeline.
     */
    private void submitTask(ChannelHandlerContext ctx,
                            OnAuthenticationCallback callback,
                            ClientCredentials clientCredentials,
                            Connect connect,
                            Queue<OnAuthenticationCallback> leftCallbacks,
                            List<PluginOnAuthenticationResult> results,
                            int expectedResultCount) {
        ListenableFuture future = this.callbackExecutor.submit(createTask(callback, clientCredentials));
        ResultCallback resultCallback = createResultCallback(ctx, clientCredentials, connect, leftCallbacks, results, expectedResultCount);
        // The result is delivered asynchronously: the callback is invoked on the parent
        // executor group of the channel's executor once the future completes.
        Futures.addCallback(future, resultCallback, ctx.executor().parent());
    }


    /** Creates the task that invokes a single authentication callback. */
    @NotNull
    @VisibleForTesting
    Task createTask(OnAuthenticationCallback callback,
                    ClientCredentials clientCredentials) {
        return new Task(callback, clientCredentials, this.metrics);
    }

    /** Creates the future callback that receives the outcome of a single task. */
    @NotNull
    @VisibleForTesting
    ResultCallback createResultCallback(ChannelHandlerContext ctx,
                                        ClientCredentials clientCredentials,
                                        Connect connect,
                                        Queue<OnAuthenticationCallback> leftCallbacks,
                                        List<PluginOnAuthenticationResult> results,
                                        int expectedResultCount) {
        return new ResultCallback(ctx, clientCredentials, connect,
                leftCallbacks, results, expectedResultCount);
    }

    /** Reads the internal setting that decides whether every plugin has to accept. */
    private boolean needAllPluginsToReturnTrue() {
        return this.hiveMQConfigurationService.internalConfiguration()
                .getBoolean(Internals.PLUGIN_AUTHENTICATION_NEED_ALL_PLUGINS_TO_RETURN_TRUE);
    }

    /**
     * Decides on the collected results: when all plugins must accept, any non-authenticated
     * result fails; otherwise any authenticated result passes. With no decisive result the
     * outcome equals the mode flag (true in "all" mode, false in "any" mode).
     */
    private boolean accepted(List<PluginOnAuthenticationResult> results) {
        boolean needAllPluginsToReturnTrue = needAllPluginsToReturnTrue();
        for (PluginOnAuthenticationResult result : results) {
            if (!needAllPluginsToReturnTrue && result.isAuthenticated()) {
                return true;
            }
            if (needAllPluginsToReturnTrue && !result.isAuthenticated()) {
                return false;
            }
        }
        return needAllPluginsToReturnTrue;
    }

    /**
     * Future callback that records the outcome of one authentication task and fires a
     * {@link PluginOnAuthenticationCallbackCompleted} event into the pipeline.
     */
    @VisibleForTesting
    static class ResultCallback implements FutureCallback<PluginOnAuthenticationResult> {
        private final ChannelHandlerContext ctx;
        private final ClientCredentials clientCredentials;
        private final Connect connect;
        private final Queue<OnAuthenticationCallback> leftCallbacks;
        private final List<PluginOnAuthenticationResult> results;
        private final int expectedResultCount;

        public ResultCallback(ChannelHandlerContext ctx,
                              ClientCredentials clientCredentials,
                              Connect connect,
                              Queue<OnAuthenticationCallback> leftCallbacks,
                              List<PluginOnAuthenticationResult> results,
                              int expectedResultCount) {
            this.ctx = ctx;
            this.clientCredentials = clientCredentials;
            this.connect = connect;
            this.leftCallbacks = leftCallbacks;
            this.results = results;
            this.expectedResultCount = expectedResultCount;
        }

        /** The task returned normally: record its result and notify the pipeline. */
        @Override
        public void onSuccess(@Nullable PluginOnAuthenticationResult result) {
            this.results.add(result);
            fireCallbackCompleted();
        }

        /**
         * The task itself failed: record a not-authorized result and notify the pipeline.
         * The throwable is passed to the logger because the exception message built below
         * refers the reader to the log.
         */
        @Override
        public void onFailure(Throwable t) {
            LOGGER.error("OnAuthenticationCallback failed. Skipping all other handlers", t);
            this.results.add(new PluginOnAuthenticationResult(false, ReturnCode.REFUSED_NOT_AUTHORIZED, true,
                    new AuthenticationException(t.getMessage() + " See log for more information", ReturnCode.REFUSED_NOT_AUTHORIZED)));
            fireCallbackCompleted();
        }

        /** Fires the "one callback completed" event with the state collected so far. */
        private void fireCallbackCompleted() {
            this.ctx.pipeline().fireUserEventTriggered(
                    new PluginOnAuthenticationCallbackCompleted(this.leftCallbacks, this.results, this.connect,
                            this.clientCredentials, this.expectedResultCount));
        }
    }

    /**
     * Task that invokes one {@link OnAuthenticationCallback} and converts its outcome,
     * including any exception, into a {@link PluginOnAuthenticationResult}.
     */
    @VisibleForTesting
    static class Task implements CallableTask<PluginOnAuthenticationResult> {
        private final OnAuthenticationCallback callback;
        private final ClientCredentials clientCredentials;
        private final Metrics metrics;

        public Task(@NotNull OnAuthenticationCallback callback,
                    ClientCredentials clientCredentials,
                    Metrics metrics) {
            this.callback = callback;
            this.clientCredentials = clientCredentials;
            this.metrics = metrics;
        }

        /**
         * Calls the callback while timing it with the plugin authentication timer.
         *
         * @return the authentication result; callback exceptions are converted into a result
         *         rather than thrown
         * @throws Exception declared by the implemented interface
         */
        @Override
        public PluginOnAuthenticationResult call() throws Exception {
            // Timer that measures how long the plugin callback runs.
            Timer.Context timer = this.metrics.pluginTimerAuthentication().time();
            try {
                final Boolean authenticated;
                try {
                    authenticated = this.callback.checkCredentials(this.clientCredentials);
                } finally {
                    // Stop the timer exactly once, whether the callback returned or threw.
                    timer.stop();
                }
                // A null return fails on unboxing here and is handled by the Throwable branch.
                return new PluginOnAuthenticationResult(authenticated, authenticated ? ReturnCode.ACCEPTED : ReturnCode.REFUSED_NOT_AUTHORIZED, false);
            } catch (AuthenticationException e) {
                // The plugin rejected the client explicitly: use the plugin's return code.
                LOGGER.debug("An exception was raised when calling the OnAuthenticationCallback {}:", this.callback.getClass(), e);
                return new PluginOnAuthenticationResult(false, e.getReturnCode(), true, e);
            } catch (Throwable t) {
                // Any other failure of the plugin: log it and answer not-authorized.
                LOGGER.error("Unhandled Exception in OnAuthenticationCallback {}. Skipping all other handlers", this.callback.getClass());
                PluginExceptionUtils.log(t);
                return new PluginOnAuthenticationResult(false, ReturnCode.REFUSED_NOT_AUTHORIZED, true,
                        new AuthenticationException(t.getMessage() + " See log for more information", ReturnCode.REFUSED_NOT_AUTHORIZED));
            }
        }

        /** Returns the runtime class of the wrapped callback. */
        @NotNull
        public Class callbackType() {
            return this.callback.getClass();
        }
    }
}

其他事件類

這幾個類都是簡單的POJO,其用途在前面的原始碼註釋中都已經描述過,這裡不再逐一說明。

/**
 * Immutable event that requests plugin authentication for a CONNECT message.
 */
public class PluginOnAuthentication {
    private final Connect connect;
    private final ClientCredentials clientCredentials;

    /**
     * @param connect           the CONNECT message to authenticate
     * @param clientCredentials the credentials of the connecting client
     */
    public PluginOnAuthentication(final Connect connect, final ClientCredentials clientCredentials) {
        this.clientCredentials = clientCredentials;
        this.connect = connect;
    }

    /** @return the CONNECT message this event was created with */
    public Connect getConnect() {
        return this.connect;
    }

    /** @return the client credentials this event was created with */
    public ClientCredentials getClientCredentials() {
        return this.clientCredentials;
    }
}

/**
 * Event signalling that a single authentication callback has finished. It carries the
 * callbacks still to run, the results collected so far, and the CONNECT context.
 */
public class PluginOnAuthenticationCallbackCompleted {
    private final Connect connect;
    private final ClientCredentials clientCredentials;
    private final Queue<OnAuthenticationCallback> leftCallbacks;
    private final List<PluginOnAuthenticationResult> results;
    private final int expectedResultCount;

    /**
     * @param leftCallbacks       the callbacks that have not been run yet
     * @param results             the results collected so far
     * @param connect             the CONNECT message being authenticated
     * @param clientCredentials   the credentials of the connecting client
     * @param expectedResultCount the number of results expected once all callbacks have run
     */
    public PluginOnAuthenticationCallbackCompleted(final Queue<OnAuthenticationCallback> leftCallbacks,
                                                   final List<PluginOnAuthenticationResult> results,
                                                   final Connect connect,
                                                   final ClientCredentials clientCredentials,
                                                   final int expectedResultCount) {
        this.connect = connect;
        this.clientCredentials = clientCredentials;
        this.leftCallbacks = leftCallbacks;
        this.results = results;
        this.expectedResultCount = expectedResultCount;
    }

    /** @return the queue of callbacks still to run (the same instance that was passed in) */
    public Queue<OnAuthenticationCallback> getLeftCallbacks() {
        return this.leftCallbacks;
    }

    /** @return the number of results expected once all callbacks have run */
    public int getExpectedResultCount() {
        return this.expectedResultCount;
    }

    /** @return the list of results collected so far (the same instance that was passed in) */
    public List<PluginOnAuthenticationResult> getResults() {
        return this.results;
    }

    /** @return the CONNECT message being authenticated */
    public Connect getConnect() {
        return this.connect;
    }

    /** @return the credentials of the connecting client */
    public ClientCredentials getClientCredentials() {
        return this.clientCredentials;
    }
}

/**
 * Event carrying the final outcome of plugin authentication: the return code and,
 * optionally, the exception that led to it.
 */
public class PluginOnAuthenticationCompleted {
    private final Connect connect;
    private final ClientCredentials clientCredentials;
    private final ReturnCode returnCode;
    private final AuthenticationException exception;

    /**
     * Creates a completion event without an exception.
     *
     * @param connect           the CONNECT message that was authenticated
     * @param clientCredentials the credentials of the connecting client
     * @param returnCode        the resulting return code
     */
    public PluginOnAuthenticationCompleted(final Connect connect,
                                           final ClientCredentials clientCredentials,
                                           final ReturnCode returnCode) {
        this(connect, clientCredentials, returnCode, null);
    }

    /**
     * Creates a completion event with an exception.
     *
     * @param connect           the CONNECT message that was authenticated
     * @param clientCredentials the credentials of the connecting client
     * @param returnCode        the resulting return code
     * @param exception         the exception associated with the outcome; may be {@code null}
     */
    public PluginOnAuthenticationCompleted(final Connect connect,
                                           final ClientCredentials clientCredentials,
                                           final ReturnCode returnCode,
                                           final AuthenticationException exception) {
        this.connect = connect;
        this.clientCredentials = clientCredentials;
        this.returnCode = returnCode;
        this.exception = exception;
    }

    /** @return the CONNECT message that was authenticated */
    public Connect getConnect() {
        return this.connect;
    }

    /** @return the credentials of the connecting client */
    public ClientCredentials getClientCredentials() {
        return this.clientCredentials;
    }

    /** @return the resulting return code */
    public ReturnCode getReturnCode() {
        return this.returnCode;
    }

    /** @return the associated exception, or {@code null} when none was supplied */
    public AuthenticationException getException() {
        return this.exception;
    }
}