新的閱讀體驗:http://www.zhouhong.icu/post/157

一、業務需求

  需要實現一個提前二十分鐘通知使用者去做某件事的一個業務,拿到這個業務首先想到的最簡單得方法就是使用Redis監控Key值:在排計劃時候計算當前時間與提前二十分鐘這個時間差,然後使用一個唯一的業務Key壓入Redis中並設定好過期時間,然後只需要讓Redis監控這個Key值即可,當這個Key過期後就可以直接拿到這個Key的值然後實現發訊息等業務。

  關於Redis實現該業務的具體實現在之前我已經記過一篇筆記,有興趣的可以直接去瞅瞅,但是現在感覺有好多不足之處。

       Redis實現定時: http://www.zhouhong.icu/post/144

二、Redis實現定時推送等功能的不足之處

  由於Redis不止你一個使用,其他業務也會使用Redis,那麼最容易想到的一個缺點就是:1、如果在提醒的那一刻有大量的其他業務的Key也過期了,那麼就會很長時間都輪不到你的這個Key,就會出現訊息推送延遲等缺點;2、還有一個缺點就是像阿里雲他們的Redis根本就不支援對 Redis 的 Key值得監控(我也是因為公司使用阿里雲的Redis沒法對Key監控才從之前使用Redis監控轉移到使用RocketMQ的延時訊息推送的。。。)

三、阿里雲RocketMQ定時/延遲訊息佇列實現

  其實在實現上非常簡單

1、首先去阿里雲控制檯建立所需訊息佇列資源,包括訊息佇列 RocketMQ 的例項、Topic、Group ID (GID),以及鑑權需要的 AccessKey(AK),一般公司都有現成的可以直接使用。
2、在springboot專案pom.xml新增需要的依賴。
<!--阿里雲MQ TCP-->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.7.1.Final</version>
</dependency>
3、在對應環境的application.properties檔案配置引數
console:
rocketmq:
tcp:
accessKey: XXXXXXXX使用自己的
secretKey: XXXXXXXXXXXXX使用自己的
nameSrvAddr: XXXXXXXXXXXXXXXX使用自己的
topic: XXXXXXX使用自己的
groupId: XXXXXXX使用自己的
tag: XXXXXXXXX使用自己的
4、封裝MQ配置類
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary; import java.util.Properties;
/**
* @Description: MQ配置類
* @Author: zhouhong
* @Date: 2021/8/4
*/
@Configuration
@EnableConfigurationProperties({PatrolMqConfig.class})
@ConfigurationProperties(prefix = "console.rocketmq.tcp")
@Primary
public class PatrolMqConfig { private String accessKey;
private String secretKey;
private String nameSrvAddr;
private String topic;
private String groupId;
private String tag;
private String orderTopic;
private String orderGroupId;
private String orderTag; public Properties getMqPropertie() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
return properties;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getNameSrvAddr() {
return nameSrvAddr;
}
public void setNameSrvAddr(String nameSrvAddr) {
this.nameSrvAddr = nameSrvAddr;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getOrderTopic() {
return orderTopic;
}
public void setOrderTopic(String orderTopic) {
this.orderTopic = orderTopic;
}
public String getOrderGroupId() {
return orderGroupId;
}
public void setOrderGroupId(String orderGroupId) {
this.orderGroupId = orderGroupId;
}
public String getOrderTag() {
return orderTag;
}
public void setOrderTag(String orderTag) {
this.orderTag = orderTag;
}
}
5、配置生產者
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class PatrolProducerClient { @Autowired
private PatrolMqConfig mqConfig;
@Bean(name = "ConsoleProducer", initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
ProducerBean producer = new ProducerBean();
producer.setProperties(mqConfig.getMqPropertie());
return producer;
}
}
6、消費者訂閱
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties; //專案中加上 @Configuration 註解,這樣服務啟動時consumer也啟動了
@Configuration
@Slf4j
public class PatrolConsumerClient { @Autowired
private PatrolMqConfig mqConfig; @Autowired
private MqTimeMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
//配置檔案
Properties properties = mqConfig.getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
//將消費者執行緒數固定為20個 20為預設值
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
consumerBean.setProperties(properties);
//訂閱關係
Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
Subscription subscription = new Subscription();
subscription.setTopic(mqConfig.getTopic());
subscription.set);
subscriptionTable.put(subscription, messageListener);
//訂閱多個topic如上面設定
consumerBean.setSubscriptionTable(subscriptionTable);
System.err.println("訂閱成功!");
return consumerBean;
}
}
7、定時延時MQ訊息監聽消費
/**
* @Description: 定時/延時MQ訊息監聽消費
* @Author: zhouhong
* @Create: 2021-08-03 09:16
**/
@Component
public class MqTimeMessageListener implements MessageListener {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public Action consume(Message message, ConsumeContext context) {
System.err.println("收到訊息啦!!");
logger.info("接收到MQ訊息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody()));
try {
String msgTag = message.getTag(); // 訊息型別
String msgKey = message.getKey(); // 業務唯一id
switch (msgTag) {
case "XXXX":
// TODO 具體業務實現,比如發訊息等操作
System.err.println("推送成功!!!!");
break;
}
return Action.CommitMessage;
} catch (Exception e) {
logger.error("消費MQ訊息失敗! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
//消費失敗,告知伺服器稍後再投遞這條訊息,繼續消費其他訊息
return Action.ReconsumeLater;
}
}
}
8、封裝一個發延時/定時訊息的工具類
/**
* @Description: MQ傳送訊息助手
* @Author: zhouhong
* @Create: 2021-08-03 09:06
**/
@Component
public class ProducerUtil {
private Logger logger = LoggerFactory.getLogger(ProducerUtil.class);
@Autowired
private PatrolMqConfig config;
@Resource(name = "ConsoleProducer")
ProducerBean producerBean;
public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) {
Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
msg.setStartDeliverTime(delayTime);
return this.send(msg,Boolean.FALSE);
}
/**
* 普通訊息傳送發放
* @param msg 訊息
* @param isOneWay 是否單向傳送
*/
private SendResult send(Message msg,Boolean isOneWay) {
try {
if(isOneWay) {
//由於在 oneway 方式傳送訊息時沒有請求應答處理,一旦出現訊息傳送失敗,則會因為沒有重試而導致資料丟失。
//若資料不可丟,建議選用同步或非同步傳送方式。
producerBean.sendOneway(msg);
success(msg, "單向訊息MsgId不返回");
return null;
}else {
//可靠同步傳送
SendResult sendResult = producerBean.send(msg);
//獲取傳送結果,不拋異常即傳送成功
if (sendResult != null) {
success(msg, sendResult.getMessageId());
return sendResult;
}else {
error(msg,null);
return null;
}
}
} catch (Exception e) {
error(msg,e);
return null;
}
}
private ExecutorService threads = Executors.newFixedThreadPool(3);
private void error(Message msg,Exception e) {
logger.error("傳送MQ訊息失敗-- Topic:{}, Key:{}, tag:{}, body:{}"
,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
logger.error("errorMsg --- {}",e.getMessage());
}
private void success(Message msg,String messageId) {
logger.info("傳送MQ訊息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody()));
}
}
9、介面測試(10000表示延遲10秒,可以根據自己的業務計算出)
// 測試MQ延時
@Autowired
ProducerUtil producerUtil;
@PostMapping("/patrolTaskTemp/mqtest")
public void mqTime(){
producerUtil.sendTimeMsg(
"SMARTPATROL",
"你好鴨!!!".getBytes(),
"紅紅火火恍恍惚惚!!",
System.currentTimeMillis() + 10000
);
}
10、結果
2021-08-04 22:07:12.677  INFO 17548 --- [nio-8498-exec-2] c.h.i.i.s.m.common.util.ProducerUtil     : 傳送MQ訊息成功 -- Topic:TID_COMMON ,msgId:C0A80168448C2F0E140B14322CB30000 , Key:紅紅火火恍恍惚惚!!, tag:SMARTPATROL, body:你好鴨!!!
收到訊息啦!!
推送成功!!!!
2021-08-04 22:07:22.179 INFO 17548 --- [MessageThread_1] c.h.i.i.s.m.m.t.n.MqTimeMessageListener : 接收到MQ訊息 -- Topic:TID_COMMON, tag:SMARTPATROL,msgId:0b17f2e71ebd1b054c2c156f6d1d1655 , Key:紅紅火火恍恍惚惚!!, body:你好鴨!!!