mqtt協議-broker之moqutte源碼研究六之集群
在講解moquette的集群功能之前需要講解一下moquette的攔截器,因為moquette對Hazelcast的集成本身就是通過攔截器來實現的。
一。攔截器
io.moquette.spi.impl.ProtocolProcessor類裏面有一個BrokerInterceptor類,這個類就是broker攔截器,這個對象,在processConnect,processPubAck,processPubComp,processDisconnect,processConnectionLost,processUnsubscribe,processSubscribe,processPublish等八個地方都用到了,說明在broker處理各個報文的關鍵期間都會用到,我們先看一這個類的結構
private static final Logger LOG = LoggerFactory.getLogger(BrokerInterceptor.class); private final Map<Class<?>, List<InterceptHandler>> handlers; private final ExecutorService executor; private BrokerInterceptor(int poolSize, List<InterceptHandler> handlers) { LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers)); this.handlers = new HashMap<>(); for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) { this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>()); } for (InterceptHandler handler : handlers) { this.addInterceptHandler(handler); } executor = Executors.newFixedThreadPool(poolSize); }
裏面有個map,有一個,看構造方法,發現key是(InterceptConnectMessage.class, InterceptDisconnectMessage.class,
InterceptConnectionLostMessage.class, InterceptPublishMessage.class, InterceptSubscribeMessage.class,
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class)其中的一個,發現總共有七個消息類,這七個消息類,剛好與上面的八個使用的地方對應上(processPubAck,processPubComp都對應InterceptAcknowledgedMessage)。value是一個list,裏面放的是InterceptHandler,即處理器。
二。HazelcastInterceptHandler
HazelcastInterceptHandler集成了AbstractInterceptHandler,AbstractInterceptHandler實現了InterceptHandler,這裏吐槽一下Java,Java要求實現一個Java的interface就要實現所有的接口,可是很多時候我就只想實現其中一個方法,如果接口裏面有10個方法結果
我的實現類裏面會出現9個空方法,顯得代碼非常不簡潔,當然,用個抽象類緩沖一下是一個辦法,但是我又得加一個類,還好Java8,有個默認方法,能夠解決這個問題。
我們跟蹤一下,看一下HazelcastInterceptHandler,是什麽時候被放進BrokerInterceptor的。
從啟動類Server裏面的main一路跟蹤到下面的代碼
public void startServer(IConfig config, List<? extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator,
IAuthenticator authenticator, IAuthorizator authorizator) throws IOException {
if (handlers == null) {
handlers = Collections.emptyList();
}
LOG.info("Starting Moquette Server. MQTT message interceptors={}", getInterceptorIds(handlers));
scheduler = Executors.newScheduledThreadPool(1);
final String handlerProp = System.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
if (handlerProp != null) {
config.setProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME, handlerProp);
}
configureCluster(config);
final String persistencePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME);
LOG.info("Configuring Using persistent store file, path={}", persistencePath);
m_processorBootstrapper = new ProtocolProcessorBootstrapper();
final ProtocolProcessor processor = m_processorBootstrapper.init(config, handlers, authenticator, authorizator,
this);
LOG.info("Initialized MQTT protocol processor");
if (sslCtxCreator == null) {
LOG.warn("Using default SSL context creator");
sslCtxCreator = new DefaultMoquetteSslContextCreator(config);
}
LOG.info("Binding server to the configured ports");
m_acceptor = new NettyAcceptor();
m_acceptor.initialize(processor, config, sslCtxCreator);
m_processor = processor;
LOG.info("Moquette server has been initialized successfully");
m_initialized = true;
}
上面有三點需要註意,
1.是從啟動參數裏面獲取INTERCEPT_HANDLER_PROPERTY_NAME = "intercept.handler",放入config,說明註冊處理器,是需要通過啟動參數指定的,而且目前的版本還只能指定一個。
2.configureCluster(config);配置集群,我們暫且不看怎麽配置的。後面再講
3.通過ProtocolProcessorBootstrapper初始化ProtocolProcessor,這個類有多重要,前面幾篇文章已經講過了。它的init方法,很有意思,裏面做了很多事情,基本上就是初始化ProtocolProcessor裏面的各種對象,其中與本文要講的摘抄下來
LOG.info("Configuring message interceptors...");
List<InterceptHandler> observers = new ArrayList<>(embeddedObservers);
String interceptorClassName = props.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
if (interceptorClassName != null && !interceptorClassName.isEmpty()) {
InterceptHandler handler = loadClass(interceptorClassName, InterceptHandler.class, Server.class, server);
if (handler != null) {
observers.add(handler);
}
}
BrokerInterceptor interceptor = new BrokBrokerInterceptor(props, observers);
這裏面就是根據classname,實例化處理器,然後創建BrokerInterceptor對象,它的狗爪方法裏面調用了這個方法addInterceptHandler
@Override
public void addInterceptHandler(InterceptHandler interceptHandler) {
Class<?>[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler);
LOG.info("Adding MQTT message interceptor. InterceptorId={}, handledMessageTypes={}",
interceptHandler.getID(), interceptedMessageTypes);
for (Class<?> interceptMessageType : interceptedMessageTypes) {
this.handlers.get(interceptMessageType).add(interceptHandler);
}
}
private static Class<?>[] getInterceptedMessageTypes(InterceptHandler interceptHandler) {
Class<?>[] interceptedMessageTypes = interceptHandler.getInterceptedMessageTypes();
if (interceptedMessageTypes == null) {
return InterceptHandler.ALL_MESSAGE_TYPES;
}
return interceptedMessageTypes;
}
調用了interceptHandler.getInterceptedMessageTypes();說明每個InterceptHandler都實現了這個方法,即它自身必須告訴別人,它準備註冊到哪幾個事件上,即上面的七個類型。
一個處理器,可以註冊到上面七個中的一個,或多個,或全部,如果返回的是空,則默認是全部。HazelcastInterceptHandler就註冊到了所有的上面
到此我們就明白了HazelcastInterceptHandler註冊到攔截器的全過程。
三。集群間通信。
我們回答一下上面的問題,看一下configureCluster的全過程
private void configureCluster(IConfig config) throws FileNotFoundException {
LOG.info("Configuring embedded Hazelcast instance");
String interceptHandlerClassname = config.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
if (interceptHandlerClassname == null || !HZ_INTERCEPT_HANDLER.equals(interceptHandlerClassname)) {
LOG.info("There are no Hazelcast intercept handlers. The server won‘t start a Hazelcast instance.");
return;
}
String hzConfigPath = config.getProperty(BrokerConstants.HAZELCAST_CONFIGURATION);
if (hzConfigPath != null) {
boolean isHzConfigOnClasspath = this.getClass().getClassLoader().getResource(hzConfigPath) != null;
Config hzconfig = isHzConfigOnClasspath
? new ClasspathXmlConfig(hzConfigPath)
: new FileSystemXmlConfig(hzConfigPath);
LOG.info("Starting Hazelcast instance. ConfigurationFile={}", hzconfig);
hazelcastInstance = Hazelcast.newHazelcastInstance(hzconfig);
} else {
LOG.info("Starting Hazelcast instance with default configuration");
hazelcastInstance = Hazelcast.newHazelcastInstance();
}
listenOnHazelCastMsg();
}
1.判斷是否註冊了HazelcastInterceptHandler,如果沒有直接跳出方法
2.從config裏面獲取hazelcast.configuration的位置,並且加載配置文件,同時創建HazelcastInstance實例。這個配置文件在moquette-master-improve/distribution/src/main/resources下有一個模版,也可以參考下面的
<network>
<public-address>IP1:5701</public-address>
<port>5701</port>
<join>
<multicast enabled="false" />
<tcp-ip enabled="true">
<required-member>IP2:5701</required-member>
</tcp-ip>
</join>
</network>
3.監聽HazelcastInstance,具體的監聽對象是HazelcastListener,這個類剛好和HazelcastInterceptHandler構成一對。剛好用來完成集群間的同步
先看一下HazelcastInterceptHandler,裏面只有一個方法,onPublish,即監聽事件的發生,發送消息。
然後看一下HazelcastListener,裏面也只有一個方法,onMessage,即當有集群中的其他節點發送消息給當前機器的時候,由Hazelcast,調用HazelcastListener的onMessage方法,看一下這個方法的邏輯
public void onMessage(Message<HazelcastMsg> msg) {
try {
if (!msg.getPublishingMember().equals(server.getHazelcastInstance().getCluster().getLocalMember())) {
HazelcastMsg hzMsg = msg.getMessageObject();
LOG.info("{} received from hazelcast for topic {} message: {}", hzMsg.getClientId(), hzMsg.getTopic(),
hzMsg.getPayload());
// TODO pass forward this information in somehow publishMessage.setLocal(false);
MqttQoS qos = MqttQoS.valueOf(hzMsg.getQos());
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(hzMsg.getTopic(), 0);
ByteBuf payload = Unpooled.wrappedBuffer(hzMsg.getPayload());
MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, varHeader, payload);
server.internalPublish(publishMessage, hzMsg.getClientId());
}
} catch (Exception ex) {
LOG.error("error polling hazelcast msg queue", ex);
}
}
邏輯比較簡單,構建消息體,調用server.internalPublish(publishMessage, hzMsg.getClientId());原來Server裏面的這個方法幹這個用的,我之前看到這個方法是一臉懵逼呀。說白了當收到其他的節點發過來的消息的時候,偽造成某個client的publish報文,接著往下看
這個方法內部其實調用的是ProtocolProcessor.internalPublish(msg, clientId),這個方法裏面調用的是MessagesPublisher.publish2Subscribers(toStoreMsg, topic),這個方法很眼熟,是因為在這一篇中講過,http://blog.51cto.com/13579730/2074290
因為Qos0PublishHandler,Qos1PublishHandler,Qos2PublishHandler底層都調的這個方法,至此發現集群間的消息同步,其實就是模仿的client向broker發送publish消息,或者可以理解成:
當某個broker節點收到來自一個連接到它的client發送的publish消息的時候,它不僅會分發給訂閱這個消息並且連接到它的其他clients,同時會把消息分發給集群中的其他節點,以供集群中其他節點繼續分發下去。
終於把集群講完了。後面還有一篇,講topic目錄樹的,這個系列就算完結了。哈哈
mqtt協議-broker之moqutte源碼研究六之集群