(四) RabbitMQ實戰教程(面向Java開發人員)之@RabbitListener訊息消費
阿新 • • 發佈:2018-12-25
使用RabbitListener註解進行訊息消費
在前一篇部落格中我們往MessageListenerContainer設定了MessageListener進行訊息的消費,本篇部落格將介紹一種更為簡單的訊息消費方式:使用@RabbitListener註解方式。使用RabbitListener進行訊息的消費步驟如下
1.在啟動類上新增@EnableRabbit註解
2.在Spring容器中託管一個RabbitListenerContainerFactory,預設實現類SimpleRabbitListenerContainerFactory
3.編寫一個訊息處理器類託管到Spring容器中,並使用@RabbitListener 註解標註該類為RabbitMQ的訊息處理類
4.使用@RabbitHandler註解標註在方法上,表示當有收到訊息的時候,就交給帶有@RabbitHandler的方法處理,具體找哪個方法需要根據MessageConverter轉換後的物件型別決定
1.建立生產者配置類
@Configuration
public class SpringAMQPProducerConfig {
@Bean
public org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory() {
com .rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setHost("192.168.56.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("roberto");
connectionFactory.setPassword ("roberto");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(10000);
Map<String, Object> connectionFactoryPropertiesMap = new HashMap();
connectionFactoryPropertiesMap.put("principal", "RobertoHuang");
connectionFactoryPropertiesMap.put("description", "RGP訂單系統V2.0");
connectionFactoryPropertiesMap.put("emailAddress", "[email protected]");
connectionFactory.setClientProperties(connectionFactoryPropertiesMap);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
2.建立生產者啟動類
@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.annotation.producer")
public class ProducerApplication {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProducerApplication.class);
RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
rabbitAdmin.declareExchange(new DirectExchange("roberto.order", true, false, new HashMap<>()));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
Message message = new Message("訂單資訊".getBytes(), messageProperties);
rabbitTemplate.send("roberto.order", "", message);
}
}
3.建立消費者配置類 將RabbitListenerContainerFactory交由Spring託管
@Configuration
public class SpringAMQPConsumerConfig {
@Bean
public ConnectionFactory connectionFactory() {
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setHost("192.168.56.128");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("roberto");
connectionFactory.setPassword("roberto");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(10000);
Map<String, Object> connectionFactoryPropertiesMap = new HashMap();
connectionFactoryPropertiesMap.put("principal", "RobertoHuang");
connectionFactoryPropertiesMap.put("description", "RGP訂單系統V2.0");
connectionFactoryPropertiesMap.put("emailAddress", "[email protected]");
connectionFactory.setClientProperties(connectionFactoryPropertiesMap);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
// 設定消費者執行緒數
simpleRabbitListenerContainerFactory.setConcurrentConsumers(5);
// 設定最大消費者執行緒數
simpleRabbitListenerContainerFactory.setMaxConcurrentConsumers(10);
// 設定消費者標籤
simpleRabbitListenerContainerFactory.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String s) {
return "RGP訂單系統ADD處理邏輯消費者";
}
});
return simpleRabbitListenerContainerFactory;
}
}
4.自定義消費者訊息處理器類 在訊息處理器類中使用@RabbitListener註解宣告該類為RabbitMQ訊息處理器類,並在bindings屬性中聲明瞭佇列和交換機已經它們之間的繫結關係(監聽roberto.order.add佇列),使用@RabbitHandler註解宣告具體訊息處理方法
@Component
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "roberto.order.add", durable = "true", autoDelete = "false", exclusive = "false"), exchange = @Exchange(name = "roberto.order"))})
public class SpringAMQPMessageHandle {
@RabbitHandler
public void add(byte[] body) {
System.out.println("----------byte[]方法進行處理----------");
try {
System.out.println(new String(body, "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
5.建立消費者啟動類 新增@EnableRabbit註解
@EnableRabbit
@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.annotation.consumer")
public class ConsumerApplication {
public static void main(String[] args) {
new AnnotationConfigApplicationContext(ConsumerApplication.class);
}
}
6.依次啟動訊息消費者和生產者 控制檯輸出如下
----------byte[]方法進行處理----------
訂單資訊