RocketMQ中介軟體訊息佇列在Maven專案中的配置使用操作 (分散式釋出訂閱訊息系統)
阿新 • 發佈:2018-11-01
一、專案引用
三、程式碼示例
3.1 建立一對一消費模式的生產者
<!-- Maven coordinates of the fjf-mq-rocketmq module used by the examples in this article. -->
<dependency>
    <groupId>com.foriseland.fjf.mq</groupId>
    <artifactId>fjf-mq-rocketmq</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
二、配置檔案
統一自動走配置中心,配置中心的配置為:
// Point-to-point producer example: fetch the producer bean from `context`, start it,
// send one message to "TopicTest1" and shut the producer down again.
ImmediateRocketProductor immediateRocketProductor = context.getBean(ImmediateRocketProductor.class);
// NOTE(review): "-" is presumably a placeholder meaning "take the server address from the
// configuration center" -- confirm against the library.
immediateRocketProductor.setProducerGroup("-", "immediateRocketProductor");
immediateRocketProductor.start();
try {
    // Sequence number embedded in the message body; the snippet used `i` without declaring it.
    int i = 0;
    Message msg = new Message(
            "TopicTest1", // topic
            "Tag1", // tag
            "OrderID001", // key
            (i + " - Hello MetaQ zhangyan1").getBytes()); // body
    SendResult sendResult = immediateRocketProductor.send(msg);
} finally {
    // Release the producer even when send() throws.
    immediateRocketProductor.shutdown();
}
3.2 建立一對一消費模式的消費者
3.3 建立一對多消費模式的生產者(即組播模式,一端傳送)ImmediateRocketConsumer immediateRocketConsumer = context.getBean(ImmediateRocketConsumer.class); immediateRocketConsumer.setConsumerGroup("-", "immediateRocketConsumer"); immediateRocketConsumer.subscribe("TopicTest1", ""); int consumeMessageBatchMaxSize = immediateRocketConsumer.getRocketMQPushConsumer().getRocketMqClientConfig().getRocketConfig().getConsumeMessageBatchMaxSize(); immediateRocketConsumer.getRocketMQPushConsumer().newInstance().setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); MessageListenerConcurrently lister = new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { immediateRocketConsumer.getLogger().info(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); immediateRocketConsumer.getLogger().info( msg.getTopic() ); immediateRocketConsumer.getLogger().info(msg.getReconsumeTimes() ); try { if (msg.getTopic().equals("TopicTest1")) { // 執行TopicTest1的消費邏輯 if (msg.getTags() != null && msg.getTags().equals("Tag1")) { // 執行Tag1的消費 immediateRocketConsumer.getLogger().info("TopicTest1:==Tag1==="); immediateRocketConsumer.getLogger().info(new String(msg.getBody())); } else if (msg.getTags() != null & msg.getTags().equals("Tag2")) { // 執行Tag1的消費 immediateRocketConsumer.getLogger().info("TopicTest1:==Tag2==="); immediateRocketConsumer.getLogger().info(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("Tag3")){ // 執行Tag1的消費 immediateRocketConsumer.getLogger().info("TopicTest1:==Tag3==="); immediateRocketConsumer.getLogger().info(new String(msg.getBody())); } else { } } else if (msg.getTopic().equals("TopicTest2")) { if (msg.getTags() != null && msg.getTags().equals("Tag2")) { // 執行Tag2的消費 immediateRocketConsumer.getLogger().info("TopicTest2:====="); immediateRocketConsumer.getLogger().info(new String(msg.getBody())); }else{ } } else if 
(msg.getTopic().equals("TopicTest3")) { if (msg.getTags() != null && msg.getTags().equals("Tag3")) { // 執行Tag2的消費 immediateRocketConsumer.getLogger().info("TopicTest3:====="); immediateRocketConsumer.getLogger().info(new String(msg.getBody())); } } else { } } catch (Exception e) { if (msg.getReconsumeTimes() == immediateRocketConsumer.getRocketMQPushConsumer().getMessageDelayLevel()) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }else{ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }; /** * Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可<br> */ immediateRocketConsumer.registerMessageListener(lister);
// Broadcast producer example: fetch the producer bean from `context`, send one message
// tagged "TagA" to topic "broadcast" and shut the producer down.
BroadcastProducer broadcast = context.getBean(BroadcastProducer.class);
// Alternatives that pass an explicit host:port instead of "-":
// broadcast.setProducerGroup("192.168.2.31:9876", "broadcast");
// broadcast.setProducerGroup("127.0.0.1:9876", "broadcast");
// broadcast.setProducerGroup("192.168.3.104:9876", "broadcast");
broadcast.setProducerGroup("-", "broadcast");
try {
    for (int i = 0; i < 1; i++) {
        SendResult sendResult = broadcast.send("broadcast", "broadcast", "TagA", "orderId01", "hello consumer broadcast " + i);
    }
} finally {
    // Release the producer even when send() throws.
    broadcast.shutdown();
}
3.4 建立一對多消費模式的消費者(即組播模式,多端接收)
// Broadcast consumer example: fetch the consumer bean from `context`, switch it to
// broadcasting mode, subscribe to topic "broadcast" and register a logging listener.
BroadcastConsumerA bcA = context.getBean(BroadcastConsumerA.class);
// Message model; two are supported: clustering consumption and broadcasting consumption.
bcA.setMessageModel(MessageModel.BROADCASTING);
// bcA.setConsumerGroup("192.168.2.31:9876","broadcast");
bcA.setConsumerGroup("-", "broadcast");
bcA.subscribe("broadcast", "Tag1 || Tag2 || Tag3 || TagA");
MessageListenerConcurrently lister = new MessageListenerConcurrently() {
    /**
     * Logs the delivered batch and the body of its first message.
     *
     * @param msgs    the delivered batch of messages
     * @param context the consume context supplied by the client (unused here)
     * @return CONSUME_SUCCESS normally; RECONSUME_LATER when logging throws and the
     *         message's reconsume count differs from the consumer's message delay level
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        MessageExt msg = msgs.get(0);
        try {
            // The original appended "%n" here; inside plain string concatenation it is never
            // expanded and was logged literally, so it has been removed.
            bcA.getLogger().info(Thread.currentThread().getName() + " consumerA Receive New Messages: " + msgs);
            String m = new String(msg.getBody());
            bcA.getLogger().info(m);
        } catch (Exception e) {
            // Stop retrying once the reconsume count equals the consumer's message delay level;
            // otherwise ask for redelivery.
            if (msg.getReconsumeTimes() == bcA.getRocketMQPushConsumer().getMessageDelayLevel()) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } else {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
};
bcA.registerMessageListener(lister);