RocketMQ入門案例
阿新 • • 發佈:2019-03-29
val ets max linu tco 入門 自動 line admin
學習RocketMQ,先寫一個Demo演示一下看看效果。
一、服務端部署因為只是簡單的為了演示效果,服務端僅部署單Master模式 —— 一個Name Server節點,一個Broker節點。主要有一下過程。
- 下載RocketMQ源碼、編譯(也可以網上下載編譯好的文件),這裏使用最新的4.4.0版本,下載好之後放在Linux上通過一下命令解壓縮、編譯。
unzip rocketmq-all-4.4.0-source-release.zip cd rocketmq-all-4.4.0/ mvn -Prelease-all -DskipTests clean install –U
- 編譯之後到distribution/target/apache-rocketmq目錄,後續所有操作都是在該路徑下。
cd distribution/target/apache-rocketmq
- 啟動Name Server,查看日誌確認啟動成功。
nohup sh bin/mqnamesrv & tail
- 啟動Broker,查看日誌確認啟動成功。
nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log
Name Server和Broker都成功啟動,服務器就部署完成了。更詳細的參考官方文檔手冊,裏面還包含在服務器上運行Producer、Customer示例,這裏主要是在項目中使用。
官網手冊戳這裏:Quick Start
二、客戶端搭建:Spring Boot項目中使用客戶端分為消息生產者和消息消費者,這裏通過日誌打印輸出查看效果,為了看起來更清晰,我新建了兩個模塊分別作為消息生產者和消息消費者。
- 添加依賴,在兩個模塊的pom文件中添加以下配置。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
- 配置生產者模塊。
- application.yml文件中增加用來初始化producer的相關配置,這裏只配了一部分,更詳細的配置參數可以查看官方文檔。
# RocketMQ生產者 rocketmq: producer: # Producer組名,多個Producer如果屬於一個應用,發送同樣的消息,則應該將它們歸為同一組。默認DEFAULT_PRODUCER producerGroup: ${spring.application.name} # namesrv地址 namesrvAddr: 192.168.101.213:9876 # 客戶端限制的消息大小,超過報錯,同時服務端也會限制,需要跟服務端配合使用。默認4MB maxMessageSize: 4096 # 發送消息超時時間,單位毫秒。默認10000 sendMsgTimeout: 5000 # 如果消息發送失敗,最大重試次數,該參數只對同步發送模式起作用。默認2 retryTimesWhenSendFailed: 2 # 消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節。默認4096 compressMsgBodyOverHowmuch: 4096 # 在發送消息時,自動創建服務器不存在的topic,需要指定Key,該Key可用於配置發送消息所在topic的默認路由。 createTopicKey: XIAO_LIU
- 新增producer配置類,系統啟動時讀取yml文件的配置信息初始化producer。集群模式下,如果在同一個jvm中,要往多個的MQ集群發送消息,則需要創建多個的producer並設置不同的instanceName,默認不需要設置該參數。
@Configuration public class ProducerConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerConfiguration.class); /** * Producer組名,多個Producer如果屬於一個應用,發送同樣的消息,則應該將它們歸為同一組。默認DEFAULT_PRODUCER */ @Value("${rocketmq.producer.producerGroup}") private String producerGroup; /** * namesrv地址 */ @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; /** * 客戶端限制的消息大小,超過報錯,同時服務端也會限制,需要跟服務端配合使用。默認4MB */ @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize; /** * 發送消息超時時間,單位毫秒。默認10000 */ @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; /** * 如果消息發送失敗,最大重試次數,該參數只對同步發送模式起作用。默認2 */ @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; /** * 消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節。默認4096 */ @Value("${rocketmq.producer.compressMsgBodyOverHowmuch}") private Integer compressMsgBodyOverHowmuch; /** * 在發送消息時,自動創建服務器不存在的topic,需要指定Key,該Key可用於配置發送消息所在topic的默認路由。 */ @Value("${rocketmq.producer.createTopicKey}") private String createTopicKey; @Bean public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(this.producerGroup); producer.setNamesrvAddr(this.namesrvAddr); producer.setCreateTopicKey(this.createTopicKey); if (this.maxMessageSize != null) { producer.setMaxMessageSize(this.maxMessageSize); } if (this.sendMsgTimeout != null) { producer.setSendMsgTimeout(this.sendMsgTimeout); } if (this.retryTimesWhenSendFailed != null) { producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); } if (this.compressMsgBodyOverHowmuch != null) { producer.setCompressMsgBodyOverHowmuch(this.compressMsgBodyOverHowmuch); } if (Strings.isNotBlank(this.createTopicKey)) { producer.setCreateTopicKey(this.createTopicKey); } try { producer.start(); LOGGER.info("Producer Started : producerGroup:[{}], namesrvAddr:[{}]" , this.producerGroup, this.namesrvAddr); } catch (MQClientException e) { LOGGER.error("Producer Start Failed : {}", e.getMessage(), e); } return producer; } }
- 使用producer實例向MQ發送消息。
@RunWith(SpringRunner.class) @SpringBootTest public class ProducerServiceApplicationTests { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerServiceApplicationTests.class); @Autowired private DefaultMQProducer defaultMQProducer; @Test public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException { for (int i = 0; i < 100; i++) { User user = new User(); user.setUsername("用戶" + i); user.setPassword("密碼" + i); user.setSex(i % 2); user.setBirthday(new Date()); Message message = new Message("user-topic", "user-tag", JSON.toJSONString(user).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = defaultMQProducer.send(message); LOGGER.info(sendResult.toString()); } } }
- application.yml文件中增加用來初始化producer的相關配置,這裏只配了一部分,更詳細的配置參數可以查看官方文檔。
- 配置消費者模塊。
- application.yml文件中增加用來初始化consumer的相關配置,同樣參數這裏只配了一部分,更詳細的配置參數可以查看官方文檔。
# RocketMQ消費者 rocketmq: consumer: # Consumer組名,多個Consumer如果屬於一個應用,訂閱同樣的消息,且消費邏輯一致,則應該將它們歸為同一組。默認DEFAULT_CONSUMER consumerGroup: ${spring.application.name} # namesrv地址 namesrvAddr: 192.168.101.213:9876 # 消費線程池最大線程數。默認10 consumeThreadMin: 10 # 消費線程池最大線程數。默認20 consumeThreadMax: 20 # 批量消費,一次消費多少條消息。默認1 consumeMessageBatchMaxSize: 1 # 批量拉消息,一次最多拉多少條。默認32 pullBatchSize: 32 # 訂閱的主題 topics: user-topic
- 新增consumer配置。
@Configuration public class ConsumerConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerConfiguration.class); @Value("${rocketmq.consumer.consumerGroup}") private String consumerGroup; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Value("${rocketmq.consumer.pullBatchSize}") private int pullBatchSize; @Value("${rocketmq.consumer.topics}") private String topics; private final ConsumeMsgListener consumeMsgListener; @Autowired public ConsumerConfiguration(final ConsumeMsgListener consumeMsgListener) { this.consumeMsgListener = consumeMsgListener; } @Bean public DefaultMQPushConsumer getRocketMQConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); consumer.setPullBatchSize(pullBatchSize); consumer.registerMessageListener(consumeMsgListener); try { /** * 設置消費者訂閱的主題和tag。subExpression參數為*表示訂閱該主題下所有tag, * 如果需要訂閱該主題下的指定tag,subExpression設置為對應tag名稱,多個tag以||分割,例如"tag1 || tag2 || tag3" */ consumer.subscribe(topics, "*"); consumer.start(); LOGGER.info("Consumer Started : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr); } catch (Exception e) { LOGGER.error("Consumer Start Failed : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr, e); e.printStackTrace(); } return consumer; } }
-
新增消息監聽器,監聽到新消息後,執行對應的業務邏輯。
@Component public class ConsumeMsgListener implements MessageListenerConcurrently { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeMsgListener.class); @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(msgs)) { LOGGER.info("Msgs is Empty."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } for (MessageExt msg : msgs) { try { if ("user-topic".equals(msg.getTopic())) { LOGGER.info("{} Receive New Messages: {}", Thread.currentThread().getName(), new String(msg.getBody())); // do something } } catch (Exception e) { if (msg.getReconsumeTimes() == 3) { // 超過3次不再重試 LOGGER.error("Msg Consume Failed."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { // 重試 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
- application.yml文件中增加用來初始化consumer的相關配置,同樣參數這裏只配了一部分,更詳細的配置參數可以查看官方文檔。
Demo很簡單,但是裏面還有很多東西需要慢慢研究。
代碼可以戳這裏:spring-cloud-learn
RocketMQ入門案例