前言

RocketMQ 是阿里巴巴於 2012 年開源的分散式訊息中介軟體。本文記錄 SpringBoot 整合 RocketMQ 的方式;RocketMQ 的安裝方式可參考:Windows下安裝RocketMQ

環境

SpringBoot2.5.3 + RocketMQ4.7.0

具體實現

  • pom.xml
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
  • application.yml
rocketmq:
  producer:
    producer-group: CoisiniProducerGroup
  consumer:
    consumer-group: CoisiniConsumerGroup
  namesrv-addr: 127.0.0.1:9876
  • MQ生產者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects; /**
* @Description MQ生產者
* @author coisini
* @date Aug 25, 2021
* @Version 1.0
*/
@Component
public class ProducerSchedule { private DefaultMQProducer producer; @Value("${rocketmq.producer.producer-group}")
private String producerGroup; @Value("${rocketmq.namesrv-addr}")
private String nameSrvAddr; public ProducerSchedule() { } /**
* 生產者構造
* @PostConstruct該註解被用來修飾一個非靜態的void()方法
* Bean初始化的執行順序:
* Constructor(構造方法) -> @Autowired(依賴注入) -> @PostConstruct(註釋的方法)
*/
@PostConstruct
public void defaultMQProducer() {
if (Objects.isNull(this.producer)) {
this.producer = new DefaultMQProducer(this.producerGroup);
this.producer.setNamesrvAddr(this.nameSrvAddr);
} try {
this.producer.start();
System.out.println("Producer start");
} catch (MQClientException e) {
e.printStackTrace();
}
} /**
* 訊息釋出
* @param topic
* @param tag
* @param messageText
* @return
*/
public String send(String topic, String messageText){
Message message = new Message(topic, messageText.getBytes()); /**
* 延遲訊息級別設定
* messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
message.setDelayTimeLevel(4); SendResult result = null;
try {
result = this.producer.send(message);
System.out.println("MessageQueue: " + result.getMessageQueue());
System.out.println("MsgId: " + result.getMsgId());
System.out.println("SendStatus: " + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
} return result.getMsgId();
}
}
  • MQ消費者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component; /**
* @Description MQ消費者
* CommandLineRunner 初始化預載入資料
* @author coisini
* @date Aug 25, 2021
* @Version 1.0
*/
@Component
public class ConsumerSchedule implements CommandLineRunner { @Value("${rocketmq.consumer.consumer-group}")
private String consumerGroup; @Value("${rocketmq.namesrv-addr}")
private String nameSrvAddr; public void messageListener() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.consumerGroup);
consumer.setNamesrvAddr(this.nameSrvAddr); /**
* 訂閱主題
*/
consumer.subscribe("Topic", "*"); /**
* 設定消費訊息數
*/
consumer.setConsumeMessageBatchMaxSize(1); /**
* 註冊訊息監聽
*/
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (Message message : messages) {
System.out.println("監聽到訊息:" + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}); consumer.start();
} @Override
public void run(String... args) throws Exception {
this.messageListener();
}
}
  • 測試介面
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private ProducerSchedule producerSchedule;

    /**
     * Test endpoint ({@code GET /test/push}): sends the text "Coisini" to topic
     * "Topic" through the injected producer.
     *
     * @throws Exception if sending the message fails
     */
    @GetMapping("/push")
    public void pushMessageToMQ() throws Exception {
        final String topic = "Topic";
        final String payload = "Coisini";
        this.producerSchedule.send(topic, payload);
    }
}
  • 介面呼叫:

  • 30s後延遲訊息觸發:

- End -



夢想是鹹魚
關注一下吧