RocketMQ與Spring整合(含生產者消費者)
阿新 • • 發佈:2019-01-24
RocketMQ與Spring整合,採用push方式接收訊息。後面有生產者與Spring整合,以及使用方法。
maven依賴
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
<dependency >
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.2.6</version>
<type>pom</type>
</dependency>
監聽處理類
public class MessageListenerImpl implements MessageListenerConcurrently {
private static final Logger LOG = LoggerFactory.getLogger(MessageListenerImpl.class);
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
LOG.info(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size()+";msg:" + msgs);
for (MessageExt msg : msgs) {
try {
System.out.println(">>>>"+new String(msg.getBody(),"UTF-8"));
} catch (UnsupportedEncodingException e) {
LOG.error(e.toString());
}
}
// 有異常丟擲來,不要全捕獲了,這樣保證不能消費的訊息下次重推,每次重新消費間隔:10s,30s,1m,2m,3m
// 如果沒有異常會認為都成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
Spring配置
<!-- rocketmq配置 -->
<bean id="messageListeners" class="com.xx.service.mq.rocketmq.MessageListenerImpl"></bean>
<!-- 匯入Spring配置檔案 -->
<bean id="rocketmqConsumer" class="com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown">
<property name="consumerGroup" value="${rocketmq.consumerGroup}"/>
<property name="instanceName">
<!-- 獲取靜態方法返回值作為引數值 -->
<bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
<property name="targetClass">
<value>com.xx.utils.RunTimeUtil</value>
</property>
<property name="targetMethod">
<!-- 必須是靜態方法 -->
<value>getRocketMqUniqeInstanceName</value>
</property>
</bean>
</property>
<property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
<property name="messageListener" ref="messageListeners"/>
<property name="subscription">
<map>
<entry key="${rocketmq.topic}" value="${rocketmq.tags}" />
</map>
</property>
</bean>
consumerGroup name不需要同生產者一致,可以自行定義,一般定義為同topic,應該具有唯一性;
instanceName不需要同生產者一致,可以自行定義,應該具有唯一性。便於將來可以在控制檯根據名稱搜尋到對應服務。
RunTimeUtil
這是個生成唯一id的工具類
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class RunTimeUtil {
private static AtomicInteger index = new AtomicInteger();
public RunTimeUtil() {
}
public static int getPid() {
String info = getRunTimeInfo();
int pid = (new Random()).nextInt();
int index = info.indexOf("@");
if(index > 0) {
pid = Integer.parseInt(info.substring(0, index));
}
return pid;
}
public static String getRunTimeInfo() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String info = runtime.getName();
return info;
}
public static String getRocketMqUniqeInstanceName() {
return "pid" + getPid() + "_index" + index.incrementAndGet();
}
}
生產者的整合方式
spring配置:
<bean id="rocketMQProducer" class="com.XXX.service.util.RocketMQProducer" init-method="init" destroy-method="destroy">
<property name="producerGroup" value="${rocketmq.producer.group}"/>
<property name="instanceName">
<!-- 獲取靜態方法返回值作為引數值 -->
<bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
<property name="targetClass">
<value>com.XXX.utils.RunTimeUtil</value>
</property>
<property name="targetMethod">
<!-- 必須是靜態方法 -->
<value>getRocketMqUniqeInstanceName</value>
</property>
</bean>
</property>
<property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
<!-- 失敗重試次數 -->
<property name="retryTimes" value="${rocketmq.producer.retryTimes}" />
</bean>
生產者:
public class RocketMQProducer {
private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);
private DefaultMQProducer defaultMQProducer;
private String producerGroup;
private String namesrvAddr;
private String instanceName;
private int retryTimes;
public void init() throws MQClientException {
this.defaultMQProducer = new DefaultMQProducer(this.producerGroup);
defaultMQProducer.setNamesrvAddr(this.namesrvAddr);
defaultMQProducer.setInstanceName(this.instanceName);
defaultMQProducer.setRetryTimesWhenSendFailed(this.retryTimes);
defaultMQProducer.start();
logger.info("rocketMQ初始化生產者完成[producerGroup:" + producerGroup + ",instanceName:"+ instanceName +"]");
}
public void destroy() {
defaultMQProducer.shutdown();
logger.info("rocketMQ生產者[producerGroup: " + producerGroup + ",instanceName: "+ instanceName +"]已停止");
}
public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer;
}
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
}
}
呼叫生產者,傳送訊息:
// 注入進來
@Autowired
@Qualifier("rocketMQProducer")
private RocketMQProducer rocketMQProducer;
// 傳送訊息
for (Message msg : msgs) {
Message message = new Message(topic, msg.getBytes("UTF-8"));
rocketMQProducer.getDefaultMQProducer().send(message);
}