SpringBoot中如何監聽兩個不同源的RabbitMQ訊息佇列
阿新 • • 發佈:2019-01-28
spring-boot如何配置監聽兩個不同的RabbitMQ
前段時間在公司開發過程中碰到了一個問題:需要同時監聽兩個不同的RabbitMQ。由於之前沒有遇過同時監聽兩個RabbitMQ的情況,後來在同事的幫助下,才成功實現了監聽多個MQ。下面我給大家一步一步講解,也為自己做個筆記。
詳細步驟:
1. application.properties 檔案配置:
u.rabbitmq.addresses=10.0.0.1:5672
u.rabbitmq.username=username1
u.rabbitmq.password=password1
u.rabbitmq.vhost.provider=/provider
b.rabbitmq.addresses=10.0.0.2:5672
b.rabbitmq.username=username2
b.rabbitmq.password=password2
b.rabbitmq.vhost.provider=/provider
u.rabbitmq:為第一個MQ的配置,後面簡稱UMQ
b.rabbitmq:為第二個MQ的配置,後面簡稱BMQ
2. RabbitMqConfig中讀取配置資訊
// ---- UMQ (first broker): values bound from the u.rabbitmq.* properties ----

/** Broker address list for UMQ, bound from {@code u.rabbitmq.addresses}. */
@Value("${u.rabbitmq.addresses}")
private String uaddress;

/** Virtual host for UMQ, bound from {@code u.rabbitmq.vhost.provider}. */
@Value("${u.rabbitmq.vhost.provider}")
private String uhost;

/** User name for UMQ, bound from {@code u.rabbitmq.username}. */
@Value("${u.rabbitmq.username}")
private String uname;

/** Password for UMQ, bound from {@code u.rabbitmq.password}. */
@Value("${u.rabbitmq.password}")
private String upassword;

// ---- BMQ (second broker): values bound from the b.rabbitmq.* properties ----

/** Broker address list for BMQ, bound from {@code b.rabbitmq.addresses}. */
@Value("${b.rabbitmq.addresses}")
private String baddress;

/** Virtual host for BMQ, bound from {@code b.rabbitmq.vhost.provider}. */
@Value("${b.rabbitmq.vhost.provider}")
private String bhost;

/** User name for BMQ, bound from {@code b.rabbitmq.username}. */
@Value("${b.rabbitmq.username}")
private String bname;

/** Password for BMQ, bound from {@code b.rabbitmq.password}. */
@Value("${b.rabbitmq.password}")
private String bpassword;
3.RabbitMqConfig中配置連結
/**
 * Creates the connection factory for the first broker (UMQ).
 *
 * <p>Marked {@code @Primary}, so it is the candidate chosen when a
 * {@code ConnectionFactory} is injected without a qualifier.
 *
 * @return a caching connection factory populated from the u.rabbitmq.* fields
 */
@Bean(name="uConnectFactory")
@Primary
public ConnectionFactory uConnectFactory() {
    CachingConnectionFactory umqFactory = new CachingConnectionFactory();

    // Where to connect.
    umqFactory.setAddresses(uaddress);
    umqFactory.setVirtualHost(uhost);

    // Who connects.
    umqFactory.setUsername(uname);
    umqFactory.setPassword(upassword);

    return umqFactory;
}
/**
 * Creates the connection factory for the second broker (BMQ).
 *
 * <p>Reads the values injected from the b.rabbitmq.* properties
 * ({@code baddress}, {@code bname}, {@code bpassword}, {@code bhost}).
 *
 * @return a caching connection factory configured for BMQ
 */
@Bean(name="bConnectFactory")
public ConnectionFactory bConnectFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(baddress);
    connectionFactory.setUsername(bname);
    connectionFactory.setPassword(bpassword);
    connectionFactory.setVirtualHost(bhost);
    return connectionFactory;
}
@Primary註解表示:注入ConnectionFactory時若沒有用@Qualifier指定名稱,預設使用被標註的那一個連線工廠。配置了多個連線工廠時,必須為其中一個新增@Primary註解,否則Spring無法判斷要注入哪一個,有可能會找不到對應的RabbitMQ連線。
4.配置監聽連線
/**
 * Creates the listener container factory bound to the UMQ connection factory.
 *
 * <p>Auto-startup is always copied from the listener section of {@code config}.
 * Acknowledge mode, consumer counts, prefetch and transaction size are copied
 * only when they are set (non-null); otherwise the factory's own values are kept.
 *
 * @param connectionFactory the bean qualified as {@code uConnectFactory}
 * @param config            properties object whose listener section supplies the settings
 * @return the configured listener container factory
 */
@Bean(name="uRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory uRabbitListenerContainerFactory(
        @Qualifier("uConnectFactory") ConnectionFactory connectionFactory,
        RabbitProperties config) {
    SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);

    RabbitProperties.Listener listener = config.getListener();
    containerFactory.setAutoStartup(listener.isAutoStartup());

    // Optional settings: each is applied only if a value was provided.
    if (listener.getAcknowledgeMode() != null) {
        containerFactory.setAcknowledgeMode(listener.getAcknowledgeMode());
    }

    Integer minConsumers = listener.getConcurrency();
    if (minConsumers != null) {
        containerFactory.setConcurrentConsumers(minConsumers);
    }

    Integer maxConsumers = listener.getMaxConcurrency();
    if (maxConsumers != null) {
        containerFactory.setMaxConcurrentConsumers(maxConsumers);
    }

    Integer prefetch = listener.getPrefetch();
    if (prefetch != null) {
        containerFactory.setPrefetchCount(prefetch);
    }

    Integer txSize = listener.getTransactionSize();
    if (txSize != null) {
        containerFactory.setTxSize(txSize);
    }

    return containerFactory;
}
/**
 * Creates the listener container factory bound to the BMQ connection factory.
 *
 * <p>Auto-startup is always copied from the listener section of {@code config}.
 * Acknowledge mode, consumer counts, prefetch and transaction size are copied
 * only when they are set (non-null).
 *
 * @param connectionFactory the bean qualified as {@code bConnectFactory}
 * @param config            properties object whose listener section supplies the settings
 * @return the configured listener container factory
 */
@Bean(name="bRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory brabbitListenerContainerFactory(
        // The qualifier must equal the bean name declared by @Bean(name="bConnectFactory").
        @Qualifier("bConnectFactory") ConnectionFactory connectionFactory,
        RabbitProperties config
) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    RabbitProperties.Listener listenerConfig = config.getListener();
    factory.setAutoStartup(listenerConfig.isAutoStartup());
    // Optional settings: each is applied only if a value was provided.
    if (listenerConfig.getAcknowledgeMode() != null) {
        factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode());
    }
    if (listenerConfig.getConcurrency() != null) {
        factory.setConcurrentConsumers(listenerConfig.getConcurrency());
    }
    if (listenerConfig.getMaxConcurrency() != null) {
        factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency());
    }
    if (listenerConfig.getPrefetch() != null) {
        factory.setPrefetchCount(listenerConfig.getPrefetch());
    }
    if (listenerConfig.getTransactionSize() != null) {
        factory.setTxSize(listenerConfig.getTransactionSize());
    }
    return factory;
}
5.配置訊息代理伺服器
/**
 * Creates the {@code AmqpAdmin} for the first broker (UMQ), with auto-startup disabled.
 *
 * @param connectionFactory the bean qualified as {@code uConnectFactory}
 * @return a {@code RabbitAdmin} that uses the UMQ connection factory
 */
@Bean(name="uAmqpAdmin")
public AmqpAdmin amqpAdmin(@Qualifier("uConnectFactory") ConnectionFactory connectionFactory) {
    RabbitAdmin umqAdmin = new RabbitAdmin(connectionFactory);
    umqAdmin.setAutoStartup(false);
    return umqAdmin;
}
/**
 * Creates the {@code AmqpAdmin} for the second broker (BMQ), with auto-startup disabled.
 *
 * @param connectionFactory the bean qualified as {@code bConnectFactory}
 * @return a {@code RabbitAdmin} that uses the BMQ connection factory
 */
@Bean(name="bAmqpAdmin")
// The qualifier must equal the bean name declared by @Bean(name="bConnectFactory").
public AmqpAdmin rpdAmqpAdmin(@Qualifier("bConnectFactory") ConnectionFactory connectionFactory) {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    admin.setAutoStartup(false);
    return admin;
}
6.具體監聽佇列及使用配置
UMQ佇列queue監聽配置
@RabbitListener(queues = "queue", containerFactory = uRabbitListenerContainerFactory")
public void handleMessage(String message) throws Exception {
具體操作。。。。
}
BMQ佇列queue監聽配置
@RabbitListener(queues = "queue", containerFactory = rRabbitListenerContainerFactory")
public void handleMessage(String message) throws Exception {
具體操作。。。。
}