1. 程式人生 > >RocketMq——順序消費和事務

RocketMq——順序消費和事務

RocketMQ不遵循JMS規範,自己有一套自定義的機制,使用訂閱主題的方式傳送和接收任務,支援廣播叢集兩種消費模式。

叢集模式:設定消費端物件屬性:MessageModel.CLUSTERING,這種方式就可以達到ActiveMQ水平擴充套件負載均衡消費訊息的實現。比較特殊的是這種行為可以支援先發送資料(生產端先發送到MQ),消費端訂閱主題發生在生產端之後也可以收到資料,比較靈活。

廣播模式:設定消費端物件屬性:MessageModel.BROADCASTING,相當於生產端傳送資料到MQ,多個消費端都可以獲得資料。

在RocketMQ裡有個很重要的概念,就是GroupName,無論是生產端還是消費端,都必須指定一個GroupName,這個組名稱是維護在應用系統級別上。

比如生產端指定一個ProduccerGroupName,這個名稱需要由應用系統來保證唯一性,一類Producer集合的名稱,這類Producer通常傳送一類訊息,且傳送邏輯一致。消費端同理。

Topic主題,每個主題表示一個邏輯上的儲存概念,而在MQ上,會有著與之對應的多個Queue佇列,這個是物理儲存的概念

RocketMQ提供了三種不同的producer:

1.NomalProducer 普通
2.OrderProducer 順序
3.TransactionProducer 事務

1.普通模式:使用傳統的send傳送訊息,不能保證訊息的順序一致性。

2.順序模式:可以嚴格的保證訊息的順序執行。遵循全域性順序的時候使用一個queue,區域性順序使用多個queue並行消費。

3.事務模式:支援事務方式對訊息進行提交處理,在rocket裡事務分兩個階段

第一個階段把訊息傳給MQ,只不過消費端不可見,但資料其實已經在Broker上了。

第二個階段為本地訊息回撥處理,如果都成功返回COMMIT_MESSAGE,則在broker上的資料對消費端可見,失敗則為ROLLBACK_MESSAGE,消費端不可見。

順序消費

//如果使用順序消費,則必須自己實現MessageQueueSelector,保證訊息進入同一個佇列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

    @Override
    public
MessageQueue select(List<MessageQueue> msgs, Message msg, Object arg) { Integer id = (Integer)arg; return msgs.get(id); } }, 0);

broker只保證訊息是順序傳送到消費端,但若消費端是多執行緒的,可能收到的第二個訊息會比第一個訊息處理得更快。

//messageListenerOrderly 保證順序消費,消費端接收的是同一個佇列的訊息,避免多執行緒時順序錯亂
consumer.registerMessageListener(new MessageListenerOrderly() {

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> arg0, ConsumeOrderlyContext arg1) {

        return null;
    }
});

consumer開啟多執行緒,只需設定consumer的執行緒數。

consumer.setConsumeThreadMax(10);
consumer.setConsumeThreadMin(10);

事務

生產端先將憑證訊息傳送到broker伺服器上,憑證訊息對消費端不可見;再回調執行本地事務,若執行成功則返回COMMIT,broker再將憑證訊息對消費端可見,若失敗返回ROLLBACK。

Producer:

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        String group_name = "transaction_producer";

        final TransactionMQProducer producer = new TransactionMQProducer(group_name);

        producer.setNamesrvAddr("192.168.0.2:9876;192.168.0.3:9876");
        producer.setCheckRequestHoldMax(200);
        producer.setCheckThreadPoolMaxSize(20);
        producer.setCheckThreadPoolMinSize(5);
        producer.start();
        //伺服器回撥producer,檢查本地事務分支成功還是失敗
        producer.setTransactionCheckListener(new TransactionCheckListener() {

            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                System.out.println("state --" + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

        for(int i=0; i<2 ; i++) {
            try {
                Message msg = new Message("TopicTransaction","transaction" + i,"key",("hello " + i).getBytes());
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, "tq");
                System.out.println(sendResult);
            }catch(Exception e) {
                e.printStackTrace();
            }
        }
        Thread.sleep(3000);
        producer.shutdown();
    }

}

TransactionExecuterImpl:

/*
 * 執行本地事務,由客戶端回撥
 */
public class TransactionExecuterImpl implements LocalTransactionExecuter {

    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
        System.out.println("msg :" + new String(msg.getBody()));
        System.out.println("arg :" + arg);
        String tag = msg.getTags();
        if(tag.equals("transaction1")) {
            //這裡有一個分階段提交任務概念
            System.out.println("這裡處理業務邏輯,如操作資料庫,失敗情況下進行ROLLBACK");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        return LocalTransactionState.COMMIT_MESSAGE;
        //return LocalTransactionState.UNKNOW;
    }

}

Consumer:

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
        //設定consumer第一次啟動是從佇列頭部開始還是尾部開始消費,若非第一次啟動,那麼按照上次消費的位置繼續消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTransaction", "*");
        //批量消費,一次消費多少條訊息,預設為1條,最大情況能拿多少條不代表每次能拿這麼多條
        //consumer.setConsumeMessageBatchMaxSize(3);
        //messageListenerOrderly 保證順序消費,消費端接收的是同一個佇列的訊息,避免多執行緒時順序錯亂
        /*consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> arg0, ConsumeOrderlyContext arg1) {

                return null;
            }
        });*/
        consumer.setConsumeThreadMax(10);
        consumer.setConsumeThreadMin(10);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                //System.out.println(Thread.currentThread().getName() + "Receive: " + msgs);
                //獲取一次性消費多少條訊息
                //System.out.println("訊息條數 : " + msgs.size());
                MessageExt msg1 = null;
                try {
                    for(MessageExt msg : msgs) {
                        msg1 = msg;
                        String topic = msg.getTopic();
                        String msgbody = new String(msg.getBody(),"utf-8");
                        String tag = msg.getTags();
                        System.out.println("收到訊息: " + "topic:" + topic + " tags:" + tag + " msg:" + msgbody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //若已經重試了5次則不再重試
                    if(msg1.getReconsumeTimes() == 5) {
                        //此處記錄日誌操作。。。
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.setNamesrvAddr("192.168.0.2:9876;192.168.0.3:9876");
        consumer.start();
        System.out.println("Consumer started...");
    }

}

Producer console:

msg :hello 0
arg :tq
SendResult [sendStatus=SEND_OK, msgId=C0A8000200002A9F0000000000003B1C, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=0], queueOffset=0]
msg :hello 1
arg :tq
這裡處理業務邏輯,如操作資料庫,失敗情況下進行ROLLBACK
SendResult [sendStatus=SEND_OK, msgId=C0A8000200002A9F0000000000003C9E, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=1], queueOffset=0]

Consumer console :

收到訊息: topic:TopicTransaction tags:transaction0 msg:hello 0