1. 程式人生 > RocketMQ與Spring整合(含生產者消費者)

RocketMQ與Spring整合(含生產者消費者)

RocketMQ與Spring整合,採用push方式接收訊息。後面有生產者與Spring整合,以及使用方法。

maven依賴

        <!-- RocketMQ client library -->
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
        <!-- RocketMQ aggregate POM, same version as the client -->
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>3.2.6</version>
            <type>pom</type>
        </dependency>

監聽處理類

public class MessageListenerImpl implements MessageListenerConcurrently
{
private static final Logger LOG = LoggerFactory.getLogger(MessageListenerImpl.class); @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { LOG.info(Thread.currentThread().getName() + " Receive New Messages: "
+ msgs.size()+";msg:" + msgs); for (MessageExt msg : msgs) { try { System.out.println(">>>>"+new String(msg.getBody(),"UTF-8")); } catch (UnsupportedEncodingException e) { LOG.error(e.toString()); } } // 有異常丟擲來,不要全捕獲了,這樣保證不能消費的訊息下次重推,每次重新消費間隔:10s,30s,1m,2m,3m // 如果沒有異常會認為都成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }

Spring配置

    <!-- RocketMQ consumer configuration -->
    <!-- Listener bean that receives the pushed messages -->
    <bean id="messageListeners" class="com.xx.service.mq.rocketmq.MessageListenerImpl"/>

    <!-- Push consumer: start() is the init method, shutdown() the destroy method -->
    <bean id="rocketmqConsumer"
          class="com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer"
          init-method="start"
          destroy-method="shutdown">
        <property name="consumerGroup" value="${rocketmq.consumerGroup}"/>
        <!-- Use the return value of a static method as the property value -->
        <property name="instanceName">
            <bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
                <property name="targetClass" value="com.xx.utils.RunTimeUtil"/>
                <!-- The target method must be static -->
                <property name="targetMethod" value="getRocketMqUniqeInstanceName"/>
            </bean>
        </property>
        <property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
        <property name="messageListener" ref="messageListeners"/>
        <!-- Subscription map: key is the topic, value is the tags -->
        <property name="subscription">
            <map>
                <entry key="${rocketmq.topic}" value="${rocketmq.tags}"/>
            </map>
        </property>
    </bean>

consumerGroup name不需要同生產者一致,可以自行定義,一般定義為同topic,應該具有唯一性;
instanceName不需要同生產者一致,可以自行定義,應該具有唯一性。便於將來可以在控制檯根據名稱搜尋到對應服務。

RunTimeUtil

這是個生成唯一id的工具類

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Static helpers that build a unique instance name from the JVM runtime name
 * and a process-wide counter.
 */
public class RunTimeUtil {
    /** Counter appended to every generated instance name. */
    private static final AtomicInteger index = new AtomicInteger();

    /** Kept public for backward compatibility; all members are static. */
    public RunTimeUtil() {
    }

    /**
     * Returns the integer that precedes the first {@code '@'} in the JVM runtime name.
     *
     * <p>The runtime name is conventionally {@code pid@hostname}, but its format is
     * not guaranteed. When there is no {@code '@'}, nothing precedes it, or the
     * prefix is not a valid int, a random int is returned instead of throwing.
     *
     * @return the parsed prefix, or a random int when it cannot be parsed
     */
    public static int getPid() {
        String info = getRunTimeInfo();
        int at = (info == null) ? -1 : info.indexOf('@');
        if (at > 0) {
            try {
                return Integer.parseInt(info.substring(0, at));
            } catch (NumberFormatException ignored) {
                // Prefix is not numeric; fall through to the random fallback.
            }
        }
        // Created only when needed, rather than on every call.
        return new Random().nextInt();
    }

    /**
     * Returns the name reported by the JVM's {@link RuntimeMXBean}.
     *
     * @return the runtime name string
     */
    public static String getRunTimeInfo() {
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        return runtime.getName();
    }

    /**
     * Builds a name of the form {@code pid<pid>_index<n>}, where {@code n} is
     * incremented on every call so repeated calls yield different names.
     *
     * @return a newly generated instance name
     */
    public static String getRocketMqUniqeInstanceName() {
        return "pid" + getPid() + "_index" + index.incrementAndGet();
    }
}

生產者的整合方式

spring配置:

    <!-- Producer wrapper bean: init() is the init method, destroy() the destroy method -->
    <bean id="rocketMQProducer"
          class="com.XXX.service.util.RocketMQProducer"
          init-method="init"
          destroy-method="destroy">
        <property name="producerGroup" value="${rocketmq.producer.group}"/>
        <!-- Use the return value of a static method as the property value -->
        <property name="instanceName">
            <bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
                <property name="targetClass" value="com.XXX.utils.RunTimeUtil"/>
                <!-- The target method must be static -->
                <property name="targetMethod" value="getRocketMqUniqeInstanceName"/>
            </bean>
        </property>
        <property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
        <!-- Number of retries when sending fails -->
        <property name="retryTimes" value="${rocketmq.producer.retryTimes}"/>
    </bean>

生產者:

/**
 * Holder for a {@link DefaultMQProducer} whose lifecycle is driven by
 * {@link #init()} and {@link #destroy()}.
 *
 * <p>The setters must be called before {@link #init()}; the values are read
 * only when the producer is created.
 */
public class RocketMQProducer {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);

    /** The wrapped producer; {@code null} until {@link #init()} has run. */
    private DefaultMQProducer defaultMQProducer;
    private String producerGroup;
    private String namesrvAddr;
    private String instanceName;
    private int retryTimes;

    /**
     * Creates the producer from the configured properties and starts it.
     *
     * @throws MQClientException if the producer fails to start
     */
    public void init() throws MQClientException {
        this.defaultMQProducer = new DefaultMQProducer(this.producerGroup);
        defaultMQProducer.setNamesrvAddr(this.namesrvAddr);
        defaultMQProducer.setInstanceName(this.instanceName);
        defaultMQProducer.setRetryTimesWhenSendFailed(this.retryTimes);
        defaultMQProducer.start();
        logger.info("rocketMQ初始化生產者完成[producerGroup:" + producerGroup + ",instanceName:"+ instanceName +"]");
    }

    /**
     * Shuts the producer down. Does nothing when {@link #init()} was never
     * called, so calling it on an uninitialized instance does not throw.
     */
    public void destroy() {
        if (defaultMQProducer == null) {
            return;
        }
        defaultMQProducer.shutdown();
        logger.info("rocketMQ生產者[producerGroup: " + producerGroup + ",instanceName: "+ instanceName +"]已停止");
    }

    /**
     * @return the wrapped producer, or {@code null} if {@link #init()} has not run
     */
    public DefaultMQProducer getDefaultMQProducer() {
        return defaultMQProducer;
    }

    /** @param producerGroup the group name passed to the producer's constructor */
    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    /** @param namesrvAddr the name server address applied to the producer */
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    /** @param instanceName the instance name applied to the producer */
    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }

    /** @param retryTimes the retry count applied when a send fails */
    public void setRetryTimes(int retryTimes) {
        this.retryTimes = retryTimes;
    }
}

呼叫生產者,傳送訊息:

    // Injected by Spring; the qualifier selects the "rocketMQProducer" bean.
    @Autowired
    @Qualifier("rocketMQProducer")
    private RocketMQProducer rocketMQProducer;

    // Send each payload as its own message. The loop variable is a String:
    // getBytes("UTF-8") is a String method, so msgs holds text payloads.
    for (String msg : msgs) {
        Message message = new Message(topic, msg.getBytes("UTF-8"));
        rocketMQProducer.getDefaultMQProducer().send(message);
    }