1. 程式人生 > >RocketMQ的順序消費和事務消費

RocketMQ的順序消費和事務消費

一、三種消費 :1.普通消費 2. 順序消費 3.事務消費

1.1  順序消費:在網購的時候,我們需要下單,那麼下單需要假如有三個順序,第一、建立訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單才有意義。RocketMQ可以保證順序消費,他的實現是生產者(一個生產者可以對多個主題去傳送訊息)將這個三個訊息放在topic(一個topic預設有4個佇列)的一個佇列裡面,單機支援上萬個持久化佇列,消費端去消費的時候也是隻能有一個Consumer去取得這個佇列裡面的資料,然後順序消費。

單個節點(Producer端1個、Consumer端1個)

Producer端

  1. package order;

  2. import java.util.List;

  3. import com.alibaba.rocketmq.client.exception.MQBrokerException;

  4. import com.alibaba.rocketmq.client.exception.MQClientException;

  5. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

  6. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;

  7. import com.alibaba.rocketmq.client.producer.SendResult;

  8. import com.alibaba.rocketmq.common.message.Message;

  9. import com.alibaba.rocketmq.common.message.MessageQueue;

  10. import com.alibaba.rocketmq.remoting.exception.RemotingException;

  11. /**

  12. * Producer,傳送順序訊息

  13. */

  14. public class Producer {

  15. public static void main(String[] args) {

  16. try {

  17. DefaultMQProducer producer = new DefaultMQProducer("order_Producer");

  18. producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  19. producer.start();

  20. // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",

  21. // "TagE" };

  22. for (int i = 1; i <= 5; i++) {

  23. Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());

  24. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  25. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  26. Integer id = (Integer) arg;

  27. int index = id % mqs.size();

  28. return mqs.get(index);

  29. }

  30. }, 0);

  31. System.out.println(sendResult);

  32. }

  33. producer.shutdown();

  34. } catch (MQClientException e) {

  35. e.printStackTrace();

  36. } catch (RemotingException e) {

  37. e.printStackTrace();

  38. } catch (MQBrokerException e) {

  39. e.printStackTrace();

  40. } catch (InterruptedException e) {

  41. e.printStackTrace();

  42. }

  43. }

  44. }

Consumer端程式碼

  1. package order;

  2. import java.util.List;

  3. import java.util.concurrent.TimeUnit;

  4. import java.util.concurrent.atomic.AtomicLong;

  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;

  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;

  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;

  9. import com.alibaba.rocketmq.client.exception.MQClientException;

  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

  11. import com.alibaba.rocketmq.common.message.MessageExt;

  12. /**

  13. * 順序訊息消費,帶事務方式(應用可控制Offset什麼時候提交)

  14. */

  15. public class Consumer1 {

  16. public static void main(String[] args) throws MQClientException {

  17. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");

  18. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  19. /**

  20. * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>

  21. * 如果非第一次啟動,那麼按照上次消費的位置繼續消費

  22. */

  23. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  24. consumer.subscribe("TopicOrderTest", "*");

  25. consumer.registerMessageListener(new MessageListenerOrderly() {

  26. AtomicLong consumeTimes = new AtomicLong(0);

  27. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

  28. // 設定自動提交

  29. context.setAutoCommit(true);

  30. for (MessageExt msg : msgs) {

  31. System.out.println(msg + ",內容:" + new String(msg.getBody()));

  32. }

  33. try {

  34. TimeUnit.SECONDS.sleep(5L);

  35. } catch (InterruptedException e) {

  36. e.printStackTrace();

  37. }

  38. ;

  39. return ConsumeOrderlyStatus.SUCCESS;

  40. }

  41. });

  42. consumer.start();

  43. System.out.println("Consumer1 Started.");

  44. }

  45. }

結果如下圖所示:

這個五條資料被順序消費了

多個節點(Producer端1個、Consumer端2個)

Producer端程式碼:

  1. package order;

  2. import java.util.List;

  3. import com.alibaba.rocketmq.client.exception.MQBrokerException;

  4. import com.alibaba.rocketmq.client.exception.MQClientException;

  5. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

  6. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;

  7. import com.alibaba.rocketmq.client.producer.SendResult;

  8. import com.alibaba.rocketmq.common.message.Message;

  9. import com.alibaba.rocketmq.common.message.MessageQueue;

  10. import com.alibaba.rocketmq.remoting.exception.RemotingException;

  11. /**

  12. * Producer,傳送順序訊息

  13. */

  14. public class Producer {

  15. public static void main(String[] args) {

  16. try {

  17. DefaultMQProducer producer = new DefaultMQProducer("order_Producer");

  18. producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  19. producer.start();

  20. // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",

  21. // "TagE" };

  22. for (int i = 1; i <= 5; i++) {

  23. Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());

  24. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  25. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  26. Integer id = (Integer) arg;

  27. int index = id % mqs.size();

  28. return mqs.get(index);

  29. }

  30. }, 0);

  31. System.out.println(sendResult);

  32. }

  33. for (int i = 1; i <= 5; i++) {

  34. Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());

  35. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  36. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  37. Integer id = (Integer) arg;

  38. int index = id % mqs.size();

  39. return mqs.get(index);

  40. }

  41. }, 1);

  42. System.out.println(sendResult);

  43. }

  44. for (int i = 1; i <= 5; i++) {

  45. Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());

  46. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  47. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  48. Integer id = (Integer) arg;

  49. int index = id % mqs.size();

  50. return mqs.get(index);

  51. }

  52. }, 2);

  53. System.out.println(sendResult);

  54. }

  55. producer.shutdown();

  56. } catch (MQClientException e) {

  57. e.printStackTrace();

  58. } catch (RemotingException e) {

  59. e.printStackTrace();

  60. } catch (MQBrokerException e) {

  61. e.printStackTrace();

  62. } catch (InterruptedException e) {

  63. e.printStackTrace();

  64. }

  65. }

  66. }

Consumer1

  1. /**

  2. * 順序訊息消費,帶事務方式(應用可控制Offset什麼時候提交)

  3. */

  4. public class Consumer1 {

  5. public static void main(String[] args) throws MQClientException {

  6. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");

  7. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  8. /**

  9. * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>

  10. * 如果非第一次啟動,那麼按照上次消費的位置繼續消費

  11. */

  12. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  13. consumer.subscribe("TopicOrderTest", "*");

  14. /**

  15. * 實現了MessageListenerOrderly表示一個佇列只會被一個執行緒取到

  16. *,第二個執行緒無法訪問這個佇列

  17. */

  18. consumer.registerMessageListener(new MessageListenerOrderly() {

  19. AtomicLong consumeTimes = new AtomicLong(0);

  20. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

  21. // 設定自動提交

  22. context.setAutoCommit(true);

  23. for (MessageExt msg : msgs) {

  24. System.out.println(msg + ",內容:" + new String(msg.getBody()));

  25. }

  26. try {

  27. TimeUnit.SECONDS.sleep(5L);

  28. } catch (InterruptedException e) {

  29. e.printStackTrace();

  30. }

  31. ;

  32. return ConsumeOrderlyStatus.SUCCESS;

  33. }

  34. });

  35. consumer.start();

  36. System.out.println("Consumer1 Started.");

  37. }

  38. }

Consumer2

  1. /**

  2. * 順序訊息消費,帶事務方式(應用可控制Offset什麼時候提交)

  3. */

  4. public class Consumer2 {

  5. public static void main(String[] args) throws MQClientException {

  6. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");

  7. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  8. /**

  9. * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>

  10. * 如果非第一次啟動,那麼按照上次消費的位置繼續消費

  11. */

  12. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  13. consumer.subscribe("TopicOrderTest", "*");

  14. /**

  15. * 實現了MessageListenerOrderly表示一個佇列只會被一個執行緒取到

  16. *,第二個執行緒無法訪問這個佇列

  17. */

  18. consumer.registerMessageListener(new MessageListenerOrderly() {

  19. AtomicLong consumeTimes = new AtomicLong(0);

  20. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

  21. // 設定自動提交

  22. context.setAutoCommit(true);

  23. for (MessageExt msg : msgs) {

  24. System.out.println(msg + ",內容:" + new String(msg.getBody()));

  25. }

  26. try {

  27. TimeUnit.SECONDS.sleep(5L);

  28. } catch (InterruptedException e) {

  29. e.printStackTrace();

  30. }

  31. ;

  32. return ConsumeOrderlyStatus.SUCCESS;

  33. }

  34. });

  35. consumer.start();

  36. System.out.println("Consumer2 Started.");

  37. }

  38. }

先啟動Consumer1和Consumer2,然後啟動Producer,Producer會發送15條訊息 Consumer1消費情況如圖,都按照順序執行了

Consumer2消費情況如圖,都按照順序執行了

二、事務消費

這裡說的主要是分散式事物。下面的例子的資料庫分別安裝在不同的節點上。

事物消費需要先說說什麼是事物。比如說:我們跨行轉賬,從工商銀行轉到建設銀行,也就是我從工商銀行扣除1000元之後,我的建設銀行也必須加1000元。這樣才能保證資料的一致性。假如工商銀行轉1000元之後,建設銀行的伺服器突然宕機,那麼我扣除了1000,但是並沒有在建設銀行給我加1000,就出現了資料的不一致。因此加1000和減1000才行,減1000和減1000必須一起成功,一起失敗。

再比如,我們進行網購的時候,我們下單之後,訂單提交成功,倉庫商品的數量必須減一。但是訂單可能是一個數據庫,倉庫數量可能又是在另個數據庫裡面。有可能訂單提交成功之後,倉庫數量伺服器突然宕機。這樣也出現了資料不一致的問題。

使用訊息佇列來解決分散式事物:

現在我們去外面飯店吃飯,很多時候都不會直接給了錢之後直接在付款的視窗遞飯菜,而是付款之後他會給你一張小票,你拿著這個小票去出飯的視窗取飯。這裡和我們的系統類似,提高了吞吐量。即使你到第二個視窗,師傅告訴你已經沒飯了,你可以拿著這個憑證去退款,即使中途由於出了意外你無法到達視窗進行取飯,但是隻要憑證還在,可以將錢退給你。這樣就保證了資料的一致性。

如何保證憑證(訊息)有2種方法:

1、在工商銀行扣款的時候,餘額表扣除1000,同時記錄日誌,而且這2個表是在同一個資料庫例項中,可以使用本地事物解決。然後我們通知建設銀行需要加1000給該使用者,建設銀行收到之後給我返回已經加了1000給使用者的確認資訊之後,我再標記日誌表裡面的日誌為已經完成。

2、通過訊息中介軟體

原文地址:http://www.jianshu.com/p/453c6e7ff81c

RocketMQ第一階段傳送Prepared訊息時,會拿到訊息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問訊息,並修改訊息的狀態。

細心的你可能又發現問題了,如果確認訊息傳送失敗了怎麼辦?RocketMQ會定期掃描訊息叢集中的事物訊息,如果發現了Prepared訊息,它會向訊息傳送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續傳送確認訊息呢?RocketMQ會根據傳送端設定的策略來決定是回滾還是繼續傳送確認訊息。這樣就保證了訊息傳送與本地事務同時成功或同時失敗。

例子:

Consumer 端

  1. package transaction;

  2. import java.util.List;

  3. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

  4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

  5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

  6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

  7. import com.alibaba.rocketmq.client.exception.MQClientException;

  8. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

  9. import com.alibaba.rocketmq.common.message.MessageExt;

  10. /**

  11. * Consumer,訂閱訊息

  12. */

  13. public class Consumer {

  14. public static void main(String[] args) throws InterruptedException, MQClientException {

  15. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");

  16. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  17. consumer.setConsumeMessageBatchMaxSize(10);

  18. /**

  19. * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>

  20. * 如果非第一次啟動,那麼按照上次消費的位置繼續消費

  21. */

  22. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  23. consumer.subscribe("TopicTransactionTest", "*");

  24. consumer.registerMessageListener(new MessageListenerConcurrently() {

  25. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

  26. try {

  27. for (MessageExt msg : msgs) {

  28. System.out.println(msg + ",內容:" + new String(msg.getBody()));

  29. }

  30. } catch (Exception e) {

  31. e.printStackTrace();

  32. return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試

  33. }

  34. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

  35. }

  36. });

  37. consumer.start();

  38. System.out.println("transaction_Consumer Started.");

  39. }

  40. }

Producer端

  1. package transaction;

  2. import com.alibaba.rocketmq.client.exception.MQClientException;

  3. import com.alibaba.rocketmq.client.producer.SendResult;

  4. import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

  5. import com.alibaba.rocketmq.client.producer.TransactionMQProducer;

  6. import com.alibaba.rocketmq.common.message.Message;

  7. /**

  8. * 傳送事務訊息例子

  9. *

  10. */

  11. public class Producer {

  12. public static void main(String[] args) throws MQClientException, InterruptedException {

  13. TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();

  14. TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");

  15. producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  16. // 事務回查最小併發數

  17. producer.setCheckThreadPoolMinSize(2);

  18. // 事務回查最大併發數

  19. producer.setCheckThreadPoolMaxSize(2);

  20. // 佇列數

  21. producer.setCheckRequestHoldMax(2000);

  22. producer.setTransactionCheckListener(transactionCheckListener);

  23. producer.start();

  24. // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE"

  25. // };

  26. TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

  27. for (int i = 1; i <= 2; i++) {

  28. try {

  29. Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,

  30. ("Hello RocketMQ " + i).getBytes());

  31. SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);

  32. System.out.println(sendResult);

  33. Thread.sleep(10);

  34. } catch (MQClientException e) {

  35. e.printStackTrace();

  36. }

  37. }

  38. for (int i = 0; i < 100000; i++) {

  39. Thread.sleep(1000);

  40. }

  41. producer.shutdown();

  42. }

  43. }

TransactionExecuterImpl  --執行本地事務

  1. package transaction;

  2. import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;

  3. import com.alibaba.rocketmq.client.producer.LocalTransactionState;

  4. import com.alibaba.rocketmq.common.message.Message;

  5. /**

  6. * 執行本地事務

  7. */

  8. public class TransactionExecuterImpl implements LocalTransactionExecuter {

  9. // private AtomicInteger transactionIndex = new AtomicInteger(1);

  10. public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {

  11. System.out.println("執行本地事務msg = " + new String(msg.getBody()));

  12. System.out.println("執行本地事務arg = " + arg);

  13. String tags = msg.getTags();

  14. if (tags.equals("transaction2")) {

  15. System.out.println("======我的操作============,失敗了 -進行ROLLBACK");

  16. return LocalTransactionState.ROLLBACK_MESSAGE;

  17. }

  18. return LocalTransactionState.COMMIT_MESSAGE;

  19. // return LocalTransactionState.UNKNOW;

  20. }

  21. }

TransactionCheckListenerImpl--未決事務,伺服器回查客戶端(目前已經被閹割啦)

  1. package transaction;

  2. import com.alibaba.rocketmq.client.producer.LocalTransactionState;

  3. import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

  4. import com.alibaba.rocketmq.common.message.MessageExt;

  5. /**

  6. * 未決事務,伺服器回查客戶端

  7. */

  8. public class TransactionCheckListenerImpl implements TransactionCheckListener {

  9. // private AtomicInteger transactionIndex = new AtomicInteger(0);

  10. //在這裡,我們可以根據由MQ回傳的key去資料庫查詢,這條資料到底是成功了還是失敗了。

  11. public LocalTransactionState checkLocalTransactionState(MessageExt msg) {

  12. System.out.println("未決事務,伺服器回查客戶端msg =" + new String(msg.getBody().toString()));

  13. // return LocalTransactionState.ROLLBACK_MESSAGE;

  14. return LocalTransactionState.COMMIT_MESSAGE;

  15. // return LocalTransactionState.UNKNOW;

  16. }

  17. }

producer端:傳送資料到MQ,並且處理本地事物。這裡模擬了一個成功一個失敗。Consumer只會接收到本地事物成功的資料。第二個資料失敗了,不會被消費。

 

Consumer只會接收到一個,第二個資料不會被接收到

--------------------- 本文來自 Mr_蝸牛 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/u010634288/article/details/57158374?utm_source=copy