rocketmq實戰入門
阿新 • • 發佈:2019-01-07
1.pom檔案
<dependencies>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
</dependencies>
生產者 :
消費者0:package com.sun.rocketmq.qs; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; /** * * @ClassName: rocket生產者 * @Description: TODO * @author: sunqz * @date: 2017-6-3 下午8:11:50 */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("hwp"); //引數為組名 producer.setNamesrvAddr("192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876");//指定nameserver地址 producer.setRetryTimesWhenSendFailed(5); //訊息傳送失敗重試次數 producer.start(); for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg,1000);//該訊息1秒沒傳送成功則重試 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
package com.sun.rocketmq.qs; import java.io.UnsupportedEncodingException; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; /** * Consumer,訂閱訊息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { /** * PushConsumer Consumer 的一種,應用通常吐 Consumer 物件註冊一個 Listener 介面,一旦收到訊息,Consumer 物件立 * 刻回撥 Listener 介面方法。 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hw"); consumer.setNamesrvAddr("192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876"); /** * ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET一個新的訂閱組第一次啟動從佇列的最前位置開始消費,後續再啟動接著上次消費的進度開始消費 * ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一個新的訂閱組第一次啟動從佇列的最後位置開始消費,後續再啟動接著上次消費的進度開始消費 * ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST @Deprecated * ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET @Deprecated * ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET @Deprecated * ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 一個新的訂閱組第一次啟動從指定時間點開始消費,後續再啟動接著上次消費的進度開始消費, * 時間點設定參見DefaultMQPushConsumer.consumeTimestamp引數 * * * MessageModel.CLUSTERING 叢集消費,一個 Consumer Group 中的 Consumer 例項平均分攤消費訊息。例如某個 Topic 有 9 條訊息,其中一個 Consumer Group 有 3 個例項(可能是 3 個迕程,戒者 3 臺機器),那舉每個例項只消費其中的 3 條訊息 * MessageModel.BROADCASTING 廣播 * */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.CLUSTERING); /** * 主題Tpoic:第一級訊息型別,書的標題 * 標籤Tags:第二級訊息型別,書的目錄,可以基於Tag做訊息過濾 */ consumer.subscribe("TopicTest", //指定消費主題是topicTest "TagA||TagB"); //tag為TagA tagB的訊息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { /** * List<MessageExt> msgs 這裡雖然是list 但實際上基本都只會是一條,除非訊息堆積, * 且記住一定是先啟動消費者,再啟動生產者 * 否則極有可能導致訊息的重複消費 * */ for(MessageExt mext : msgs) { try { System.out.println("消費了一條訊息:"+new String(mext.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); //消費失敗告訴mq重新發送繼續消費 如果多次消費仍不成功可以記錄在資料庫中,可以通過mext.getReconsumeTimes()獲取消費次數 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } /* * 告訴mq消費成功 */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
消費者1:
package com.sun.rocketmq.qs; import java.io.UnsupportedEncodingException; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; /** * Consumer,訂閱訊息 */ public class Consumer1 { public static void main(String[] args) throws InterruptedException, MQClientException { /** * PushConsumer Consumer 的一種,應用通常吐 Consumer 物件註冊一個 Listener 介面,一旦收到訊息,Consumer 物件立 * 刻回撥 Listener 介面方法。 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hw"); consumer.setNamesrvAddr("192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876"); /** * ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET一個新的訂閱組第一次啟動從佇列的最前位置開始消費,後續再啟動接著上次消費的進度開始消費 * ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一個新的訂閱組第一次啟動從佇列的最後位置開始消費,後續再啟動接著上次消費的進度開始消費 * ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST @Deprecated * ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET @Deprecated * ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET @Deprecated * ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 一個新的訂閱組第一次啟動從指定時間點開始消費,後續再啟動接著上次消費的進度開始消費, * 時間點設定參見DefaultMQPushConsumer.consumeTimestamp引數 * * * MessageModel.CLUSTERING 叢集消費,一個 Consumer Group 中的 Consumer 例項平均分攤消費訊息。例如某個 Topic 有 9 條訊息,其中一個 Consumer Group 有 3 個例項(可能是 3 個迕程,戒者 3 臺機器),那舉每個例項只消費其中的 3 條訊息 * MessageModel.BROADCASTING 廣播 被每個消費組消費 * */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.CLUSTERING); /** * 主題Tpoic:第一級訊息型別,書的標題 * 標籤Tags:第二級訊息型別,書的目錄,可以基於Tag做訊息過濾 */ consumer.subscribe("TopicTest", //指定消費主題是topicTest "TagA||TagB"); //tag為TagA tagB的訊息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { /** * List<MessageExt> msgs 這裡雖然是list 但實際上基本都只會是一條,除非訊息堆積, * 且記住一定是先啟動消費者,再啟動生產者 * 否則極有可能導致訊息的重複消費 * */ for(MessageExt mext : msgs) { try { System.out.println("消費了一條訊息:"+new String(mext.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); //消費失敗告訴mq重新發送繼續消費 如果多次消費仍不成功可以記錄在資料庫中,可以通過mext.getReconsumeTimes()獲取消費次數 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } /* * 告訴mq消費成功 */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
先啟動兩個消費者,然後啟動生產者,生產者將產生10條訊息,下圖是列印資訊:
通過列印資訊我們可以看到有的訊息被髮送到了broker-a ,有的訊息被髮送到了broker-b 實現了自動負載均衡
在叢集消費模式下,訊息會被消費組裡的負載均衡消費
customer0 ,列印訊息:
customer1,列印: