1. 程式人生 > >RocketMq獲取消費資訊

RocketMQ 獲取消費資訊

        這幾天專案需要檢視 MQ 的消費情況。阿里開源的 mq-console 介面挺好用,但是它開放的許可權太大了,所以就琢磨著自己開發介面來獲取 MQ 的消費資訊。話不多說,直接貼出程式碼:

// Collects consume progress for one consumer group from every broker that
// serves the topic, then prints a per-broker breakdown plus overall totals as JSON.
long timeout = 3_000L; // request timeout passed to the name-server and broker calls
String topic = "market_test_topic";
String group = "group_name";
String nameAddr = "mq的nameAddr";

// MQClientManager needs a ClientConfig to create an MQClientInstance.
// DefaultMQProducer extends ClientConfig, so one is created here purely as configuration.
DefaultMQProducer producer = new DefaultMQProducer(group);
producer.setNamesrvAddr(nameAddr);
MQClientInstance mqClient = MQClientManager.getInstance().getAndCreateMQClientInstance(producer);

// The client instance must be started before its remoting API can be used.
mqClient.start();
try {
    // MQClientAPIImpl is the class rocketmq-tools uses for these admin-style requests.
    MQClientAPIImpl api = mqClient.getMQClientAPIImpl();
    TopicRouteData routeData = api.getTopicRouteInfoFromNameServer(topic, timeout);
    List<BrokerData> brokerDatas = routeData.getBrokerDatas();

    List<MqBrokerConsumeInfo> brokerInfoList = new ArrayList<>();
    long totalBrokerOff = 0L;
    long totalConsumerOff = 0L;
    long totalDiffOff = 0L;

    for (BrokerData brokerData : brokerDatas) {
        String addr = brokerData.selectBrokerAddr();
        // NOTE(review): no topic is passed here, so the returned offset table presumably
        // covers every queue this group consumes on the broker, not only `topic` — confirm.
        ConsumeStats consumeStats = api.getConsumeStats(addr, group, timeout);

        // Compute the broker's lag once and reuse it for both the total and the per-broker entry.
        long brokerDiff = consumeStats.computeTotalDiff();
        totalDiffOff += brokerDiff;

        // Sum the offsets of all queues reported by this broker.
        long brokerOffset = 0L;
        long consumerOffset = 0L;
        Collection<OffsetWrapper> wrappers = consumeStats.getOffsetTable().values();
        for (OffsetWrapper wrapper : wrappers) {
            brokerOffset += wrapper.getBrokerOffset();
            consumerOffset += wrapper.getConsumerOffset();
        }
        totalBrokerOff += brokerOffset;
        totalConsumerOff += consumerOffset;

        MqBrokerConsumeInfo brokerInfo = new MqBrokerConsumeInfo();
        brokerInfo.setBroker(addr);
        brokerInfo.setDiffTotal(brokerDiff);
        brokerInfo.setBrokerOffset(brokerOffset);
        brokerInfo.setConsumerOffset(consumerOffset);
        brokerInfoList.add(brokerInfo);
    }

    MqConsumeInfo summary = new MqConsumeInfo();
    summary.setTopic(topic);
    summary.setGroup(group);
    summary.setTotalBrokerOffset(totalBrokerOff);
    summary.setTotalConsumerOffset(totalConsumerOff);
    summary.setTotalDiffTotal(totalDiffOff);
    summary.setBrokerConsumeInfoList(brokerInfoList);
    System.out.println(JSON.toJSONString(summary));
} finally {
    // Release the client's resources once the stats are collected, even if a
    // request above failed; without this the started client is never stopped.
    mqClient.shutdown();
}