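(4) RabbitMQ in Practice (for Java Developers): Consuming Messages with @RabbitListener

Consuming messages with the @RabbitListener annotation

In the previous post we consumed messages by setting a MessageListener on a MessageListenerContainer. This post introduces a simpler approach: the @RabbitListener annotation. Consuming messages with @RabbitListener takes the following steps: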

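1. Add the @EnableRabbit annotation to the startup class.
2. Register a RabbitListenerContainerFactory as a Spring bean. The default implementation is SimpleRabbitListenerContainerFactory, and by default the listener infrastructure looks for a bean named rabbitListenerContainerFactory.
3. Write a message handler class, register it as a Spring bean, and annotate it with @RabbitListener to mark it as a RabbitMQ message handler.
4. Annotate the handling methods with @RabbitHandler. When a message arrives it is passed to a @RabbitHandler method; which method is chosen depends on the type of the object that the MessageConverter produces.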
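
Step 4 is easier to picture with a small model. The following is a plain-Java sketch of the idea of choosing a handler by payload type; it does not use Spring AMQP, and `HandlerDispatchSketch`, `register`, `dispatch`, and `example` are hypothetical names introduced only for illustration. Spring's real resolution logic is more involved than this first-match lookup.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of "choose the handler by payload type".
// HandlerDispatchSketch and its methods are hypothetical, not Spring AMQP API.
class HandlerDispatchSketch {
    // Registration order is kept so that lookups are deterministic.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    // Register a handler for one payload type (the analogue of a @RabbitHandler method).
    <T> void register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
    }

    // Pick the first handler whose parameter type accepts the converted payload.
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalArgumentException("No handler for " + payload.getClass().getName());
    }

    // Build a dispatcher with one handler for byte[] and one for String.
    static HandlerDispatchSketch example() {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch();
        dispatcher.register(byte[].class, body -> "bytes:" + new String(body, StandardCharsets.UTF_8));
        dispatcher.register(String.class, text -> "text:" + text);
        return dispatcher;
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = example();
        System.out.println(dispatcher.dispatch("order".getBytes(StandardCharsets.UTF_8)));
        System.out.println(dispatcher.dispatch("order"));
    }
}
```

In this model a byte[] payload reaches the first handler and a String payload reaches the second, which mirrors why the handler class later in this post declares a byte[] parameter.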

1. Create the producer configuration class

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringAMQPProducerConfig {
    @Bean
    public ConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();

        connectionFactory.setHost("192.168.56.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("roberto");
        connectionFactory.setPassword("roberto");

        // Recover the connection automatically, retrying every 10 seconds
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(10000);

        // Client properties reported to the broker for this connection
        Map<String, Object> connectionFactoryPropertiesMap = new HashMap<>();
        connectionFactoryPropertiesMap.put("principal", "RobertoHuang");
        connectionFactoryPropertiesMap.put("description", "RGP訂單系統V2.0");
        connectionFactoryPropertiesMap.put("emailAddress", "[email protected]");
        connectionFactory.setClientProperties(connectionFactoryPropertiesMap);

        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

2. Create the producer startup class

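import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.annotation.producer")
public class ProducerApplication {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProducerApplication.class);
        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        // Declare a durable, non-auto-delete direct exchange
        rabbitAdmin.declareExchange(new DirectExchange("roberto.order", true, false, new HashMap<>()));

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        // "UTF-8" is a charset, so it goes in the content encoding rather than the content type
        messageProperties.setContentEncoding("UTF-8");
        Message message = new Message("訂單資訊".getBytes(StandardCharsets.UTF_8), messageProperties);
        // The empty routing key matches the consumer's binding, which declares no key
        rabbitTemplate.send("roberto.order", "", message);
    }
}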
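
The producer turns the text into bytes and the handler later decodes those bytes as UTF-8, so both sides have to agree on the charset. The following plain-Java sketch shows that contract in isolation; `PayloadCharsetSketch`, `encode`, and `decode` are hypothetical helper names, and the sample text is an arbitrary non-ASCII string rather than the payload used in this post.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of the encode/decode contract between producer and consumer.
// PayloadCharsetSketch, encode and decode are hypothetical helper names.
class PayloadCharsetSketch {
    // Producer side: always encode with an explicit charset, never the platform default.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode the body with the charset passed in.
    static String decode(byte[] body, Charset charset) {
        return new String(body, charset);
    }

    public static void main(String[] args) {
        String original = "caf\u00e9"; // ends with a non-ASCII character
        byte[] body = encode(original);

        // Same charset on both sides: the text survives the round trip.
        System.out.println(original.equals(decode(body, StandardCharsets.UTF_8)));
        // Different charset on the consumer side: the text is corrupted.
        System.out.println(original.equals(decode(body, StandardCharsets.ISO_8859_1)));
    }
}
```

Passing the charset explicitly on both sides removes the dependency on each machine's default encoding.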

3. Create the consumer configuration class and register a RabbitListenerContainerFactory as a Spring bean

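import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration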
public class SpringAMQPConsumerConfig {
    @Bean
    public ConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();

        connectionFactory.setHost("192.168.56.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("roberto");
        connectionFactory.setPassword("roberto");

        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(10000);

        Map<String, Object> connectionFactoryPropertiesMap = new HashMap<>();
        connectionFactoryPropertiesMap.put("principal", "RobertoHuang");
        connectionFactoryPropertiesMap.put("description", "RGP訂單系統V2.0");
        connectionFactoryPropertiesMap.put("emailAddress", "[email protected]");
        connectionFactory.setClientProperties(connectionFactoryPropertiesMap);

        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);

        // Number of concurrent consumers started initially
        simpleRabbitListenerContainerFactory.setConcurrentConsumers(5);
        // Upper limit on the number of concurrent consumers
        simpleRabbitListenerContainerFactory.setMaxConcurrentConsumers(10);

        // Set the consumer tag
        simpleRabbitListenerContainerFactory.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String s) {
                return "RGP訂單系統ADD處理邏輯消費者";
            }
        });

        return simpleRabbitListenerContainerFactory;
    }
}
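
The strategy above returns the same tag for every consumer, so the five to ten concurrent consumers cannot be told apart by tag. One possible variation is to make each tag unique. The following plain-Java sketch shows the idea; `ConsumerTagSketch`, `TagStrategy`, and `numbered` are hypothetical names, and `TagStrategy` is only a local stand-in that mirrors the single-method shape of ConsumerTagStrategy, not the Spring AMQP interface itself.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch of a tag generator that gives every consumer its own tag.
// ConsumerTagSketch and TagStrategy are hypothetical stand-ins, not Spring AMQP API.
class ConsumerTagSketch {
    // Local stand-in with the same single-method shape as ConsumerTagStrategy.
    interface TagStrategy {
        String createConsumerTag(String queue);
    }

    // Returns a strategy that appends the queue name and a running number to the prefix.
    static TagStrategy numbered(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return queue -> prefix + "." + queue + "#" + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        TagStrategy strategy = numbered("order-add-consumer");
        // Two calls for the same queue yield two different tags.
        System.out.println(strategy.createConsumerTag("roberto.order.add"));
        System.out.println(strategy.createConsumerTag("roberto.order.add"));
    }
}
```

An AtomicInteger is used because the tags may be requested from several container threads.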

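4. Write a custom message handler class. The @RabbitListener annotation on the class marks it as a RabbitMQ message handler, and its bindings attribute declares the queue, the exchange, and the binding between them (the handler listens on the roberto.order.add queue). The @RabbitHandler annotation marks the method that actually processes the message.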

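import java.nio.charset.StandardCharsets;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "roberto.order.add", durable = "true", autoDelete = "false", exclusive = "false"),
                exchange = @Exchange(name = "roberto.order"))
})
public class SpringAMQPMessageHandle {
    // Invoked when the converted payload is a byte[]
    @RabbitHandler
    public void add(byte[] body) {
        System.out.println("----------byte[]方法進行處理----------");
        // Decode with an explicit charset; no checked exception to handle
        System.out.println(new String(body, StandardCharsets.UTF_8));
    }
}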

5. Create the consumer startup class and add the @EnableRabbit annotation

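import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@EnableRabbit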
@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.annotation.consumer")
public class ConsumerApplication {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(ConsumerApplication.class);
    }
}

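6. Start the consumer first, then the producer (the consumer is the side that declares the queue and its binding). The console prints the handler's banner line followed by the message body, "訂單資訊" ("order information"):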

----------byte[]方法進行處理----------
訂單資訊
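
The start order matters because a direct exchange only delivers to queues that are already bound with a matching key; a message published before any binding exists is not queued anywhere. The following toy in-memory model illustrates that behaviour. It is not a RabbitMQ client, and `DirectRoutingSketch`, `bind`, `publish`, and `producerFirstThenConsumer` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of direct-exchange routing; not a RabbitMQ client.
// DirectRoutingSketch and its methods are hypothetical names.
class DirectRoutingSketch {
    private final Map<String, List<String>> queuesByRoutingKey = new HashMap<>();
    private final Map<String, List<String>> messagesByQueue = new HashMap<>();

    // Bind a queue to the exchange under the given routing key.
    void bind(String queue, String routingKey) {
        queuesByRoutingKey.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
        messagesByQueue.computeIfAbsent(queue, name -> new ArrayList<>());
    }

    // Deliver to every queue whose binding key equals the routing key.
    // Returns the number of queues reached; 0 means the message was dropped.
    int publish(String routingKey, String message) {
        List<String> queues = queuesByRoutingKey.getOrDefault(routingKey, new ArrayList<>());
        for (String queue : queues) {
            messagesByQueue.get(queue).add(message);
        }
        return queues.size();
    }

    // Publish once before and once after the binding exists.
    static List<Integer> producerFirstThenConsumer() {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        int before = exchange.publish("", "order");   // no binding yet
        exchange.bind("roberto.order.add", "");        // consumer side declares the binding
        int after = exchange.publish("", "order");    // now one queue matches
        return Arrays.asList(before, after);
    }

    public static void main(String[] args) {
        System.out.println(producerFirstThenConsumer());
    }
}
```

In the model the first publish reaches no queue and the second reaches one, which is the situation the producer would be in if it were started before the consumer had ever run.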