
RocketMQ Study Notes, Part 5: The rocketmq-broker Source Code

/**
 * Copyright (C) 2010-2013 Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.broker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.broker.client.ClientHousekeepingService;
import com.alibaba.rocketmq.broker.client.ConsumerIdsChangeListener;
import com.alibaba.rocketmq.broker.client.ConsumerManager;
import com.alibaba.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import com.alibaba.rocketmq.broker.client.ProducerManager;
import com.alibaba.rocketmq.broker.client.net.Broker2Client;
import com.alibaba.rocketmq.broker.client.rebalance.RebalanceLockManager;
import com.alibaba.rocketmq.broker.filtersrv.FilterServerManager;
import com.alibaba.rocketmq.broker.longpolling.PullRequestHoldService;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
import com.alibaba.rocketmq.broker.offset.ConsumerOffsetManager;
import com.alibaba.rocketmq.broker.out.BrokerOuterAPI;
import com.alibaba.rocketmq.broker.processor.AdminBrokerProcessor;
import com.alibaba.rocketmq.broker.processor.ClientManageProcessor;
import com.alibaba.rocketmq.broker.processor.EndTransactionProcessor;
import com.alibaba.rocketmq.broker.processor.PullMessageProcessor;
import com.alibaba.rocketmq.broker.processor.QueryMessageProcessor;
import com.alibaba.rocketmq.broker.processor.SendMessageProcessor;
import com.alibaba.rocketmq.broker.slave.SlaveSynchronize;
import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
import com.alibaba.rocketmq.broker.topic.TopicConfigManager;
import com.alibaba.rocketmq.common.BrokerConfig;
import com.alibaba.rocketmq.common.DataVersion;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
import com.alibaba.rocketmq.common.protocol.RequestCode;
import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.RemotingServer;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
import com.alibaba.rocketmq.store.DefaultMessageStore;
import com.alibaba.rocketmq.store.MessageStore;
import com.alibaba.rocketmq.store.config.BrokerRole;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import com.alibaba.rocketmq.store.stats.BrokerStats;
import com.alibaba.rocketmq.store.stats.BrokerStatsManager;


/**
 * Controller for the broker's individual services
 *
 * @author shijia.wxr
 * @since 2013-7-26
 */
public class BrokerController {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BrokerLoggerName);
    // Broker (server) configuration
    private final BrokerConfig brokerConfig;
    // Remoting-layer configuration
    private final NettyServerConfig nettyServerConfig;
    private final NettyClientConfig nettyClientConfig;
    // Storage-layer configuration
    private final MessageStoreConfig messageStoreConfig;
    // Version number of the configuration file
    private final DataVersion configDataVersion = new DataVersion();
    // Consumer offset storage
    private final ConsumerOffsetManager consumerOffsetManager;
    // Consumer connection and subscription management
    private final ConsumerManager consumerManager;
    // Producer connection management
    private final ProducerManager producerManager;
    // Monitors all client connections
    private final ClientHousekeepingService clientHousekeepingService;
    private final PullMessageProcessor pullMessageProcessor;
    private final PullRequestHoldService pullRequestHoldService;
    // Broker-initiated calls to clients
    private final Broker2Client broker2Client;
    // Subscription group configuration management
    private final SubscriptionGroupManager subscriptionGroupManager;
    // Notifies all members immediately when the membership of a subscription group changes
    private final ConsumerIdsChangeListener consumerIdsChangeListener;
    // Manages queue lock allocation
    private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
    // The broker's remoting-layer client
    private final BrokerOuterAPI brokerOuterAPI;
    private final ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerControllerScheduledThread"));
    // A slave periodically syncs information from the master
    private final SlaveSynchronize slaveSynchronize;
    // Storage-layer object
    private MessageStore messageStore;
    // Remoting-layer object
    private RemotingServer remotingServer;
    // Topic configuration
    private TopicConfigManager topicConfigManager;
    // Thread pool for send-message requests
    private ExecutorService sendMessageExecutor;
    // Thread pool for pull-message requests
    private ExecutorService pullMessageExecutor;
    // Thread pool for broker admin requests
    private ExecutorService adminBrokerExecutor;
    // Thread pool for client management requests
    private ExecutorService clientManageExecutor;
    // Whether the HA master address must be refreshed periodically
    private boolean updateMasterHAServerAddrPeriodically = false;

    private BrokerStats brokerStats;
    // Flow control for message writes
    private final BlockingQueue<Runnable> sendThreadPoolQueue;
    // Flow control for message reads
    private final BlockingQueue<Runnable> pullThreadPoolQueue;
    // FilterServer management
    private final FilterServerManager filterServerManager;

    private final BrokerStatsManager brokerStatsManager;


    public BrokerController(
            final BrokerConfig brokerConfig,
            final NettyServerConfig nettyServerConfig,
            final NettyClientConfig nettyClientConfig,
            final MessageStoreConfig messageStoreConfig
    ) {
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        this.topicConfigManager = new TopicConfigManager(this);
        this.pullMessageProcessor = new PullMessageProcessor(this);
        this.pullRequestHoldService = new PullRequestHoldService(this);
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        this.producerManager = new ProducerManager();
        this.clientHousekeepingService = new ClientHousekeepingService(this);
        this.broker2Client = new Broker2Client(this);
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        this.filterServerManager = new FilterServerManager(this);

        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr());
        }

        this.slaveSynchronize = new SlaveSynchronize(this);

        this.sendThreadPoolQueue =
                new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());

        this.pullThreadPoolQueue =
                new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());

        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
    }


    public boolean initialize() {
        boolean result = true;

        // Load the topic configuration
        result = result && this.topicConfigManager.load();

        // Load the consumer offsets
        result = result && this.consumerOffsetManager.load();
        // Load the consumer subscriptions
        result = result && this.subscriptionGroupManager.load();

        // Initialize the storage layer
        if (result) {
            try {
                this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager);
            }
            catch (IOException e) {
                result = false;
                e.printStackTrace();
            }
        }

        // Load the local message data
        result = result && this.messageStore.load();

        if (result) {
            // Initialize the remoting layer
            this.remotingServer =
                    new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);

            // Initialize the thread pools
            this.sendMessageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));

            this.pullMessageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));

            this.adminBrokerExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(),
                        new ThreadFactoryImpl("AdminBrokerThread_"));

            this.clientManageExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getClientManageThreadPoolNums(),
                        new ThreadFactoryImpl("ClientManageThread_"));

            this.registerProcessor();

            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);

            // Record message statistics every day at 00:00:00
            final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    }
                    catch (Exception e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            // Periodically persist the consumer offsets
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    }
                    catch (Exception e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            // Periodically delete consumer offsets that lag far behind:
            // first scan after 10 minutes, then once every 60 minutes
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.scanUnsubscribedTopic();
                    }
                    catch (Exception e) {
                        log.error("schedule scanUnsubscribedTopic error.", e);
                    }
                }
            }, 10, 60, TimeUnit.MINUTES);

            // Use the configured Name Server address if one was given
            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            }
            // Otherwise fetch the Name Server address periodically
            else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        }
                        catch (Exception e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            // If this broker is a slave
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null
                        && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                }
                else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }

                // The slave periodically syncs configuration from the master
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        }
                        catch (Exception e) {
                            log.error("ScheduledTask syncAll slave exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
            // If this broker is a master, add statistics logging
            else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        }
                        catch (Exception e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        return result;
    }


    public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
            this.sendMessageExecutor);

        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
            this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

        /**
         * QueryMessageProcessor
         */
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor,
            this.pullMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor,
            this.pullMessageExecutor);

        /**
         * ClientManageProcessor
         */
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        clientProcessor.registerConsumeMessageHook(this.consumeMessageHookList);
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor,
            this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor,
            this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, clientProcessor,
            this.clientManageExecutor);

        /**
         * Offset updates and queries are handled by the ClientManageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, clientProcessor,
            this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, clientProcessor,
            this.clientManageExecutor);

        /**
         * EndTransactionProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION,
            new EndTransactionProcessor(this), this.sendMessageExecutor);

        /**
         * Default
         */
        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    }


    public Broker2Client getBroker2Client() {
        return broker2Client;
    }


    public BrokerConfig getBrokerConfig() {
        return brokerConfig;
    }


    public String getConfigDataVersion() {
        return this.configDataVersion.toJson();
    }


    public ConsumerManager getConsumerManager() {
        return consumerManager;
    }


    public ConsumerOffsetManager getConsumerOffsetManager() {
        return consumerOffsetManager;
    }


    public MessageStore getMessageStore() {
        return messageStore;
    }


    public void setMessageStore(MessageStore messageStore) {
        this.messageStore = messageStore;
    }


    public MessageStoreConfig getMessageStoreConfig() {
        return messageStoreConfig;
    }


    public NettyServerConfig getNettyServerConfig() {
        return nettyServerConfig;
    }


    public ProducerManager getProducerManager() {
        return producerManager;
    }


    public PullMessageProcessor getPullMessageProcessor() {
        return pullMessageProcessor;
    }


    public PullRequestHoldService getPullRequestHoldService() {
        return pullRequestHoldService;
    }


    public RemotingServer getRemotingServer() {
        return remotingServer;
    }


    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }


    public SubscriptionGroupManager getSubscriptionGroupManager() {
        return subscriptionGroupManager;
    }


    public void shutdown() {
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.shutdown();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.shutdown();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.shutdown();
        }

        if (this.remotingServer != null) {
            this.remotingServer.shutdown();
        }

        if (this.messageStore != null) {
            this.messageStore.shutdown();
        }

        this.scheduledExecutorService.shutdown();
        try {
            this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
        }

        this.unregisterBrokerAll();

        if (this.sendMessageExecutor != null) {
            this.sendMessageExecutor.shutdown();
        }

        if (this.pullMessageExecutor != null) {
            this.pullMessageExecutor.shutdown();
        }

        if (this.adminBrokerExecutor != null) {
            this.adminBrokerExecutor.shutdown();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.shutdown();
        }

        this.consumerOffsetManager.persist();

        if (this.filterServerManager != null) {
            this.filterServerManager.shutdown();
        }
    }


    private void unregisterBrokerAll() {
        this.brokerOuterAPI.unregisterBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId());
    }


    public String getBrokerAddr() {
        String addr = this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
        return addr;
    }


    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }

        if (this.remotingServer != null) {
            this.remotingServer.start();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        // Force registration at startup
        this.registerBrokerAll(true);

        // Periodically register the broker with the Name Server
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true);
                }
                catch (Exception e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }

        // Delete unused topics
        this.addDeleteTopicTask();
    }


    public synchronized void registerBrokerAll(final boolean checkOrderConfig) {
        TopicConfigSerializeWrapper topicConfigWrapper =
                this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        // Propagate the broker's read/write permission to every topic
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable =
                    new ConcurrentHashMap<String, TopicConfig>(topicConfigWrapper.getTopicConfigTable());
            for (TopicConfig topicConfig : topicConfigTable.values()) {
                topicConfig.setPerm(this.getBrokerConfig().getBrokerPermission());
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }

        RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList()
            );

        if (registerBrokerResult != null) {
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }

            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

            // Check the ordered-message configuration in the topic config
            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }


    public TopicConfigManager getTopicConfigManager() {
        return topicConfigManager;
    }


    public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
        this.topicConfigManager = topicConfigManager;
    }


    public String getHAServerAddr() {
        String addr = this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
        return addr;
    }


    public void updateAllConfig(Properties properties) {
        MixAll.properties2Object(properties, brokerConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        MixAll.properties2Object(properties, nettyClientConfig);
        MixAll.properties2Object(properties, messageStoreConfig);
        this.configDataVersion.nextVersion();
        this.flushAllConfig();
    }


    private void flushAllConfig() {
        String allConfig = this.encodeAllConfig();
        try {
            MixAll.string2File(allConfig, BrokerPathConfigHelper.getBrokerConfigPath());
            log.info("flush broker config, {} OK", BrokerPathConfigHelper.getBrokerConfigPath());
        }
        catch (IOException e) {
            log.info("flush broker config Exception, " + BrokerPathConfigHelper.getBrokerConfigPath(), e);
        }
    }


    public String encodeAllConfig() {
        StringBuilder sb = new StringBuilder();
        {
            Properties properties = MixAll.object2Properties(this.brokerConfig);
            if (properties != null) {
                sb.append(MixAll.properties2String(properties));
            }
            else {
                log.error("encodeAllConfig object2Properties error");
            }
        }

        {
            Properties properties = MixAll.object2Properties(this.messageStoreConfig);
            if (properties != null) {
                sb.append(MixAll.properties2String(properties));
            }
            else {
                log.error("encodeAllConfig object2Properties error");
            }
        }

        {
            Properties properties = MixAll.object2Properties(this.nettyServerConfig);
            if (properties != null) {
                sb.append(MixAll.properties2String(properties));
            }
            else {
                log.error("encodeAllConfig object2Properties error");
            }
        }

        {
            Properties properties = MixAll.object2Properties(this.nettyClientConfig);
            if (properties != null) {
                sb.append(MixAll.properties2String(properties));
            }
            else {
                log.error("encodeAllConfig object2Properties error");
            }
        }
        return sb.toString();
    }


    public RebalanceLockManager getRebalanceLockManager() {
        return rebalanceLockManager;
    }


    public SlaveSynchronize getSlaveSynchronize() {
        return slaveSynchronize;
    }


    public BrokerOuterAPI getBrokerOuterAPI() {
        return brokerOuterAPI;
    }


    public ExecutorService getPullMessageExecutor() {
        return pullMessageExecutor;
    }


    public void setPullMessageExecutor(ExecutorService pullMessageExecutor) {
        this.pullMessageExecutor = pullMessageExecutor;
    }


    public BrokerStats getBrokerStats() {
        return brokerStats;
    }


    public void setBrokerStats(BrokerStats brokerStats) {
        this.brokerStats = brokerStats;
    }


    public BlockingQueue<Runnable> getSendThreadPoolQueue() {
        return sendThreadPoolQueue;
    }


    public FilterServerManager getFilterServerManager() {
        return filterServerManager;
    }


    public BrokerStatsManager getBrokerStatsManager() {
        return brokerStatsManager;
    }


    private void printMasterAndSlaveDiff() {
        long diff = this.messageStore.slaveFallBehindMuch();

        // XXX: warn and notify me
        log.info("slave fall behind master, how much, {} bytes", diff);
    }


    public void addDeleteTopicTask() {
        // Try to delete unused topics after 5 minutes
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                int removedTopicCnt =
                        BrokerController.this.messageStore.cleanUnusedTopic(BrokerController.this
                            .getTopicConfigManager().getTopicConfigTable().keySet());
                log.info("addDeleteTopicTask removed topic count {}", removedTopicCnt);
            }
        }, 5, TimeUnit.MINUTES);
    }

    // Hooks for tracing sent messages
    private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();


    public void registerSendMessageHook(final SendMessageHook hook) {
        this.sendMessageHookList.add(hook);
        log.info("register SendMessageHook Hook, {}", hook.hookName());
    }

    // Hooks for tracing consumed messages
    private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();


    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
        this.consumeMessageHookList.add(hook);
        log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
    }


    public void registerServerRPCHook(RPCHook rpcHook) {
        getRemotingServer().registerRPCHook(rpcHook);
    }


    public void registerClientRPCHook(RPCHook rpcHook) {
        this.getBrokerOuterAPI().registerRPCHook(rpcHook);
    }
}
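
The comments on `sendThreadPoolQueue` and `pullThreadPoolQueue` describe them as flow control for writes and reads. A minimal sketch of how a bounded queue achieves this with a plain JDK `ThreadPoolExecutor` follows. It assumes the executor's default rejection policy, which is what the constructor calls in `initialize()` leave in place since they pass no handler; the class name `BoundedQueueFlowControl` and its helper methods are hypothetical and are not part of RocketMQ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of RocketMQ.
final class BoundedQueueFlowControl {

    /**
     * Submits `tasks` jobs that block until released to a fixed-size pool
     * backed by a bounded queue, and returns how many were rejected.
     */
    public static int countRejected(int threads, int capacity, int tasks) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(capacity);
        // Same shape as the broker's pools: core == max, bounded work queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 1000 * 60, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                // All workers are busy and the queue is full: the caller is pushed back.
                rejected++;
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 2 tasks occupy the workers, 3 wait in the queue, the remaining 5 are rejected.
        System.out.println("rejected = " + countRejected(2, 3, 10)); // prints "rejected = 5"
    }
}
```

With an unbounded queue the pool would accept every task and memory would grow under overload; the capacity limit turns overload into an immediate, visible rejection that the remoting layer can report back to the client.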

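The daily statistics task in `initialize()` derives its initial delay from `UtilAll.computNextMorningTimeMillis()` and then repeats every 24 hours. The sketch below shows one way such a "next midnight" value can be computed with `java.util.Calendar`. It is an assumption about the approach, not a copy of the `UtilAll` implementation, and the names `DailyScheduleSketch`, `nextMidnightMillis`, and `initialDelayMillis` are hypothetical.

```java
import java.util.Calendar;
import java.util.TimeZone;

// Hypothetical illustration class, not part of RocketMQ.
final class DailyScheduleSketch {

    static final long ONE_DAY_MILLIS = 1000L * 60 * 60 * 24;

    /** Returns the epoch millis of the next 00:00:00.000 after `nowMillis` in `zone`. */
    public static long nextMidnightMillis(long nowMillis, TimeZone zone) {
        Calendar cal = Calendar.getInstance(zone);
        cal.setTimeInMillis(nowMillis);
        cal.add(Calendar.DAY_OF_MONTH, 1); // move to tomorrow
        cal.set(Calendar.HOUR_OF_DAY, 0);  // then truncate to the start of that day
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        return cal.getTimeInMillis();
    }

    /** Delay to pass as the initialDelay argument of scheduleAtFixedRate. */
    public static long initialDelayMillis(long nowMillis, TimeZone zone) {
        return nextMidnightMillis(nowMillis, zone) - nowMillis;
    }

    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        long oneHourAfterEpoch = 1000L * 60 * 60; // 1970-01-01T01:00:00Z
        // 23 hours remain until the next UTC midnight.
        System.out.println(initialDelayMillis(oneHourAfterEpoch, utc)); // prints 82800000
    }
}
```

Because the period is a fixed 24 hours, a task scheduled this way can drift away from local midnight in time zones that observe daylight saving time; for a statistics log that is usually acceptable.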

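Every `Runnable` passed to `scheduleAtFixedRate` in the class above wraps its body in `try`/`catch` and only logs the exception. A likely reason is the documented JDK behavior that a periodic task which throws is never run again. The sketch below demonstrates that behavior with the plain JDK API; the class `ScheduledTaskGuardSketch` and its methods are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of RocketMQ.
final class ScheduledTaskGuardSketch {

    /** A periodic task that throws: returns how many times it ran before being dropped. */
    public static int runsWithoutGuard() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = ses.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            throw new IllegalStateException("task failed");
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            future.get(); // returns only by throwing once the task has failed
        } catch (ExecutionException expected) {
            // the failure of the first run surfaces here; no further runs are scheduled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ses.shutdownNow();
        return runs.get();
    }

    /** The same failing work, guarded as in BrokerController: returns the runs observed. */
    public static int runsWithGuard(int target) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch reached = new CountDownLatch(target);
        ses.scheduleAtFixedRate(() -> {
            try {
                runs.incrementAndGet();
                reached.countDown();
                throw new IllegalStateException("task failed");
            } catch (Exception e) {
                // a real task would log here; swallowing keeps the schedule alive
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            reached.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ses.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + runsWithoutGuard());
        System.out.println("guarded runs reached at least 3: " + (runsWithGuard(3) >= 3));
    }
}
```

Since the broker shares one single-threaded scheduler across offset persistence, Name Server registration, and slave synchronization, an unguarded failure would silently stop that one job while the others kept running, which is hard to notice in production.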