1. 程式人生 > >mqtt協議-broker之moqutte源碼研究六之集群

mqtt協議-broker之moqutte源碼研究六之集群

mqtt broker moquette

moquette的集群功能是通過Hazelcast來實現的,對Hazelcast不了解的同學可以自行Google以下。
在講解moquette的集群功能之前需要講解一下moquette的攔截器,因為moquette對Hazelcast的集成本身就是通過攔截器來實現的。

一。攔截器
io.moquette.spi.impl.ProtocolProcessor類裏面有一個BrokerInterceptor類,這個類就是broker攔截器,這個對象,在processConnect,processPubAck,processPubComp,processDisconnect,processConnectionLost,processUnsubscribe,processSubscribe,processPublish等八個地方都用到了,說明在broker處理各個報文的關鍵期間都會用到,我們先看一這個類的結構

 private static final Logger LOG = LoggerFactory.getLogger(BrokerInterceptor.class);
private final Map<Class<?>, List<InterceptHandler>> handlers;
private final ExecutorService executor;

private BrokerInterceptor(int poolSize, List<InterceptHandler> handlers) {
    LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
    this.handlers = new HashMap<>();
    for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
        this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>());
    }
    for (InterceptHandler handler : handlers) {
        this.addInterceptHandler(handler);
    }
    executor = Executors.newFixedThreadPool(poolSize);
}

裏面有個map,有一個,看構造方法,發現key是(InterceptConnectMessage.class, InterceptDisconnectMessage.class,
InterceptConnectionLostMessage.class, InterceptPublishMessage.class, InterceptSubscribeMessage.class,
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class)其中的一個,發現總共有七個消息類,這七個消息類,剛好與上面的八個使用的地方對應上(processPubAck,processPubComp都對應InterceptAcknowledgedMessage)。value是一個list,裏面放的是InterceptHandler,即處理器。

從這個map可以看出來,moquette允許你在,各個關鍵階段註冊一系列的處理器,供它回調。

二。HazelcastInterceptHandler
HazelcastInterceptHandler集成了AbstractInterceptHandler,AbstractInterceptHandler實現了InterceptHandler,這裏吐槽一下Java,Java要求實現一個Java的interface就要實現所有的接口,可是很多時候我就只想實現其中一個方法,如果接口裏面有10個方法結果
我的實現類裏面會出現9個空方法,顯得代碼非常不簡潔,當然,用個抽象類緩沖一下是一個辦法,但是我又得加一個類,還好Java8,有個默認方法,能夠解決這個問題。
我們跟蹤一下,看一下HazelcastInterceptHandler,是什麽時候被放進BrokerInterceptor的。

從啟動類Server裏面的main一路跟蹤到下面的代碼

    public void startServer(IConfig config, List<? extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator,
        IAuthenticator authenticator, IAuthorizator authorizator) throws IOException {
    if (handlers == null) {
        handlers = Collections.emptyList();
    }
    LOG.info("Starting Moquette Server. MQTT message interceptors={}", getInterceptorIds(handlers));

    scheduler = Executors.newScheduledThreadPool(1);

    final String handlerProp = System.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
    if (handlerProp != null) {
        config.setProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME, handlerProp);
    }
    configureCluster(config);
    final String persistencePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME);
    LOG.info("Configuring Using persistent store file, path={}", persistencePath);
    m_processorBootstrapper = new ProtocolProcessorBootstrapper();
    final ProtocolProcessor processor = m_processorBootstrapper.init(config, handlers, authenticator, authorizator,
        this);
    LOG.info("Initialized MQTT protocol processor");
    if (sslCtxCreator == null) {
        LOG.warn("Using default SSL context creator");
        sslCtxCreator = new DefaultMoquetteSslContextCreator(config);
    }

    LOG.info("Binding server to the configured ports");
    m_acceptor = new NettyAcceptor();
    m_acceptor.initialize(processor, config, sslCtxCreator);
    m_processor = processor;

    LOG.info("Moquette server has been initialized successfully");
    m_initialized = true;
}

    上面有三點需要註意,
    1.是從啟動參數裏面獲取INTERCEPT_HANDLER_PROPERTY_NAME = "intercept.handler",放入config,說明註冊處理器,是需要通過啟動參數指定的,而且目前的版本還只能指定一個。
    2.configureCluster(config);配置集群,我們暫且不看怎麽配置的。後面再講
    3.通過ProtocolProcessorBootstrapper初始化ProtocolProcessor,這個類有多重要,前面幾篇文章已經講過了。它的init方法,很有意思,裏面做了很多事情,基本上就是初始化ProtocolProcessor裏面的各種對象,其中與本文要講的摘抄下來

    LOG.info("Configuring message interceptors...");

    List<InterceptHandler> observers = new ArrayList<>(embeddedObservers);
    String interceptorClassName = props.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
    if (interceptorClassName != null && !interceptorClassName.isEmpty()) {
        InterceptHandler handler = loadClass(interceptorClassName, InterceptHandler.class, Server.class, server);
        if (handler != null) {
            observers.add(handler);
        }
    }
    BrokerInterceptor interceptor = new BrokBrokerInterceptor(props, observers);

這裏面就是根據classname,實例化處理器,然後創建BrokerInterceptor對象,它的狗爪方法裏面調用了這個方法addInterceptHandler

     @Override
public void addInterceptHandler(InterceptHandler interceptHandler) {
    Class<?>[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler);
    LOG.info("Adding MQTT message interceptor. InterceptorId={}, handledMessageTypes={}",
        interceptHandler.getID(), interceptedMessageTypes);
    for (Class<?> interceptMessageType : interceptedMessageTypes) {
        this.handlers.get(interceptMessageType).add(interceptHandler);
    }
}

    private static Class<?>[] getInterceptedMessageTypes(InterceptHandler interceptHandler) {
    Class<?>[] interceptedMessageTypes = interceptHandler.getInterceptedMessageTypes();
    if (interceptedMessageTypes == null) {
        return InterceptHandler.ALL_MESSAGE_TYPES;
    }
    return interceptedMessageTypes;
}

調用了interceptHandler.getInterceptedMessageTypes();說明每個InterceptHandler都實現了這個方法,即它自身必須告訴別人,它準備註冊到哪幾個事件上,即上面的七個類型。
一個處理器,可以註冊到上面七個中的一個,或多個,或全部,如果返回的是空,則默認是全部。HazelcastInterceptHandler就註冊到了所有的上面

到此我們就明白了HazelcastInterceptHandler註冊到攔截器的全過程。

三。集群間通信。
我們回答一下上面的問題,看一下configureCluster的全過程

private void configureCluster(IConfig config) throws FileNotFoundException {
    LOG.info("Configuring embedded Hazelcast instance");
    String interceptHandlerClassname = config.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
    if (interceptHandlerClassname == null || !HZ_INTERCEPT_HANDLER.equals(interceptHandlerClassname)) {
        LOG.info("There are no Hazelcast intercept handlers. The server won‘t start a Hazelcast instance.");
        return;
    }
    String hzConfigPath = config.getProperty(BrokerConstants.HAZELCAST_CONFIGURATION);
    if (hzConfigPath != null) {
        boolean isHzConfigOnClasspath = this.getClass().getClassLoader().getResource(hzConfigPath) != null;
        Config hzconfig = isHzConfigOnClasspath
                ? new ClasspathXmlConfig(hzConfigPath)
                : new FileSystemXmlConfig(hzConfigPath);
        LOG.info("Starting Hazelcast instance. ConfigurationFile={}", hzconfig);
        hazelcastInstance = Hazelcast.newHazelcastInstance(hzconfig);
    } else {
        LOG.info("Starting Hazelcast instance with default configuration");
        hazelcastInstance = Hazelcast.newHazelcastInstance();
    }
    listenOnHazelCastMsg();
}

1.判斷是否註冊了HazelcastInterceptHandler,如果沒有直接跳出方法
2.從config裏面獲取hazelcast.configuration的位置,並且加載配置文件,同時創建HazelcastInstance實例。這個配置文件在moquette-master-improve/distribution/src/main/resources下有一個模版,也可以參考下面的

    <network>
<public-address>IP1:5701</public-address>
<port>5701</port>
<join>
   <multicast enabled="false" />
   <tcp-ip enabled="true">
          <required-member>IP2:5701</required-member>
   </tcp-ip>
</join>
</network>
    3.監聽HazelcastInstance,具體的監聽對象是HazelcastListener,這個類剛好和HazelcastInterceptHandler構成一對。剛好用來完成集群間的同步

先看一下HazelcastInterceptHandler,裏面只有一個方法,onPublish,即監聽事件的發生,發送消息。

然後看一下HazelcastListener,裏面也只有一個方法,onMessage,即當有集群中的其他節點發送消息給當前機器的時候,由Hazelcast,調用HazelcastListener的onMessage方法,看一下這個方法的邏輯

    public void onMessage(Message<HazelcastMsg> msg) {
    try {
        if (!msg.getPublishingMember().equals(server.getHazelcastInstance().getCluster().getLocalMember())) {
            HazelcastMsg hzMsg = msg.getMessageObject();
            LOG.info("{} received from hazelcast for topic {} message: {}", hzMsg.getClientId(), hzMsg.getTopic(),
                hzMsg.getPayload());
            // TODO pass forward this information in somehow publishMessage.setLocal(false);

            MqttQoS qos = MqttQoS.valueOf(hzMsg.getQos());
            MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
            MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(hzMsg.getTopic(), 0);
            ByteBuf payload = Unpooled.wrappedBuffer(hzMsg.getPayload());
            MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, varHeader, payload);
            server.internalPublish(publishMessage, hzMsg.getClientId());
        }
    } catch (Exception ex) {
        LOG.error("error polling hazelcast msg queue", ex);
    }
}

邏輯比較簡單,構建消息體,調用server.internalPublish(publishMessage, hzMsg.getClientId());原來Server裏面的這個方法幹這個用的,我之前看到這個方法是一臉懵逼呀。說白了當收到其他的節點發過來的消息的時候,偽造成某個client的publish報文,接著往下看
這個方法內部其實調用的是ProtocolProcessor.internalPublish(msg, clientId),這個方法裏面調用的是MessagesPublisher.publish2Subscribers(toStoreMsg, topic),這個方法很眼熟,是因為在這一篇中講過,http://blog.51cto.com/13579730/2074290

因為Qos0PublishHandler,Qos1PublishHandler,Qos2PublishHandler底層都調的這個方法,至此發現集群間的消息同步,其實就是模仿的client向broker發送publish消息,或者可以理解成:
當某個broker節點收到來自一個連接到它的client發送的publish消息的時候,它不僅會分發給訂閱這個消息並且連接到它的其他clients,同時會把消息分發給集群中的其他節點,以供集群中其他節點繼續分發下去。
終於把集群講完了。後面還有一篇,講topic目錄樹的,這個系列就算完結了。哈哈

mqtt協議-broker之moqutte源碼研究六之集群