1. 程式人生 > >阿里MQ普通+順序+延時訊息 整合Spring

阿里MQ普通+順序+延時訊息 整合Spring

前言

由於公司專案需要,研究了下AliWareMQ。阿里mq的普通訊息和延時訊息還是挺簡單的。不過在順序訊息的時候出現了一些瓶頸。後來查閱原始碼和依據demo整理了一版融合Spring的版本。

例項

mq配置檔案(Spring)

主要是順序訊息的配置,以及多例項的配置(需要先在阿里雲MQ控制檯建立對應的 Producer ID 與 Consumer ID,即 p/c)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">

    <!-- ===== MQ producers: begin ===== -->

    <!-- Regular (unordered) message producer -->
    <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"
          init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="ProducerId" value="${ProducerId}"/> <!-- PID, replace with your own -->
                <entry key="AccessKey" value="${AccessKey}"/>   <!-- ACCESS_KEY, replace with your own -->
                <entry key="SecretKey" value="${SecretKey}"/>   <!-- SECRET_KEY, replace with your own -->
                <entry key="ONSAddr" value="${ONSAddr}"/>
            </map>
        </property>
    </bean>

    <!-- Ordered message producer -->
    <bean id="mqOrderProducer" class="com.rqbao.mq.listenter.MQOrderProducer"
          init-method="start" destroy-method="shutdown">
        <constructor-arg name="producerProperties">
            <props>
                <prop key="ProducerId">${OrderProducerId}</prop>
                <prop key="AccessKey">${AccessKey}</prop>
                <prop key="SecretKey">${SecretKey}</prop>
                <prop key="ONSAddr">${ONSAddr}</prop>
                <prop key="ORDER_TOPIC">${ORDER_TOPIC}</prop>
                <prop key="TAG">1111</prop>
            </props>
        </constructor-arg>
    </bean>

    <!-- ===== MQ producers: end ===== -->

    <!-- ===== MQ consumers: begin ===== -->

    <!-- Regular (unordered) message consumer.
         Uses ConsumerId (not OrderConsumerId) so that it pairs with the regular topic,
         mirroring how the regular producer uses ProducerId. -->
    <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean"
          init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="ConsumerId" value="${ConsumerId}"/> <!-- CID, replace with your own -->
                <entry key="AccessKey" value="${AccessKey}"/>   <!-- ACCESS_KEY, replace with your own -->
                <entry key="SecretKey" value="${SecretKey}"/>   <!-- SECRET_KEY, replace with your own -->
                <entry key="ONSAddr" value="${ONSAddr}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="messageListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="${topic}"/> <!-- Topic, replace with your own -->
                            <!-- Tag expression: separate multiple tags with "||"; "*" matches all -->
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <!-- Ordered message consumer.
         Uses OrderConsumerId so that it pairs with ORDER_TOPIC,
         mirroring how the ordered producer uses OrderProducerId. -->
    <bean id="consumerOrder" class="com.rqbao.mq.listenter.MQOrderConsumer"
          init-method="start" destroy-method="shutdown">
        <constructor-arg name="consumerProperties">
            <props>
                <prop key="ConsumerId">${OrderConsumerId}</prop>
                <prop key="AccessKey">${AccessKey}</prop>
                <prop key="SecretKey">${SecretKey}</prop>
                <prop key="ONSAddr">${ONSAddr}</prop>
                <prop key="ORDER_TOPIC">${ORDER_TOPIC}</prop>
                <prop key="TAG">1111</prop>
            </props>
        </constructor-arg>
    </bean>

    <!-- Message handler for the regular consumer -->
    <bean id="messageListener" class="com.rqbao.mq.listenter.MQListener"/>

    <!-- ===== MQ consumers: end ===== -->
</beans>

順序生產者

    package com.rqbao.mq.listenter;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Properties;

/**
 * Created by ricky on 2017/7/2.
 *
 * <p>Thin lifecycle wrapper around an ONS {@link OrderProducer} so that it can be
 * declared as a Spring bean with {@code init-method="start"} and
 * {@code destroy-method="shutdown"}.
 */
public class MQOrderProducer {
    /** Properties the underlying producer was created from. */
    private Properties producerProperties;
    /** The ONS ordered-message producer created in the constructor. */
    private OrderProducer producer;

    /**
     * Creates the underlying ordered producer via
     * {@link ONSFactory#createOrderProducer(Properties)}. The producer is not
     * started here; call {@link #start()}.
     *
     * @param producerProperties properties passed as-is to the ONS factory
     */
    public MQOrderProducer(Properties producerProperties) {
        this.producerProperties = producerProperties;
        this.producer = ONSFactory.createOrderProducer(producerProperties);
    }

    /**
     * @return the wrapped ordered producer (same instance as {@link #getProducer()})
     */
    public OrderProducer getOrderProducer() {
        return producer;
    }

    /**
     * Starts the wrapped producer.
     *
     * <p>The producer already exists (it is built in the constructor), so this
     * method must not construct another wrapper: doing so would create a second
     * ONS producer that is never started nor shut down.
     */
    public void start() {
        producer.start();
    }

    /** Shuts down the wrapped producer. */
    public void shutdown() {
        producer.shutdown();
    }

    /**
     * @return the properties currently held by this wrapper
     */
    public Properties getProducerProperties() {
        return producerProperties;
    }

    /**
     * Replaces the stored properties. This only updates the field; the already
     * created producer is not rebuilt.
     *
     * @param producerProperties new properties to store
     */
    public void setProducerProperties(Properties producerProperties) {
        this.producerProperties = producerProperties;
    }

    /**
     * @return the wrapped ordered producer
     */
    public OrderProducer getProducer() {
        return producer;
    }

    /**
     * Replaces the wrapped producer instance.
     *
     * @param producer producer to use for subsequent {@link #start()} / {@link #shutdown()} calls
     */
    public void setProducer(OrderProducer producer) {
        this.producer = producer;
    }
}

順序消費者

package com.rqbao.mq.listenter;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;

/**
 * Created by ricky on 2017/7/2.
 *
 * <p>Thin lifecycle wrapper around an ONS {@link OrderConsumer} so that it can be
 * declared as a Spring bean with {@code init-method="start"} and
 * {@code destroy-method="shutdown"}. The subscription is registered in the
 * constructor; message handling lives in the anonymous {@link MessageOrderListener}.
 */
public class MQOrderConsumer {
    /** Properties the underlying consumer was created from. */
    private Properties consumerProperties;
    /** The ONS ordered-message consumer created in the constructor. */
    private OrderConsumer consumer;

    /**
     * Creates the ordered consumer and subscribes it to the topic and tag
     * expression found under the {@code ORDER_TOPIC} and {@code TAG} keys.
     * The consumer is not started here; call {@link #start()}.
     *
     * @param consumerProperties properties passed as-is to the ONS factory; must
     *                           also contain {@code ORDER_TOPIC} and {@code TAG}
     * @throws IllegalArgumentException if {@code ORDER_TOPIC} or {@code TAG} is missing
     */
    public MQOrderConsumer(Properties consumerProperties) {
        this.consumerProperties = consumerProperties;
        // Validate before creating the consumer so a misconfiguration does not
        // leave a half-initialised consumer behind.
        String topic = requireProperty(consumerProperties, "ORDER_TOPIC");
        String tagExpression = requireProperty(consumerProperties, "TAG");

        this.consumer = ONSFactory.createOrderedConsumer(consumerProperties);
        this.consumer.subscribe(topic, tagExpression, new MessageOrderListener() {
            @Override
            public OrderAction consume(final Message message, final ConsumeOrderContext context) {
                // Placeholder handler: prints the message and acknowledges it.
                System.out.println(message);
                return OrderAction.Success;
            }
        });
    }

    /** Reads a mandatory entry, failing with a descriptive message instead of a bare NPE. */
    private static String requireProperty(Properties properties, String key) {
        Object value = properties.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required consumer property: " + key);
        }
        return value.toString();
    }

    /**
     * Starts the wrapped consumer.
     *
     * <p>The consumer already exists (it is built in the constructor). This method
     * must not construct an {@code MQOrderProducer} from the consumer properties:
     * that would create an unrelated producer that is never started nor shut down.
     */
    public void start() {
        consumer.start();
    }

    /** Shuts down the wrapped consumer. */
    public void shutdown() {
        consumer.shutdown();
    }

    /**
     * @return the properties currently held by this wrapper
     */
    public Properties getConsumerProperties() {
        return consumerProperties;
    }

    /**
     * Replaces the stored properties. This only updates the field; the already
     * created consumer and its subscription are not rebuilt.
     *
     * @param consumerProperties new properties to store
     */
    public void setConsumerProperties(Properties consumerProperties) {
        this.consumerProperties = consumerProperties;
    }

    /**
     * @return the wrapped ordered consumer
     */
    public OrderConsumer getConsumer() {
        return consumer;
    }

    /**
     * Replaces the wrapped consumer instance.
     *
     * @param consumer consumer to use for subsequent {@link #start()} / {@link #shutdown()} calls
     */
    public void setConsumer(OrderConsumer consumer) {
        this.consumer = consumer;
    }
}

引數檔案

#AliMQ-related parameters
#Access key
AccessKey=XXXXXX
#Secret key
SecretKey=XXXXXXX
#Alibaba Cloud MQ endpoint address
#PropertyKeyConst.ONSAddr: configure according to the Region in use
#Public internet (testing): http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
#Public cloud (production): http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
#Hangzhou finance cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
#Shenzhen finance cloud: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
ONSAddr=http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
#Topics
topic=RICKY_CGBUSINESS_LOCAL
ORDER_TOPIC=RICKY_CGBUSINESS_TEST
#Producer IDs
ProducerId=PID_RICKYCG_LOCAL
OrderProducerId=PID_RICKYCG_TEST
#Consumer IDs
ConsumerId=CID_RICKYCG_LOCAL
OrderConsumerId=CID_RICKYCG_TEST

順序訊息生產

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import com.rqbao.mq.common.config.Global;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.UUID;

/**
 * Created by ricky on 2017/6/8.
 *
 * <p>Service that publishes ordered MQ messages through the injected
 * {@link MQOrderProducer}; other MQ-related features may be added later.
 */
@Service
public class MQOrderService implements InitializingBean{
    private static final Logger logger = Logger.getLogger(MQOrderService.class);
    @Autowired
    MQOrderProducer mqOrderProducer;

    /**
     * Sends an ordered MQ message to the topic configured under {@code ORDER_TOPIC}.
     *
     * <p>This variant uses a freshly generated id as both the message key and the
     * sharding key, so every message gets its own sharding key. Use
     * {@link #sendOrderMQMsg(String, String, String)} when several messages must
     * share one sharding key.
     *
     * @param TAG business tag of the message
     * @param msg message payload; encoded as UTF-8 bytes
     * @return {@code toString()} of the send result
     * @throws IllegalStateException if the producer returns no send result
     */
    public String sendOrderMQMsg(String TAG,String msg){
        String orderId = newMessageKey();
        return doSend(TAG, msg, orderId, orderId);
    }

    /**
     * Sends an ordered MQ message using a caller-supplied sharding key.
     *
     * @param TAG         business tag of the message
     * @param msg         message payload; encoded as UTF-8 bytes
     * @param shardingKey key passed to the producer as the sharding key; for a
     *                    globally ordered topic any non-empty string may be used
     * @return {@code toString()} of the send result
     * @throws IllegalStateException if the producer returns no send result
     */
    public String sendOrderMQMsg(String TAG, String msg, String shardingKey) {
        return doSend(TAG, msg, newMessageKey(), shardingKey);
    }

    /** Builds the business key set on each message: "rqb_" plus a dash-less random UUID. */
    private static String newMessageKey() {
        return "rqb_" + UUID.randomUUID().toString().replace("-", "");
    }

    /** Builds the message, sends it with the given sharding key and logs the outcome. */
    private String doSend(String tag, String msg, String messageKey, String shardingKey) {
        OrderProducer producer = mqOrderProducer.getOrderProducer();
        String topic = Global.getConfig("ORDER_TOPIC");
        // Explicit charset: the no-arg getBytes() depends on the platform default.
        Message message = new Message(topic, tag, msg.getBytes(StandardCharsets.UTF_8));
        // Business key of the message; should be as globally unique as possible.
        message.setKey(messageKey);
        // The sharding key selects the partition for partitioned ordering and is a
        // different concept from the message key set above.
        SendResult sendResult = producer.send(message, shardingKey);
        if (sendResult == null) {
            throw new IllegalStateException(
                    "MQ producer returned no send result. Topic is: " + topic + ", key is: " + messageKey);
        }
        // Log the topic the message was actually sent to (ORDER_TOPIC).
        logger.info("Send mq message success! Topic is: " + topic + ", msgId is: " + sendResult.getMessageId());
        return sendResult.toString();
    }

    /** No initialization is required; present only to satisfy {@link InitializingBean}. */
    @Override
    public void afterPropertiesSet() throws Exception {

    }
}

順序訊息消費

具體的消費處理邏輯寫在 MQOrderConsumer 建構函式中註冊的 MessageOrderListener 的 consume 方法裡。

關於延時和定時訊息

依據官方說明,延時訊息必須是無序的topic,切記。
這裡寫圖片描述

可參閱官方文件

問題

MQ存在重複消費的問題(同一條訊息可能被投遞多次)。我這裡的解決方案是對已處理的MQ訊息進行記錄:以訊息的tag加上body中相關的業務id聯合組成唯一索引,消費時若記錄已存在(唯一索引衝突)則直接跳過,從而避免重複消費。

附demo地址:

交流群

244930845 不懂的歡迎進群諮詢。