1. 程式人生 > >RocketMQ——順序消費(程式碼)

RocketMQ——順序消費(程式碼)

本部落格主要是以程式碼示例來了解順序消費的相關內容,建議在此之前先了解下順序消費的原理。

注:RocketMQ可以嚴格的保證訊息有序,但這個順序,不是全域性順序,只是分割槽(queue)順序,如果想要全域性順序,那麼需要保證只有一個分割槽。

順序消費簡介

1.普通順序消費

順序消費的一種,正常情況下可以保證完全的順序訊息,但是一旦發生通訊異常,Broker重啟,由於佇列總數法還是能變化,雜湊取模後定位的佇列會變化,產生短暫的訊息順序不一致。

2.嚴格順序訊息

順序訊息的一種,無論正常異常情況都能保證順序,但是犧牲了分散式failover特性,即broker叢集中要有一臺機器不可用,則整個叢集都不可用,服務可用性大大降低。如果伺服器部署為同步雙寫模式,此缺陷可通過備機自動切換為主避免,不過仍然會存在幾分鐘的服務不可用。

目前已知的應用只有資料庫binlog同步強依賴嚴格順序訊息,其他應用絕大部分都可以容忍短暫亂序,推薦使用普通的順序訊息。

1.producer

package com.gwd.rocketmq;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

/** 
* @FileName Producer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年7月3日 上午9:59:38 
* 修改歷史:
* 時間           作者          版本        描述
*====================================================  
*
*/
/**
 * Producer,傳送順序訊息
 */
public class Producer {
	
    public static void main(String[] args) throws IOException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("sequence_producer");
 
            producer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");
 
            producer.start();
 
            String[] tags = new String[] { "TagA", "TagC", "TagD" };
            
            // 訂單列表
            List<OrderDO> orderList =  new Producer().buildOrders();
            
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            for (int i = 0; i < 10; i++) {
                // 加個時間字尾
                String body = dateStr + " Hello RocketMQ " + orderList.get(i).getOrderId()+orderList.get(i).getDesc();
                Message msg = new Message("SequenceTopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
 
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Long id = Long.valueOf((String)arg);
                        long index = id % mqs.size();
                        return mqs.get((int)index);
                    }
                }, orderList.get(i).getOrderId());//通過訂單id來獲取對應的messagequeue
 
                System.out.println(sendResult + ", body:" + body);
            }
            
            producer.shutdown();
 
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.in.read();
    }
    
    /**
     * 生成模擬訂單資料 
     */
    private List<OrderDO> buildOrders() {
    	List<OrderDO> orderList = new ArrayList<OrderDO>();
 
    	OrderDO OrderDO = new OrderDO();
        OrderDO.setOrderId("15103111039");
    	OrderDO.setDesc("建立");
    	orderList.add(OrderDO);
    	
    	OrderDO = new OrderDO();
    	OrderDO.setOrderId("15103111065");
    	OrderDO.setDesc("建立");
    	orderList.add(OrderDO);
    	
    	OrderDO = new OrderDO();
    	OrderDO.setOrderId("15103111039");
    	OrderDO.setDesc("付款");
    	orderList.add(OrderDO);
    	
    	OrderDO = new OrderDO();
    	OrderDO.setOrderId("15103117235");
    	OrderDO.setDesc("建立");
    	orderList.add(OrderDO);
    	
    	OrderDO = new OrderDO();
    	OrderDO.setOrderId("15103111065");
    	OrderDO.setDesc("付款");
    	orderList.add(OrderDO);
    	
    	OrderDO = new OrderDO();
    	OrderDO.setOrderId("15103117235");
    	OrderDO.setDesc("付款");
    	orderList.add(OrderDO);
    	
    	OrderDO = new OrderDO();
    	OrderDO.setOrderId("15103111065");
    	OrderDO.setDesc("完成");
    	orderList.add(OrderDO);
    	
    	OrderDO = new OrderDO();
    	OrderDO.setOrderId("15103111039");
    	OrderDO.setDesc("推送");
    	orderList.add(OrderDO);
    	
    	OrderDO = new OrderDO();
    	OrderDO.setOrderId("15103117235");
    	OrderDO.setDesc("完成");
    	orderList.add(OrderDO);
    	
    	OrderDO = new OrderDO();
    	OrderDO.setOrderId("15103111039");
    	OrderDO.setDesc("完成");
    	orderList.add(OrderDO);
    	return orderList;
    }
}
此處需要注意,producer.send(msg, new MessageQueueSelector()),如果需要全域性有序,只需要使new MessageQueueSelector().select(List<MessageQueue> mqs, Message msg, Object arg)方法返回值唯一且不變,例如:
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Long id = Long.valueOf((String)arg);
                        long index = id % mqs.size();
                        return mqs.get((int)index);
                    }
                }, orderList.get(0).getOrderId());//通過訂單id來獲取對應的messagequeue

這邊獲取到的queue永遠都是唯一的且確定的(此處只是舉個簡單的例子,orderList.get(i).getOrderId()改為0亦可)

2.錯誤的Consumer

package com.gwd.rocketmq;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/** 
* @FileName WrongConsumer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年7月3日 下午3:13:16 
* 修改歷史:
* 時間           作者          版本        描述
*====================================================  
*
*/
public class WrongConsumer {
	public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");
        /**
         * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
         * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        consumer.subscribe("SequenceTopicTest", "TagA || TagC || TagD");
 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            Random random = new Random();
 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
                for (MessageExt msg: msgs) {
                    System.out.println(msg + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模擬業務邏輯處理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        consumer.start();
 
        System.out.println("Consumer Started.");
    }
}

注意:要想要有順序,那麼這邊吃監聽器就不能是MessageListenerConcurrently了,其顯示效果如下:


3.正確的Consumer

package com.gwd.rocketmq;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/** 
* @FileName Consumer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年7月3日 上午10:05:26 
* 修改歷史:
* 時間           作者          版本        描述
*====================================================  
*
*/
/**
 * 順序訊息消費,帶事務方式(應用可控制Offset什麼時候提交)
 */
public class Consumer {
 
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");
        /**
         * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
         * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        consumer.subscribe("SequenceTopicTest", "TagA || TagC || TagD");
 
        consumer.registerMessageListener(new MessageListenerOrderly() {
 
            Random random = new Random();
 
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
                for (MessageExt msg: msgs) {
                    System.out.println(msg + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模擬業務邏輯處理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
 
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

這邊的Consumer和上面的最明顯的區別在於對應的監聽器是MessageListenerOrderlyMessageListenerOrderly是能夠保證順序消費的。

顯示結果:


4.多個消費者

那如果有多個消費者呢?因為訊息傳送時被分配到多個佇列,這些佇列又會被分別傳送給消費者唯一消費,現在啟動兩個消費者,其消費情況如下圖:



結論:多個消費者時,各個消費者的訊息依舊是順序消費,且不會重複消費

原文轉自:https://blog.csdn.net/earthhour/article/details/78323026 ,在此基礎上部分程式碼略作修改