Spring Cloud 2.x系列之整合rocketMQ
RocketMQ出了4的版本,而且本身這個mq有事務訊息,在分散式的場景中有很好的啟發性和作用,而且本身它也是阿里開源到apache的一個專案,從出身還是實力來說都很不錯的。
1、新建專案sc-rocketmq,對應的pom.xml如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>spring-cloud</groupId> <artifactId>sc-rocketmq</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>sc-rocketmq</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.4.RELEASE</version> </parent> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Finchley.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.2.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.44</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> </dependencies> </project>
Producer單從分類producer的官網doc來看主要分成3種:
DefaultMQProducer
TransactionMQProducer
messagingAccessPoint.createProducer()
本文主要說的是DefaultMQProducer和TransactionMQProducer
預設的producer是DefaultMQProducer,從官方的文件來看,前四個都是對這個producer的運用只是set的值不同而已,而且是很細微的變化而已。
2、新建配置檔案application.yml
server: port: 8182 spring: application: name: sc-rocketmq rocketmq: consumer: groupName: consumerGroup # 消費者的組名 consumeThreadMin: 2 consumeThreadMax: 5 consumeMessageBatchMaxSize: 10 topics: rocketTopic,rocketTag producer: groupName: producerGroup # 生產者的組名 maxMessageSize: 100 sendMsgTimeout: 1000 retryTimesWhenSendFailed: 3 namesrvAddr: 127.0.0.1:9876 # NameServer地址
3、新建訊息生產者類
讀取application.yml配置:
package sc.rocketmq.config; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @ConfigurationProperties(prefix = "rocketmq.producer") @Configuration public class ProducerConfig { private String namesrvAddr; private String groupName; public String getNamesrvAddr() { return namesrvAddr; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } @Override public String toString() { return "ProducerConfig [namesrvAddr=" + namesrvAddr + ", groupName=" + groupName + "]"; } }
訊息生產者:
package sc.rocketmq.config; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ProducerConfigure { Logger log = LoggerFactory.getLogger(ProducerConfigure.class); @Autowired private ProducerConfig producerConfigure; /** * 建立普通訊息傳送者例項 * * @return * @throws MQClientException */ @Bean //@ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true") public DefaultMQProducer defaultProducer() throws MQClientException { log.info(producerConfigure.toString()); log.info("defaultProducer 正在建立---------------------------------------"); DefaultMQProducer producer = new DefaultMQProducer(producerConfigure.getGroupName()); producer.setNamesrvAddr(producerConfigure.getNamesrvAddr()); producer.setVipChannelEnabled(false); producer.setRetryTimesWhenSendAsyncFailed(10); producer.start(); log.info("rocketmq producer server開啟成功---------------------------------."); return producer; } }
4、新建訊息消費者類
讀取application.yml配置:
package sc.rocketmq.config; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @ConfigurationProperties(prefix = "rocketmq.consumer") @Configuration public class ConsumerConfig { private String groupName; private String namesrvAddr; public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getNamesrvAddr() { return namesrvAddr; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } @Override public String toString() { return "ConsumerConfig [groupName=" + groupName + ", namesrvAddr=" + namesrvAddr + "]"; } }
訊息消費者類(抽象類):
package sc.rocketmq.config; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @Configuration public abstract class DefaultConsumerConfigure { Logger log = LoggerFactory.getLogger(DefaultConsumerConfigure.class); @Autowired private ConsumerConfig consumerConfig; // 開啟消費者監聽服務 public void listener(String topic, String tag) throws MQClientException { log.info("開啟" + topic + ":" + tag + "消費者-------------------"); log.info(consumerConfig.toString()); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName()); consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr()); consumer.subscribe(topic, tag); // 開啟內部類實現監聽 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return DefaultConsumerConfigure.this.dealBody(msgs); } }); consumer.start(); log.info("rocketmq啟動成功---------------------------------------"); } // 處理body的業務 public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs); }
具體訊息消費者類:
package sc.rocketmq.service; import java.io.UnsupportedEncodingException; import java.util.List; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.ContextRefreshedEvent; import sc.rocketmq.config.DefaultConsumerConfigure; @Configuration public class CustomConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent> { Logger log = LoggerFactory.getLogger(CustomConsumer.class); @Override public void onApplicationEvent(ContextRefreshedEvent arg0) { try { super.listener("t_TopicTest", "Tag1"); } catch (MQClientException e) { log.error("消費者監聽器啟動失敗", e); } } @Override public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs) { int num = 1; log.info("進入"); for (MessageExt msg : msgs) { log.info("第" + num + "次訊息"); try { String msgStr = new String(msg.getBody(), "utf-8"); log.info(msgStr); } catch (UnsupportedEncodingException e) { log.error("body轉字串解析失敗"); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
這個CustomConsumer類實現了ApplicationListener,讓他在啟動的時候就開啟執行DefaultConsumerConfigure的listener方法
5、新建springboot啟動類RocketMqApplication.java
package sc.rocketmq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RocketMqApplication { public static void main(String[] args) { SpringApplication.run(RocketMqApplication.class, args); } }
6、新建一個Controller,引入訊息生產者
package sc.rocketmq.controller; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import com.alibaba.fastjson.JSON; import sc.rocketmq.service.CustomConsumer; @RestController public class ProducerController { Logger log = LoggerFactory.getLogger(CustomConsumer.class); @Autowired private DefaultMQProducer producer; // @Autowired // private TransactionMQProducer producer; //@Autowired //private TestTransactionListener testTransactionListener; @GetMapping("/msg/product") public void test(String info) throws Exception { Message message = new Message("TopicTest", "Tag1", "12345", "rocketmq測試成功".getBytes()); // 這裡用到了這個mq的非同步處理,類似ajax,可以得到傳送到mq的情況,並做相應的處理 // 不過要注意的是這個是非同步的 producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("傳輸成功"); log.info(JSON.toJSONString(sendResult)); } @Override public void onException(Throwable e) { log.error("傳輸失敗", e); } }); } }
7、驗證是否成功
訪問http://127.0.0.1:8080/msg/product
可以看到controller產生訊息,然後CustomConsumer類的dealBody方法訊息訊息。
原始碼:
https://gitee.com/hjj520/spring-cloud-2.x/tree/master/sc-apache-rocketmq
本文作者: java樂園
本文來自雲棲社群合作伙伴“ ofollow,noindex">JAVA樂園 ”,瞭解相關資訊可以關注“ JAVA樂園 ”