1. 程式人生 > >RocketMQ有序訊息研究

RocketMQ有序訊息研究

在訂閱訊息的時候,有時我們希望訊息能按照一定業務順序消費,比如一個訂單建立,訂單修改,訂單完成。這時候是需要順序訊息。RocketMQ支援順序消費,下面來研究一下實現邏輯。

樣例

生產者

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name"
); //Launch the instance. producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; //Create a message instance, specifying topic, tag and message body. Message msg = new
Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int
index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } //server shutdown producer.shutdown(); } }

消費者

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                      ConsumeOrderlyContext context) {
                Random random = new Random();

                context.setAutoCommit(false);

                System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
                for (MessageExt msg: msgs) {
                    System.out.println("topic=" + msg.getTopic() + ",tags=" + msg.getTags() +  ", content:" + new String(msg.getBody()));
                }

                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

輸出如下,從結果來看,雖然消費端建立了多個執行緒消費,但是從不同tag來看都是有序的。

ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 40
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 48
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 50
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 58
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 60
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 68
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 70
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 78
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 80
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 88
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 90
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 98
...

原始碼分析

生產者

生產者程式碼不能難看,在傳送訊息的時候,進行了orderId進行雜湊,讓同一個orderId每次傳送到同一個佇列,保證同一個佇列單個執行緒消費肯定是有序的。

消費者

首先了解一些概念
MessageQueue是邏輯佇列,包含topic和brokerName以及queueId。通過MessageQueue可以中broker定位到訊息佇列。

public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic;
    private String brokerName;
    private int queueId;

PullRequest由MessageQueue和ProcessQueue。

public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;

ProcessQueue是消費端實際的消費載體。

/**
* Queue consumption snapshot
* 
* @author shijia.wxr<[email protected]>
* @since 2013-7-24
*/
public class ProcessQueue {
    public final static long RebalanceLockMaxLiveTime = Long.parseLong(System.getProperty(
        "rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
    public final static long RebalanceLockInterval = Long.parseLong(System.getProperty(
        "rocketmq.client.rebalance.lockInterval", "20000"));

    private final Logger log = ClientLogger.getLog();
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
    private volatile long queueOffsetMax = 0L;
    private final AtomicLong msgCount = new AtomicLong();

入口

消費者內部實現比較複雜,實現邏輯在於ConsumeMessageOrderlyService。這個在註冊MessageListenerOrderly會生成。

            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this,
                            (MessageListenerOrderly) this.getMessageListenerInner());
            }
            else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this,
                            (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();

在consumer.start()後會建立了定時任務,預設每隔20秒向broker鎖住當前消費端的消費佇列MessageQueue。保證其他消費端不能消費這些佇列訊息。

    public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
            .messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
        }
    }


public void lockAll() {
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();

        Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Set<MessageQueue>> entry = it.next();
            final String brokerName = entry.getKey();
            final Set<MessageQueue> mqs = entry.getValue();

            if (mqs.isEmpty())
                continue;

            FindBrokerResult findBrokerResult =
                    this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
            if (findBrokerResult != null) {
                LockBatchRequestBody requestBody = new LockBatchRequestBody();
                requestBody.setConsumerGroup(this.consumerGroup);
                requestBody.setClientId(this.mQClientFactory.getClientId());
                requestBody.setMqSet(mqs);

                try {
                    Set<MessageQueue> lockOKMQSet =
                            this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(
                                    findBrokerResult.getBrokerAddr(), requestBody, 1000);

                    for (MessageQueue mq : lockOKMQSet) {
                        ProcessQueue processQueue = this.processQueueTable.get(mq);
                        if (processQueue != null) {
                            if (!processQueue.isLocked()) {
                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                            }

                            processQueue.setLocked(true);
                            processQueue.setLastLockTimestamp(System.currentTimeMillis());
                        }
                    }
                    for (MessageQueue mq : mqs) {
                        if (!lockOKMQSet.contains(mq)) {
                            ProcessQueue processQueue = this.processQueueTable.get(mq);
                            if (processQueue != null) {
                                processQueue.setLocked(false);
                                log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup,
                                        mq);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("lockBatchMQ exception, " + mqs, e);
                }
            }
        }
    }  

拉取訊息

PullMessageService負責不斷地把訊息拉過來消費。簡單從pullRequestQueue取出來PullRequest,執行拉取操作。PullRequest由DefaultMQPushConsumerImpl推過來放入pullRequestQueue中。

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            }
            catch (InterruptedException e) {
            }
            catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

負載均衡

RebalanceImpl定時10秒鐘負載權衡,傳送PullRequest,向broker請求拉取訊息。負載幾個就建立幾個PullRequest。

        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(new ProcessQueue());

                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    pullRequest.setNextOffset(nextOffset);
                    pullRequestList.add(pullRequest);
                    changed = true;
                    this.processQueueTable.put(mq, pullRequest.getProcessQueue());
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }

        this.dispatchPullRequest(pullRequestList);

RebalancePushImpl分發

    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
        }
    }

通過PullMessageService把PullRequest傳送出去

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            }
            catch (InterruptedException e) {
            }
            catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

拉取訊息

broker在收到PullRequest後會,響應生成PullResult返回給消費端。通過PullCallback回撥消費訊息。在callback中,DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest提交任務非同步消費。然後將pullRequest中佇列偏移量offset設定為PullRequest回傳後的偏移量,繼續傳送PullRequest拉取訊息。

    PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult =
                            DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                                pullRequest.getMessageQueue(), pullResult, subscriptionData);

                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
                            pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                        else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
                                pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(),
                                pullResult.getMsgFoundList().size());

                            boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                pullResult.getMsgFoundList(), //
                                processQueue, //
                                pullRequest.getMessageQueue(), //
                                dispathToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            }
                            else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset//
                                || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",//
                                pullResult.getNextBeginOffset(),//
                                firstMsgOffset,//
                                prevRequestOffset);
                        }

                        break;

消費訊息

通過建立ConsumeRequest任務消費。每次都消費那些被鎖定的佇列。

每次獲取鎖,保證消費端只有一個執行緒消費。這樣保證消費有序的。而且只有訊息佇列被鎖定才能消費。

      public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}",
                    this.messageQueue);
                return;
            }

            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING
                    .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) 
                        if (MessageModel.CLUSTERING
                            .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                .messageModel())
                                && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                                this.processQueue, 10);
                            break;
                        }

                        if (MessageModel.CLUSTERING
                            .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
                                .messageModel())
                                && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}",
                                this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                                this.processQueue, 10);
                            break;
                        }
                ...
                }                          

呼叫processQueue.takeMessags拿出訊息,呼叫使用者端MessageListenerOrderly消費訊息

                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
...

                            try {
                                this.processQueue.getLockConsume().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn(
                                        "consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }

                                status =
                                        messageListener.consumeMessage(Collections.unmodifiableList(msgs),
                                            context);
                            }

總結

保證訊息順序消費有兩方面:

  • 1.通過ReblanceImp的lockAll方法每隔一段時間定時鎖住當前消費端消費的佇列,設定本地佇列ProcessQueue的locked屬性為true。保證broker中的每個訊息佇列只對應一個消費端。
    1. 在消費端也是通過鎖,保證每個ProcessQueue只有一個執行緒消費。

這裡保證有序是保證同一個佇列是有序消費的,但是不同的佇列消費順序是不能保證的。

相關推薦

RocketMQ有序訊息研究

在訂閱訊息的時候,有時我們希望訊息能按照一定業務順序消費,比如一個訂單建立,訂單修改,訂單完成。這時候是需要順序訊息。RocketMQ支援順序消費,下面來研究一下實現邏輯。 樣例 生產者 public class OrderedProducer {

rocketmq有序消息

先進先出 blog byte tin def 發送 rocket off void RocketMQ提供的順序消費消息實現是使用的FIFO 先進先出算法 Producer消息發送 public class Producer { public static void

Kafka、RabbitMQ、RocketMQ訊息中介軟體的對比 —— 訊息傳送效能和區別

分散式系統中,我們廣泛運用訊息中介軟體進行系統間的資料交換,便於非同步解耦。現在開源的訊息中介軟體有很多,前段時間我們自家的產品 RocketMQ (MetaQ的核心) 也順利開源,得到大家的關注。 那麼,訊息中介軟體效能究竟哪家強? 帶著這個疑問,我們中介軟體測

RocketMQ-事務訊息

1.RocketMQ有三種訊息     1)普通訊息     2)順序訊息     3)事務訊息 2.分散式事務-強調最終一致性而不是強一致性 單個應用,資料庫分庫分表了(跨資料庫) 多個應用,服務

RocketMQ-順序訊息

github程式碼下載地址 1.RocketMQ有三種訊息     1)普通訊息     2)順序訊息     3)事務訊息 2.順序訊息     概念:是MQ提供的一種嚴格按照順序進行釋出和消費

解析RocketMQ訊息索引檔案consumequeue

CommitLog的檔案結構 下圖展示了CommitLog的檔案結構,可以看到,包含了topic、queueId、訊息體等核心資訊。 同Kafka一樣,訊息是變長的,順序寫入。 如下圖所示: ConsumeQueue的檔案結構 ConsumeQueue中並不需要

架構師日記——Kafka、RabbitMQ、RocketMQ訊息中介軟體的對比

分散式系統中,我們廣泛運用訊息中介軟體進行系統間的資料交換,便於非同步解耦。現在開源的訊息中介軟體有很多,前段時間我們自家的產品 RocketMQ (MetaQ的核心) 也順利開源,得到大家的關注。 那麼,訊息中介軟體效能究竟哪家強? 帶著這個疑問,我們中

RocketMQ訊息傳送方式

同步傳送 簡單來說,同步傳送就是指 producer 傳送訊息後,會在接收到 broker 響應後才繼續發下一條訊息的通訊方式。 由於這種同步傳送的方式確保了訊息的可靠性,同時也能及時得到訊息傳送的結果,故而適合一些傳送比較重要的訊息場景,比如說重要的通知郵件、營銷簡訊等等。在實

RocketMq-延遲訊息及 程式碼實現

支援延遲訊息 RocketMQ 支援定時訊息,但是不支援任意時間精度,僅支援特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。 配置 開啟安裝目錄的

RocketMQ訊息佇列

訊息佇列的特徵:1.業務無關。訊息佇列不需要考慮上層的業務模型,只需要做好訊息分發;2.FIFO。先投遞先到達的保證是一個訊息佇列和一個buffer(快取)的本質區別;3.容災。主要包括節點的動態增刪和訊息的持久化。4.效能。訊息佇列的吞吐量提升,則整個系統的內部通訊效率就會提高。為什麼需要訊息佇列?當系統中

搞懂分散式技術19:使用RocketMQ事務訊息解決分散式事務

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性?一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。但這裡面有個問題:A是先update DB,

分散式訊息佇列RocketMQ--事務訊息--解決分散式事務

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。 但這裡面有個問題:A是先update D

RocketMQ 解決訊息重複投遞的問題

業務與訊息耦合的情況下會產生訊息重複投遞,因為支付寶儲存了訊息記錄,餘額寶處理成功之後需要傳送處理成功的訊息給支付寶。支付寶進行確認之後,把訊息記錄狀態刪除或者做變更。而我們的業務與訊息解耦的方式則不需要考慮訊息重複投遞的問題。因為 我們完全信任MQ中介軟體。第一種方式的訊息

分散式訊息佇列RocketMQ&Kafka -- 訊息的“順序消費”-- 一個看似簡單的複雜問題

在說到訊息中介軟體的時候,我們通常都會談到一個特性:訊息的順序消費問題。這個問題看起來很簡單:Producer傳送訊息1, 2, 3。。。 Consumer按1, 2, 3。。。順序消費。 但實際情況卻是:無論RocketMQ,還是Kafka,預設都不保證訊息

RocketMQ原始碼分析之RocketMQ事務訊息實現原理上篇(二階段提交)

根據上文的描述,傳送事務訊息的入口為: TransactionMQProducer#sendMessageInTransaction: public TransactionSendResult sendMessageInTransaction(final Message msg, final Object

RocketMQ原始碼分析之從官方示例窺探:RocketMQ事務訊息實現基本思想

RocketMQ4.3.0版本開始支援事務訊息,後續分享將開始將剖析事務訊息的實現原理。首先從官方給出的Demo例項入手,以此通往RocketMQ事務訊息的世界中。 官方版本未釋出之前,從apache rocketmq第一個版本上線後,程式碼中存在與事務訊息相關的程式碼,例如COMMIT、ROLLBACK、

rocketmq 事務訊息

轉自  https://mp.weixin.qq.com/s?__biz=MzU4NzU0MDIzOQ==&mid=2247484003&idx=1&sn=8b9b084f463c6a0bd0ce04d9ca670079&scene=19 近日,

分散式訊息佇列RocketMQ--事務訊息--解決分散式事務的最佳實踐

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。

Kafka、RabbitMQ、RocketMQ訊息中介軟體的對比

訊息中介軟體現在有不少,網上很多文章都對其做過對比,在這我對其做進一步總結與整理。RocketMQ淘寶內部的交易系統使用了淘寶自主研發的Notify訊息中介軟體,使用Mysql作為訊息儲存媒介,可完全水平擴容,為了進一步降低成本,我們認為儲存部分可以進一步優化,2011年初,

原始碼分析RocketMQ訊息ACK機制(消費進度)

首先簡要闡述一下訊息消費進度首先消費者訂閱訊息消費佇列(MessageQueue),當生產者將訊息負載傳送到MessageQueue中時,消費訂閱者開始消費訊息,訊息消費過程中,為了避免重複消費,需要一