RocketMQ有序訊息研究
在訂閱訊息的時候,有時我們希望訊息能按照一定業務順序消費,比如一個訂單建立,訂單修改,訂單完成。這時候是需要順序訊息。RocketMQ支援順序消費,下面來研究一下實現邏輯。
樣例
生產者
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
MQProducer producer = new DefaultMQProducer("example_group_name" );
//Launch the instance.
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
//server shutdown
producer.shutdown();
}
}
消費者
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
Random random = new Random();
context.setAutoCommit(false);
System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
for (MessageExt msg: msgs) {
System.out.println("topic=" + msg.getTopic() + ",tags=" + msg.getTags() + ", content:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
輸出如下,從結果來看,雖然消費端建立了多個執行緒消費,但是從不同tag來看都是有序的。
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 40
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 48
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 50
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 58
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 60
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 68
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 70
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 78
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 80
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 88
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagA, content:Hello RocketMQ 90
ConsumeMessageThread_16 Receive New Messages: topic=TopicTest,tags=TagD, content:Hello RocketMQ 98
...
原始碼分析
生產者
生產者程式碼不能難看,在傳送訊息的時候,進行了orderId進行雜湊,讓同一個orderId每次傳送到同一個佇列,保證同一個佇列單個執行緒消費肯定是有序的。
消費者
首先了解一些概念
MessageQueue是邏輯佇列,包含topic和brokerName以及queueId。通過MessageQueue可以中broker定位到訊息佇列。
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
private static final long serialVersionUID = 6191200464116433425L;
private String topic;
private String brokerName;
private int queueId;
PullRequest由MessageQueue和ProcessQueue。
public class PullRequest {
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private long nextOffset;
ProcessQueue是消費端實際的消費載體。
/**
* Queue consumption snapshot
*
* @author shijia.wxr<[email protected]>
* @since 2013-7-24
*/
public class ProcessQueue {
public final static long RebalanceLockMaxLiveTime = Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
public final static long RebalanceLockInterval = Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.lockInterval", "20000"));
private final Logger log = ClientLogger.getLog();
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private volatile long queueOffsetMax = 0L;
private final AtomicLong msgCount = new AtomicLong();
入口
消費者內部實現比較複雜,實現邏輯在於ConsumeMessageOrderlyService。這個在註冊MessageListenerOrderly會生成。
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
}
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
在consumer.start()後會建立了定時任務,預設每隔20秒向broker鎖住當前消費端的消費佇列MessageQueue。保證其他消費端不能消費這些佇列訊息。
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
}
}
public void lockAll() {
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<MessageQueue>> entry = it.next();
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty())
continue;
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);
try {
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(
findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mq : lockOKMQSet) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
if (!processQueue.isLocked()) {
log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
}
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
for (MessageQueue mq : mqs) {
if (!lockOKMQSet.contains(mq)) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
processQueue.setLocked(false);
log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup,
mq);
}
}
}
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mqs, e);
}
}
}
}
拉取訊息
PullMessageService負責不斷地把訊息拉過來消費。簡單從pullRequestQueue取出來PullRequest,執行拉取操作。PullRequest由DefaultMQPushConsumerImpl推過來放入pullRequestQueue中。
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
}
catch (InterruptedException e) {
}
catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
負載均衡
RebalanceImpl定時10秒鐘負載權衡,傳送PullRequest,向broker請求拉取訊息。負載幾個就建立幾個PullRequest。
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(new ProcessQueue());
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
pullRequest.setNextOffset(nextOffset);
pullRequestList.add(pullRequest);
changed = true;
this.processQueueTable.put(mq, pullRequest.getProcessQueue());
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
this.dispatchPullRequest(pullRequestList);
RebalancePushImpl分發
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
通過PullMessageService把PullRequest傳送出去
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
}
catch (InterruptedException e) {
}
catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
拉取訊息
broker在收到PullRequest後會,響應生成PullResult返回給消費端。通過PullCallback回撥消費訊息。在callback中,DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest提交任務非同步消費。然後將pullRequest中佇列偏移量offset設定為PullRequest回傳後的偏移量,繼續傳送PullRequest拉取訊息。
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult =
DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult, subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(),
pullResult.getMsgFoundList().size());
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
}
else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset//
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",//
pullResult.getNextBeginOffset(),//
firstMsgOffset,//
prevRequestOffset);
}
break;
消費訊息
通過建立ConsumeRequest任務消費。每次都消費那些被鎖定的佇列。
每次獲取鎖,保證消費端只有一個執行緒消費。這樣保證消費有序的。而且只有訊息佇列被鎖定才能消費。
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
return;
}
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING
.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired()))
if (MessageModel.CLUSTERING
.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING
.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}",
this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
this.processQueue, 10);
break;
}
...
}
呼叫processQueue.takeMessags拿出訊息,呼叫使用者端MessageListenerOrderly消費訊息
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
...
try {
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn(
"consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
status =
messageListener.consumeMessage(Collections.unmodifiableList(msgs),
context);
}
總結
保證訊息順序消費有兩方面:
- 1.通過ReblanceImp的lockAll方法每隔一段時間定時鎖住當前消費端消費的佇列,設定本地佇列ProcessQueue的locked屬性為true。保證broker中的每個訊息佇列只對應一個消費端。
- 在消費端也是通過鎖,保證每個ProcessQueue只有一個執行緒消費。
這裡保證有序是保證同一個佇列是有序消費的,但是不同的佇列消費順序是不能保證的。
相關推薦
RocketMQ有序訊息研究
在訂閱訊息的時候,有時我們希望訊息能按照一定業務順序消費,比如一個訂單建立,訂單修改,訂單完成。這時候是需要順序訊息。RocketMQ支援順序消費,下面來研究一下實現邏輯。 樣例 生產者 public class OrderedProducer {
rocketmq有序消息
先進先出 blog byte tin def 發送 rocket off void RocketMQ提供的順序消費消息實現是使用的FIFO 先進先出算法 Producer消息發送 public class Producer { public static void
Kafka、RabbitMQ、RocketMQ等訊息中介軟體的對比 —— 訊息傳送效能和區別
分散式系統中,我們廣泛運用訊息中介軟體進行系統間的資料交換,便於非同步解耦。現在開源的訊息中介軟體有很多,前段時間我們自家的產品 RocketMQ (MetaQ的核心) 也順利開源,得到大家的關注。 那麼,訊息中介軟體效能究竟哪家強? 帶著這個疑問,我們中介軟體測
RocketMQ-事務訊息
1.RocketMQ有三種訊息 1)普通訊息 2)順序訊息 3)事務訊息 2.分散式事務-強調最終一致性而不是強一致性 單個應用,資料庫分庫分表了(跨資料庫) 多個應用,服務
RocketMQ-順序訊息
github程式碼下載地址 1.RocketMQ有三種訊息 1)普通訊息 2)順序訊息 3)事務訊息 2.順序訊息 概念:是MQ提供的一種嚴格按照順序進行釋出和消費
解析RocketMQ的訊息索引檔案consumequeue
CommitLog的檔案結構 下圖展示了CommitLog的檔案結構,可以看到,包含了topic、queueId、訊息體等核心資訊。 同Kafka一樣,訊息是變長的,順序寫入。 如下圖所示: ConsumeQueue的檔案結構 ConsumeQueue中並不需要
架構師日記——Kafka、RabbitMQ、RocketMQ等訊息中介軟體的對比
分散式系統中,我們廣泛運用訊息中介軟體進行系統間的資料交換,便於非同步解耦。現在開源的訊息中介軟體有很多,前段時間我們自家的產品 RocketMQ (MetaQ的核心) 也順利開源,得到大家的關注。 那麼,訊息中介軟體效能究竟哪家強? 帶著這個疑問,我們中
RocketMQ的訊息傳送方式
同步傳送 簡單來說,同步傳送就是指 producer 傳送訊息後,會在接收到 broker 響應後才繼續發下一條訊息的通訊方式。 由於這種同步傳送的方式確保了訊息的可靠性,同時也能及時得到訊息傳送的結果,故而適合一些傳送比較重要的訊息場景,比如說重要的通知郵件、營銷簡訊等等。在實
RocketMq-延遲訊息及 程式碼實現
支援延遲訊息 RocketMQ 支援定時訊息,但是不支援任意時間精度,僅支援特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。 配置 開啟安裝目錄的
RocketMQ的訊息佇列
訊息佇列的特徵:1.業務無關。訊息佇列不需要考慮上層的業務模型,只需要做好訊息分發;2.FIFO。先投遞先到達的保證是一個訊息佇列和一個buffer(快取)的本質區別;3.容災。主要包括節點的動態增刪和訊息的持久化。4.效能。訊息佇列的吞吐量提升,則整個系統的內部通訊效率就會提高。為什麼需要訊息佇列?當系統中
搞懂分散式技術19:使用RocketMQ事務訊息解決分散式事務
說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性?一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。但這裡面有個問題:A是先update DB,
分散式訊息佇列RocketMQ--事務訊息--解決分散式事務
說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。 但這裡面有個問題:A是先update D
RocketMQ 解決訊息重複投遞的問題
業務與訊息耦合的情況下會產生訊息重複投遞,因為支付寶儲存了訊息記錄,餘額寶處理成功之後需要傳送處理成功的訊息給支付寶。支付寶進行確認之後,把訊息記錄狀態刪除或者做變更。而我們的業務與訊息解耦的方式則不需要考慮訊息重複投遞的問題。因為 我們完全信任MQ中介軟體。第一種方式的訊息
分散式訊息佇列RocketMQ&Kafka -- 訊息的“順序消費”-- 一個看似簡單的複雜問題
在說到訊息中介軟體的時候,我們通常都會談到一個特性:訊息的順序消費問題。這個問題看起來很簡單:Producer傳送訊息1, 2, 3。。。 Consumer按1, 2, 3。。。順序消費。 但實際情況卻是:無論RocketMQ,還是Kafka,預設都不保證訊息
RocketMQ原始碼分析之RocketMQ事務訊息實現原理上篇(二階段提交)
根據上文的描述,傳送事務訊息的入口為: TransactionMQProducer#sendMessageInTransaction: public TransactionSendResult sendMessageInTransaction(final Message msg, final Object
RocketMQ原始碼分析之從官方示例窺探:RocketMQ事務訊息實現基本思想
RocketMQ4.3.0版本開始支援事務訊息,後續分享將開始將剖析事務訊息的實現原理。首先從官方給出的Demo例項入手,以此通往RocketMQ事務訊息的世界中。 官方版本未釋出之前,從apache rocketmq第一個版本上線後,程式碼中存在與事務訊息相關的程式碼,例如COMMIT、ROLLBACK、
rocketmq 事務訊息
轉自 https://mp.weixin.qq.com/s?__biz=MzU4NzU0MDIzOQ==&mid=2247484003&idx=1&sn=8b9b084f463c6a0bd0ce04d9ca670079&scene=19 近日,
分散式訊息佇列RocketMQ--事務訊息--解決分散式事務的最佳實踐
說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。
Kafka、RabbitMQ、RocketMQ等訊息中介軟體的對比
訊息中介軟體現在有不少,網上很多文章都對其做過對比,在這我對其做進一步總結與整理。RocketMQ淘寶內部的交易系統使用了淘寶自主研發的Notify訊息中介軟體,使用Mysql作為訊息儲存媒介,可完全水平擴容,為了進一步降低成本,我們認為儲存部分可以進一步優化,2011年初,
原始碼分析RocketMQ之訊息ACK機制(消費進度)
首先簡要闡述一下訊息消費進度首先消費者訂閱訊息消費佇列(MessageQueue),當生產者將訊息負載傳送到MessageQueue中時,消費訂閱者開始消費訊息,訊息消費過程中,為了避免重複消費,需要一