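How to Listen to RabbitMQ Message Queues from Two Different Sources in Spring Boot

How to configure spring-boot to listen to two different RabbitMQ brokers

A while ago at work I ran into a problem: one application had to listen to two different RabbitMQ brokers at the same time, which I had never needed to do before. With a colleague's help I got it working. Below I walk through the setup step by step, partly as notes for myself.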

Detailed steps:

1. Configure application.properties:

u.rabbitmq.addresses=10.0.0.1:5672
u.rabbitmq.username=username1
u.rabbitmq.password=password1
u.rabbitmq.vhost.provider=/provider
b.rabbitmq.addresses=10.0.0.2:5672
b.rabbitmq.username=username2
b.rabbitmq.password=password1
b.rabbitmq.vhost.provider=/provider

u.rabbitmq: configuration for the first broker, referred to below as UMQ
b.rabbitmq: configuration for the second broker, referred to below as BMQ
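
Because both brokers use the same four keys under different prefixes, a typo in one prefix is easy to miss. The following is a minimal sketch, using only the JDK, of a check that every key exists for a given prefix. The class BrokerPropertiesCheck and its methods are hypothetical helpers for illustration and are not part of the original project; the key names are the ones listed above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (an assumption, not from the original project):
// reports which of the four broker keys are missing for a prefix.
final class BrokerPropertiesCheck {

    // Key suffixes shared by the "u" and "b" prefixes.
    private static final String[] REQUIRED_SUFFIXES = {
        "rabbitmq.addresses",
        "rabbitmq.username",
        "rabbitmq.password",
        "rabbitmq.vhost.provider"
    };

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the fully qualified keys that are absent or blank for the prefix.
    static List<String> missingKeys(Properties props, String prefix) {
        List<String> missing = new ArrayList<>();
        for (String suffix : REQUIRED_SUFFIXES) {
            String key = prefix + "." + suffix;
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = parse(String.join("\n",
            "u.rabbitmq.addresses=10.0.0.1:5672",
            "u.rabbitmq.username=username1",
            "u.rabbitmq.password=password1",
            "u.rabbitmq.vhost.provider=/provider",
            "b.rabbitmq.addresses=10.0.0.2:5672"));
        System.out.println("UMQ missing: " + missingKeys(props, "u"));
        System.out.println("BMQ missing: " + missingKeys(props, "b"));
    }
}
```

In a real application Spring itself fails at startup when a @Value placeholder cannot be resolved, so a check like this is mainly useful in a unit test or a deployment script.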

2. Read the configuration values in RabbitMqConfig

// UMQ settings
@Value("${u.rabbitmq.username}")
private String uname;
@Value("${u.rabbitmq.password}")
private String upassword;
@Value("${u.rabbitmq.vhost.provider}")
private String uhost;
@Value("${u.rabbitmq.addresses}")
private String uaddress;

// BMQ settings
@Value("${b.rabbitmq.username}")
private String bname;
@Value("${b.rabbitmq.password}")
private String bpassword;
@Value("${b.rabbitmq.vhost.provider}")
private String bhost;
@Value("${b.rabbitmq.addresses}")
private String baddress;

3. Configure the connections in RabbitMqConfig

@Bean(name="uConnectFactory")
@Primary
public ConnectionFactory uConnectFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(uaddress);
    connectionFactory.setUsername(uname);
    connectionFactory.setPassword(upassword);
    connectionFactory.setVirtualHost(uhost);
    return connectionFactory;
}

@Bean(name="bConnectFactory")
public ConnectionFactory bConnectFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    // Use the BMQ fields declared in step 2.
    connectionFactory.setAddresses(baddress);
    connectionFactory.setUsername(bname);
    connectionFactory.setPassword(bpassword);
    connectionFactory.setVirtualHost(bhost);
    return connectionFactory;
}

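The @Primary annotation marks the connection factory that Spring injects by default when no specific one is requested. Once more than one ConnectionFactory bean is defined, one of them must carry @Primary; otherwise any injection point that asks for a ConnectionFactory without a qualifier is ambiguous, and the application may fail to find the right RabbitMQ connection.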
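
To make the default-versus-qualified behaviour concrete, here is a minimal sketch of two RabbitTemplate beans built on the connection factories above. The class name RabbitTemplateConfig and the bean names uRabbitTemplate and bRabbitTemplate are assumptions introduced for illustration; the original setup only covers listening and does not define them.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

// Hypothetical configuration class (an assumption, not from the original post).
@Configuration
public class RabbitTemplateConfig {

    // No @Qualifier on the parameter: Spring resolves it to the @Primary
    // ConnectionFactory, which is uConnectFactory (UMQ).
    @Bean(name = "uRabbitTemplate")
    @Primary
    public RabbitTemplate uRabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    // The qualifier must match the bean name exactly to select BMQ.
    @Bean(name = "bRabbitTemplate")
    public RabbitTemplate bRabbitTemplate(
            @Qualifier("bConnectFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}
```

The same rule applies to every other injection point: leaving out @Qualifier silently selects UMQ, so anything meant for BMQ has to name bConnectFactory explicitly.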

4. Configure the listener container factories

@Bean(name="uRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory uRabbitListenerContainerFactory(
        @Qualifier("uConnectFactory") ConnectionFactory connectionFactory,
        RabbitProperties config
) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    RabbitProperties.Listener listenerConfig = config.getListener();
    factory.setAutoStartup(listenerConfig.isAutoStartup());
    if (listenerConfig.getAcknowledgeMode() != null) {
        factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode());
    }
    if (listenerConfig.getConcurrency() != null) {
        factory.setConcurrentConsumers(listenerConfig.getConcurrency());
    }
    if (listenerConfig.getMaxConcurrency() != null) {
        factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency());
    }
    if (listenerConfig.getPrefetch() != null) {
        factory.setPrefetchCount(listenerConfig.getPrefetch());
    }
    if (listenerConfig.getTransactionSize() != null) {
        factory.setTxSize(listenerConfig.getTransactionSize());
    }
    return factory;
}

@Bean(name="bRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory bRabbitListenerContainerFactory(
        // The qualifier must match the bean name declared in step 3.
        @Qualifier("bConnectFactory") ConnectionFactory connectionFactory,
        RabbitProperties config
) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    RabbitProperties.Listener listenerConfig = config.getListener();
    factory.setAutoStartup(listenerConfig.isAutoStartup());
    if (listenerConfig.getAcknowledgeMode() != null) {
        factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode());
    }
    if (listenerConfig.getConcurrency() != null) {
        factory.setConcurrentConsumers(listenerConfig.getConcurrency());
    }
    if (listenerConfig.getMaxConcurrency() != null) {
        factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency());
    }
    if (listenerConfig.getPrefetch() != null) {
        factory.setPrefetchCount(listenerConfig.getPrefetch());
    }
    if (listenerConfig.getTransactionSize() != null) {
        factory.setTxSize(listenerConfig.getTransactionSize());
    }
    return factory;
} 

5. Configure an AmqpAdmin for each message broker

@Bean(name="uAmqpAdmin")
public AmqpAdmin amqpAdmin(@Qualifier("uConnectFactory") ConnectionFactory connectionFactory) {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    admin.setAutoStartup(false);
    return admin;
}

@Bean(name="bAmqpAdmin")
public AmqpAdmin rpdAmqpAdmin(@Qualifier("bConnectFactory") ConnectionFactory connectionFactory) {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    admin.setAutoStartup(false);
    return admin;
} 

6. Listen to specific queues using the configuration

UMQ listener for the queue named "queue"

@RabbitListener(queues = "queue", containerFactory = "uRabbitListenerContainerFactory")
public void handleMessage(String message) throws Exception {
    // Handle the message here.
}

BMQ listener for the queue named "queue"

@RabbitListener(queues = "queue", containerFactory = "bRabbitListenerContainerFactory")
public void handleMessage(String message) throws Exception {
    // Handle the message here.
}
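
The two listener snippets share the same method signature, so they cannot sit in the same class unchanged. As a minimal sketch, assuming both listeners live in one component, the methods can simply be given distinct names. The class name DualBrokerListener and the method names handleUmqMessage and handleBmqMessage are hypothetical; the containerFactory values are the bean names defined in step 4.

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Hypothetical listener component (an assumption, not from the original post).
@Component
public class DualBrokerListener {

    // Consumes from the queue named "queue" on UMQ.
    @RabbitListener(queues = "queue", containerFactory = "uRabbitListenerContainerFactory")
    public void handleUmqMessage(String message) throws Exception {
        // Handle messages from the first broker here.
    }

    // Consumes from the queue named "queue" on BMQ.
    @RabbitListener(queues = "queue", containerFactory = "bRabbitListenerContainerFactory")
    public void handleBmqMessage(String message) throws Exception {
        // Handle messages from the second broker here.
    }
}
```

Keeping the two listeners in separate classes works equally well; what matters is that each containerFactory value matches a bean name from step 4.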