
Understanding ordered consumption in RocketMQ

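Goal: consume messages from a queue in the order they were sent, for example the steps of one order: (1) order placed, (2) payment.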

With the default send you do not choose the queue; the client picks one for you:

SendResult result = producer.send(msg);

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

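This is just plain round-robin over the topic's queues (skipping the broker used last time when one is given), so consecutive messages of the same order can land in different queues.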
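To make the effect of round-robin concrete, here is a minimal self-contained sketch. It does not use the RocketMQ classes: `RoundRobinDemo`, `nextQueue` and `route` are hypothetical names, and the counter is assumed to start at 0 purely to keep the output predictable. It only mimics the "increment a counter, take it modulo the queue count" idea from the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinDemo {
    // Hypothetical stand-in for the per-topic counter (sendWhichQueue in the source).
    // Assumption: it starts at 0 here so the result is easy to follow.
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int queueCount;

    public RoundRobinDemo(int queueCount) {
        this.queueCount = queueCount;
    }

    // Returns the next queue index and advances the counter.
    public int nextQueue() {
        // floorMod keeps the index in [0, queueCount) even if the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }

    // Routes messageCount consecutive messages and records the queue each one got.
    public static List<Integer> route(int queueCount, int messageCount) {
        RoundRobinDemo demo = new RoundRobinDemo(queueCount);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            result.add(demo.nextQueue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Three messages of one order, four queues: each lands on a different queue.
        System.out.println(route(4, 3)); // prints [0, 1, 2]
    }
}
```

Since each queue is consumed independently, nothing guarantees that the message on queue 0 is handled before the one on queue 1, which is why the default send cannot give ordered consumption.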

The fix is to supply a custom MessageQueueSelector and route every message that must be consumed in order to the same queue.

public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg, selector, arg);
}
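The selection logic itself can be sketched without the RocketMQ dependency. The example below is an illustration, not the library's code: `OrderQueueRouting`, `queueIndexFor` and the order id `10086` are hypothetical, and strings stand in for `MessageQueue` objects. It assumes the business key is a numeric order id and maps it to a queue by modulo, so every message of one order gets the same queue.

```java
import java.util.Arrays;
import java.util.List;

public class OrderQueueRouting {
    // Maps a business key (assumed here to be a numeric order id) to a queue index.
    // floorMod keeps the index non-negative even for negative ids.
    public static int queueIndexFor(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    // Picks the queue for an order; Q stands in for RocketMQ's MessageQueue.
    public static <Q> Q select(List<Q> queues, long orderId) {
        return queues.get(queueIndexFor(orderId, queues.size()));
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        long orderId = 10086L; // hypothetical order id
        for (String step : new String[] {"order placed", "payment"}) {
            // Both steps of the same order resolve to the same queue.
            System.out.println(step + " -> " + select(queues, orderId));
        }
        // prints:
        // order placed -> q2
        // payment -> q2
    }
}
```

In a real producer the same computation would go inside `MessageQueueSelector.select(List<MessageQueue> mqs, Message msg, Object arg)`, with the order id passed as the `arg` of `send(msg, selector, arg)`. Ordering then holds only within that one queue, and only as long as the number of queues does not change between sends.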