RocketMQ 命令列工具原始碼結構解析
概述
RocketMQ 提供有控制檯及一系列控制檯命令,用於管理員對主題,叢集,broker 等資訊的管理;
進入 RocketMQ 的bin 目錄,可以看到 mqadmin 指令碼檔案。

執行 mqadmin 指令碼顯示如下:

顯示了 mqadmin 命令支援的所有操作。
如果想具體查新某一個操作的詳細命令,可以使用
mqadmin help 命令名稱
比如:mqadmin help updateTopic

檢視 mqadmin指令碼

可以發現 mqadmin 的命令呼叫的是 tools 命令,設定的啟動類為 org.apache.rocketmq.tools.command.MQAdminStartup 。
tools 模組結構

MQAdminStartup 啟動類
public static void main(String[] args) { main0(args, null); } public static void main0(String[] args, RPCHook rpcHook) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //PackageConflictDetect.detectFastjson(); initCommand(); try { initLogback(); switch (args.length) { case 0: printHelp(); break; case 2: if (args[0].equals("help")) { SubCommand cmd = findSubCommand(args[1]); if (cmd != null) { Options options = ServerUtil.buildCommandlineOptions(new Options()); options = cmd.buildCommandlineOptions(options); if (options != null) { ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options); } } else { System.out.printf("The sub command %s not exist.%n", args[1]); } break; } case 1: default: SubCommand cmd = findSubCommand(args[0]); if (cmd != null) { String[] subargs = parseSubArgs(args); Options options = ServerUtil.buildCommandlineOptions(new Options()); final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { return; } if (commandLine.hasOption('n')) { String namesrvAddr = commandLine.getOptionValue('n'); System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr); } cmd.execute(commandLine, options, rpcHook); } else { System.out.printf("The sub command %s not exist.%n", args[0]); } break; } } catch (Exception e) { e.printStackTrace(); } }
1、首先呼叫initCommand() 方法載入所有的命令。
2、初始化日誌
3、判斷啟動該類main 方法傳入的引數。
- 3.1 如果沒有引數,則列印幫助資訊。
- 3.2 如果引數為2個,並且第一個是 help,第二個引數是initCommand() 載入的命令名稱,則呼叫 ServerUtil.printCommandLineHelp() 方法列印指定命令的幫助資訊。
- 3.3 如果參賽為一個、或2個,並且第一個引數不為 help,或多個。並且第一個參賽為 initCommand() 載入的命令,則呼叫 該initCommand() 載入類中的 execute() 方法。
cmd.execute(commandLine, options, rpcHook);
initCommand() 方法
public static void initCommand() { initCommand(new UpdateTopicSubCommand()); initCommand(new DeleteTopicSubCommand()); initCommand(new UpdateSubGroupSubCommand()); initCommand(new DeleteSubscriptionGroupCommand()); initCommand(new UpdateBrokerConfigSubCommand()); initCommand(new UpdateTopicPermSubCommand()); initCommand(new TopicRouteSubCommand()); initCommand(new TopicStatusSubCommand()); initCommand(new TopicClusterSubCommand()); initCommand(new BrokerStatusSubCommand()); initCommand(new QueryMsgByIdSubCommand()); initCommand(new QueryMsgByKeySubCommand()); initCommand(new QueryMsgByUniqueKeySubCommand()); initCommand(new QueryMsgByOffsetSubCommand()); initCommand(new PrintMessageSubCommand()); initCommand(new PrintMessageByQueueCommand()); initCommand(new SendMsgStatusCommand()); initCommand(new BrokerConsumeStatsSubCommad()); initCommand(new ProducerConnectionSubCommand()); initCommand(new ConsumerConnectionSubCommand()); initCommand(new ConsumerProgressSubCommand()); initCommand(new ConsumerStatusSubCommand()); initCommand(new CloneGroupOffsetCommand()); initCommand(new ClusterListSubCommand()); initCommand(new TopicListSubCommand()); initCommand(new UpdateKvConfigCommand()); initCommand(new DeleteKvConfigCommand()); initCommand(new WipeWritePermSubCommand()); initCommand(new ResetOffsetByTimeCommand()); initCommand(new UpdateOrderConfCommand()); initCommand(new CleanExpiredCQSubCommand()); initCommand(new CleanUnusedTopicCommand()); initCommand(new StartMonitoringSubCommand()); initCommand(new StatsAllSubCommand()); initCommand(new AllocateMQSubCommand()); initCommand(new CheckMsgSendRTCommand()); initCommand(new CLusterSendMsgRTCommand()); initCommand(new GetNamesrvConfigCommand()); initCommand(new UpdateNamesrvConfigCommand()); initCommand(new GetBrokerConfigCommand()); initCommand(new QueryConsumeQueueCommand()); initCommand(new SendMessageCommand()); initCommand(new ConsumeMessageCommand()); }
叢類名中可以看出跟上面控制檯 執行 mqadmin
指令輸出命令的名字和這裡的類名可以一一對應上。
initCommand 方法
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>(); public static void initCommand(SubCommand command) { subCommandList.add(command); }
把 init 載入到一個List集合中。
SubCommand 介面定義
所有的操作命令都實現了 SubCommand 介面
public interface SubCommand { String commandName(); String commandDesc(); Options buildCommandlineOptions(final Options options); void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException; }
1、commandName() 命令名稱
2、commandDesc()命令描述
3、buildCommandlineOptions() 構建命令解析器
4、execute() 執行命令
建立 Topic 原始碼分析
下面我們以建立 Topic 命令來分析實現原理。
updateTopic 命令是建立Topic的命令。

通過該命令可以檢視 updateTopic 支援那麼多引數。
下面我們來分析下 UpdateTopicPermSubCommand 類的實現
UpdateTopicPermSubCommand 解析
commandName()
@Override public String commandName() { return "updateTopic"; }
命令名稱
commandDesc()
@Override public String commandDesc() { return "Update or create topic"; }
命令描述
buildCommandlineOptions()
@Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("b", "brokerAddr", true, "create topic to which broker"); opt.setRequired(false); options.addOption(opt); opt = new Option("c", "clusterName", true, "create topic to which cluster"); opt.setRequired(false); options.addOption(opt); opt = new Option("t", "topic", true, "topic name"); opt.setRequired(true); options.addOption(opt); opt = new Option("r", "readQueueNums", true, "set read queue nums"); opt.setRequired(false); options.addOption(opt); opt = new Option("w", "writeQueueNums", true, "set write queue nums"); opt.setRequired(false); options.addOption(opt); opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]"); opt.setRequired(false); options.addOption(opt); opt = new Option("o", "order", true, "set topic's order(true|false)"); opt.setRequired(false); options.addOption(opt); opt = new Option("u", "unit", true, "is unit topic (true|false)"); opt.setRequired(false); options.addOption(opt); opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false)"); opt.setRequired(false); options.addOption(opt); return options; }
從該方法中可以看到定義的命令及其說明。
execute() 方法
@Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); try { TopicConfig topicConfig = new TopicConfig(); topicConfig.setReadQueueNums(8); topicConfig.setWriteQueueNums(8); topicConfig.setTopicName(commandLine.getOptionValue('t').trim()); // readQueueNums if (commandLine.hasOption('r')) { topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim())); } // writeQueueNums if (commandLine.hasOption('w')) { topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim())); } // perm if (commandLine.hasOption('p')) { topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim())); } boolean isUnit = false; if (commandLine.hasOption('u')) { isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim()); } boolean isCenterSync = false; if (commandLine.hasOption('s')) { isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim()); } int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync); topicConfig.setTopicSysFlag(topicCenterSync); boolean isOrder = false; if (commandLine.hasOption('o')) { isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim()); } topicConfig.setOrder(isOrder); if (commandLine.hasOption('b')) { String addr = commandLine.getOptionValue('b').trim(); defaultMQAdminExt.start(); defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); if (isOrder) { String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr); String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums(); defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false); System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf.toString())); } System.out.printf("create topic to %s success.%n", addr); System.out.printf("%s", topicConfig); return; } else if (commandLine.hasOption('c')) { String clusterName = commandLine.getOptionValue('c').trim(); defaultMQAdminExt.start(); Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : masterSet) { defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); System.out.printf("create topic to %s success.%n", addr); } if (isOrder) { Set<String> brokerNameSet = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName); StringBuilder orderConf = new StringBuilder(); String splitor = ""; for (String s : brokerNameSet) { orderConf.append(splitor).append(s).append(":") .append(topicConfig.getWriteQueueNums()); splitor = ";"; } defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf.toString(), true); System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf); } System.out.printf("%s", topicConfig); return; } ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { defaultMQAdminExt.shutdown(); } }
從上面程式碼中可以看出,很大一部分程式碼都是解析 commandLine 引數。
解析出來的引數來填充 TopicConfig 物件。
然後呼叫 DefaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig) 方法來建立 Topic。
從上面的程式碼中可以看出 -b 和 -c 引數只能有一個生效。
-b 引數是在指定的 broker 上建立 topic
-c 是在指定的叢集上每一個 broker 建立 topic。
優先判斷的是 -b 引數,如果指定 -b 引數就會在指定的 broker 上建立,而不會在 -c 指定的叢集上建立。
其它的 SubCommand 命令的實現方式都一樣,就不一一解析了。