SpringBoot2.0專案模組整合之RabbitMQ
阿新 • • 發佈:2019-02-14
Spring Boot 整合 RabbitMQ 非常簡單:如果只是簡單使用,所需的配置非常少。Spring Boot 提供了 spring-boot-starter-amqp 專案,為訊息處理提供各種支援。
新增依賴包
<!-- RabbitMQ dependency: Spring Boot starter for AMQP messaging -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
server:
  port: 8004

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # Publisher confirm mechanism for messages sent to the exchange: whether to enable the confirm callback
    publisher-confirms: true
    publisher-returns: true
  datasource:
    # Configures the Druid connection pool; the entries below are the Druid settings
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://127.0.0.1:3306/quartz?useUnicode=true&characterEncoding=utf-8&useSSL=false
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: 123456

logging:
  file: rabbitmq-service.log
  level:
    com.rabbitmq: debug
RabbitConfig
package com.rabbitmq.config; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String addresses; @Value("${spring.rabbitmq.port}") private Integer port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Value("${spring.rabbitmq.publisher-confirms}") private boolean publisherConfirms; public RabbitConfig() { } @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(this.addresses); connectionFactory.setPort(this.port); connectionFactory.setUsername(this.username); connectionFactory.setPassword(this.password); connectionFactory.setVirtualHost(this.virtualHost); connectionFactory.setPublisherConfirms(this.publisherConfirms); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(this.connectionFactory()); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } }
RabbitExchangeConfig
package com.rabbitmq.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the exchanges and the queue used by the application as Spring beans.
 *
 * <p>The bean methods only build the definitions. They deliberately do not call
 * {@code RabbitAdmin.declareExchange}/{@code declareQueue}: doing broker I/O while
 * the bean is being created makes the application fail to start whenever the
 * broker is unreachable, and it is redundant because a {@code RabbitAdmin} bean
 * in the application context automatically declares every {@code Exchange} and
 * {@code Queue} bean when a connection is opened.
 */
@Configuration
public class RabbitExchangeConfig {

    public RabbitExchangeConfig() {
    }

    /**
     * @return the fanout exchange named {@code com.exchange.fanout}
     */
    @Bean
    FanoutExchange contractFanoutExchange() {
        return new FanoutExchange("com.exchange.fanout");
    }

    /**
     * @return the topic exchange named {@code com.exchange.topic}
     */
    @Bean
    TopicExchange contractTopicExchangeDurable() {
        return new TopicExchange("com.exchange.topic");
    }

    /**
     * @return the direct exchange named {@code com.exchange.direct}
     */
    @Bean
    DirectExchange contractDirectExchange() {
        return new DirectExchange("com.exchange.direct");
    }

    /**
     * @return the durable queue named {@code com.queue.notify.hello}
     */
    @Bean
    Queue queueHello() {
        return new Queue("com.queue.notify.hello", true);
    }
}
RabbitListenerConfig
package com.rabbitmq.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
@Configuration
@EnableRabbit
public class RabbitListenerConfig implements RabbitListenerConfigurer {
@Autowired
private ConnectionFactory connectionFactory;
public RabbitListenerConfig() {
}
@Bean
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(this.connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(this.handlerMethodFactory());
}
}
AmqpServiceImpl
package com.rabbitmq.service.impl;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.rabbitmq.api.IAmqpService;
/**
 * {@link IAmqpService} implementation that publishes messages through the
 * injected {@link AmqpTemplate}.
 */
@Service
public class AmqpServiceImpl implements IAmqpService {

    /** Routing key passed to the template for every outgoing message. */
    private static final String HELLO_ROUTING_KEY = "com.queue.notify.hello";

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts and sends the given message using the fixed routing key.
     *
     * @param message the payload to publish
     */
    @Override
    public void convertAndSend(String message) {
        this.amqpTemplate.convertAndSend(HELLO_ROUTING_KEY, message);
    }
}
AmqpServiceConsumer
package com.rabbitmq.mq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbitmq.api.IAmqpHelloService;
/**
 * Consumer for the {@code com.queue.notify.hello} queue: logs each received
 * message and hands it over to {@link IAmqpHelloService}.
 */
@Component
public class AmqpServiceConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpServiceConsumer.class);

    @Autowired
    private IAmqpHelloService amqpHelloService;

    public AmqpServiceConsumer() {
    }

    /**
     * Listener callback for the hello queue.
     *
     * @param message the received message payload
     */
    @RabbitListener(queues = "com.queue.notify.hello")
    public void receiveSmsCodeQueue(String message) {
        LOGGER.info("------hello:消費者處理訊息------");
        LOGGER.debug(message);
        amqpHelloService.receiveHelloQueue(message);
    }
}
GitHub專案demo下載地址:專案demo