1. 程式人生 > >RocketMQ系列(五)廣播與延遲訊息

RocketMQ系列(五)廣播與延遲訊息

今天要給大家介紹RocketMQ中的兩個功能,一個是“廣播”,這個功能是比較基礎的,幾乎所有的mq產品都是支援這個功能的;另外一個是“延遲消費”,這個應該算是RocketMQ的特色功能之一了吧。接下來,我們就分別看一下這兩個功能。 ## 廣播 廣播是把訊息傳送給訂閱了這個主題的所有消費者。這個定義很清楚,但是這裡邊的知識點你都掌握了嗎?咱們接著說“廣播”的機會,把消費者這端的內容好好和大家說說。 * 首先,消費者端的概念中,最大的應該是消費者組,一個消費者組中可以有多個消費者,這些消費者必須訂閱同一個Topic。 * 那麼什麼算是一個消費者呢?我們在寫消費端程式時,看到了setConsumeThreadMax這個方法,設定消費者的執行緒數,難道一個執行緒就是一個消費者?錯!這裡的一個消費者是一個程序,你可以理解為ip+埠。如果在同一個應用中,你例項化了兩個消費者,這兩個消費者配置了相同的消費者組名稱,那麼應用程式啟動時會報錯的,這裡不給大家演示了,感興趣的小夥伴私下裡試一下吧。 * 同一個訊息,可以被不同的消費者組同時消費。假設,我有兩個消費者組cg-1和cg-2,這兩個消費者組訂閱了同一個Topic,那麼這個Topic的訊息會被cg-1和cg-2同時消費。那這是不是廣播呢?錯!當然不是廣播,廣播是同一個消費者組中的多個消費者都消費這個訊息。如果配置的不是廣播,像前幾個章節中的那樣,一個訊息只能被一個消費者組消費一次。 好了,說了這麼多,我們實驗一下吧,先把消費者配置成廣播,如下: ```java @Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown") public DefaultMQPushConsumer broadcast() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast"); consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;"); consumer.subscribe("cluster-topic","*"); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); return consumer; } ``` * 其中,NameServer,訂閱的Topic都沒有變化。 * 注意其中`consumer.setMessageModel(MessageModel.BROADCASTING);`這段程式碼,設定消費者為廣播。咱們可以看一下,`MessageModel`列舉中只有兩個值,`BROADCASTING`和`CLUSTERING`,預設為`CLUSTERING`。 因為要測試廣播,所以我們要啟動多個消費者,還記得什麼是消費者嗎?對了,一個ip+埠算是一個消費者,在這裡我們啟動兩個應用,埠分別是8080和8081。傳送端的程式不變,如下: ```java @Test public void producerTest() throws Exception { for (int i = 0;i<5;i++) { MessageExt message = new MessageExt(); message.setTopic("cluster-topic"); message.setKeys("key-"+i); message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes()); SendResult sendResult = defaultMQProducer.send(message); System.out.println("i=" + i); System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName()); } } ``` 我們執行一下發送端的程式,日誌如下: ```shell i=0 BrokerName:broker-a i=1 BrokerName:broker-a i=2 BrokerName:broker-b i=3 BrokerName:broker-b i=4 BrokerName:broker-b ``` 再來看看8080埠的應用後臺打印出來的日誌: ![](https://img2020.cnblogs.com/blog/1191201/202006/1191201-20200611145552047-1089641273.png) 消費了5個訊息,再看看8081的後臺列印的日誌, ![](https://img2020.cnblogs.com/blog/1191201/202006/1191201-20200611145602335-1419661476.png) 也消費了5個。兩個消費者同時消費了訊息,這就是廣播。有的小夥伴可能會有疑問了,如果不設定廣播,會怎麼樣呢?私下裡實驗一下吧,上面的程式中,只要把設定廣播的那段程式碼註釋掉就可以了。執行的結果當然是只有一個消費者可以消費訊息。 ## 延遲訊息 延遲訊息是指消費者過了一個指定的時間後,才去消費這個訊息。大家想象一個電商中場景,一個訂單超過30分鐘未支付,將自動取消。這個功能怎麼實現呢?一般情況下,都是寫一個定時任務,一分鐘掃描一下超過30分鐘未支付的訂單,如果有則被取消。這種方式由於每分鐘查詢一下訂單,一是時間不精確,二是查庫效率比較低。這個場景使用RocketMQ的延遲訊息最合適不過了,我們看看怎麼傳送延遲訊息吧,傳送端程式碼如下: ```java @Test public void producerTest() throws Exception { for (int i = 0;i<1;i++) { MessageExt message = new MessageExt(); message.setTopic("cluster-topic"); message.setKeys("key-"+i); message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes()); message.setDelayTimeLevel(2); SendResult sendResult = defaultMQProducer.send(message); System.out.println("i=" + i); System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName()); } } ``` * 我們只是增加了一句`message.setDelayTimeLevel(2);` * 為了方便,這次我們只發送一個訊息。 setDelayTimeLevel是什麼意思,設定的是2,難道是2s後消費嗎?怎麼引數也沒有時間單位呢?如果我要自定義延遲時間怎麼辦?我相信很多小夥伴都有這樣的疑問,我也是帶著這樣的疑問查了很多資料,最後在RocketMQ的Github官網上看到了說明, * 在RocketMQ的原始碼中,有一個MessageStoreConfig類,這個類中定義了延遲的時間,我們看一下, ```java // org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; ``` * 我們在程式中設定的是2,那麼這個訊息將在5s以後被消費。 * 目前RocketMQ還不支援自定義延遲時間,延遲時間只能從上面的時間中選。如果你非要定義一個時間怎麼辦呢?RocketMQ是開源的,下載程式碼,把上面的時間改一下,再打包部署,就OK了。 再看看消費端的程式碼, ```java @Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown") public DefaultMQPushConsumer broadcast() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast"); consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;"); consumer.subscribe("cluster-topic","*"); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { Date now = new Date(); System.out.println("消費時間:"+now); Date msgTime = new Date(); msgTime.setTime(msg.getBornTimestamp()); System.out.println("訊息生成時間:"+msgTime); System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); return consumer; } ``` * 我們還是使用廣播的模式,沒有變。 * 打印出了當前的時間,這個時間就是消費的時間。 * 通過msg.getBornTimestamp()方法,獲得了訊息的生成時間,也打印出來,看看是不是延遲5s。 啟動兩個消費者8080和8081,傳送訊息,再看看消費者的後臺日誌, ```shell 消費時間:Thu Jun 11 14:45:53 CST 2020 訊息生成時間:Thu Jun 11 14:45:48 CST 2020 this is simpleMQ,my NO is 0---Thu Jun 11 14:45:47 CST 2020 ``` 我們看到消費時間比生成時間晚5s,符合我們的預期。這個功能還是比較實用的,如果能夠自定義延遲時間就更好了。 ## 總結 RocketMQ的這兩個知識點還是比較簡單的,大家要分清楚什麼是消費者組,什麼是消費者,什麼是消費者執行緒。另外就是延遲訊息是不支援自定義的,大家可以在Github上看一下原始碼。好了~今天就到這