Spring boot整合RabbitMQ的簡單使用
最近訊息佇列的使用比較頻繁,目前我使用比較多的就是RabbitMQ了,在專案中一般使用訊息佇列的場景有如下幾個地方。
1.非同步的處理:比如在註冊,或者專案中狀態改變需要給對應的角色傳送郵件,簡訊的時候。應該採用訊息佇列把事件放入佇列,讓傳送郵件的服務去做傳送的事件。
2.應用解耦:比如訂單和庫存的系統中,原來是有了訂單就會通知庫存發生改變。如果庫存發生未知錯誤,那麼訂單也會失敗。這樣是不大合理的。現在是將訂單系統和庫存系統分離。將訂單訊息持久化到訊息佇列中,就算庫存系統失效,也能根據持久化的訊息來保持系統的完整性。
3.日誌系統:日誌直接寫入資料庫,當有大量資料的時候,資料庫壓力過大。可以採用Kalfka訊息中介軟體處理。
目前RabbitMQ的抽象訊息模型如下:
生產者(producer)建立訊息,消費者(consumer)訂閱某個佇列。然後釋出到佇列(queue)中,最後將訊息傳送到監聽的消費者。
上面只是一個抽象的模型,由於RabbitMQ實現了AMQP 協議,所以實際上也是AMQP的模型。多了一個exchange,生產者的訊息不會直接傳送給佇列,而是經過exchange去分發給佇列。
生產者在將訊息傳送給Exchange的時候,一般會指定一個routing key,來指定這個訊息的路由規則,生產者就可以在傳送訊息給Exchange時,通過指定routing key來決定訊息流向哪裡。
RabbitMQ常用的Exchange Type有三種:fanout、direct、topic:
fanout:把所有傳送到該Exchange的訊息投遞到所有與它繫結的佇列中。
direct:把訊息投遞到那些binding key與routing key完全匹配的佇列中。
topic:將訊息路由到binding key與routing key模式匹配的佇列中。
下面我們直接程式碼實踐一下:
1.先在IDEA建立一個新的spring boot專案
2.配置依賴
<!-- RabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.在配置檔案中新增配置資訊,我使用的yml檔案。
spring:
rabbitmq:
host: (rabbitmq的主機ip)
port: 5672(埠,預設沒有修改就是5672)
username: (賬號)
password: (密碼)
virtual-host: / (預設就是/)
publisher-confirms: true
4.新建java的上生產者配置項
@Configuration
public class RqProducerConfig {
@Bean
RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
//宣告一個佇列 佇列名queue.string
@Bean
Queue queueString(RabbitAdmin rabbitAdmin) {
Queue queue = new Queue("queue.string", true);
rabbitAdmin.declareQueue(queue);
return queue;
}
//宣告一個exchange 是topic形式的
@Bean
TopicExchange exchange(RabbitAdmin rabbitAdmin) {
TopicExchange topicExchange = new TopicExchange("exchange");
rabbitAdmin.declareExchange(topicExchange);
return topicExchange;
}
//把exchange和佇列繫結起來,生產者傳送訊息到exchange,exchange會將訊息傳送到對應的佇列
@Bean
Binding bindingExchangeString(Queue queueString, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
Binding binding = BindingBuilder.bind(queueString).to(exchange).with("queue.string");
rabbitAdmin.declareBinding(binding);
return binding;
}
/**
* 生產者用 可以用來轉化為json
*
* @return
*/
@Bean
public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();
rabbitMessagingTemplate.setMessageConverter(jackson2Converter());
rabbitMessagingTemplate.setRabbitTemplate(rabbitTemplate);
return rabbitMessagingTemplate;
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
return new MappingJackson2MessageConverter();
}
}
5.編寫傳送的服務層介面和實現
public interface SendStringRqService {
void sendString(String message);
}
@Service
public class SendStringRqServiceImpl implements SendStringRqService {
@Autowired
private RabbitMessagingTemplate rabbitTemplate;
@Override
public void sendEmail(String message) {
rabbitTemplate.convertAndSend("exchange", "queue.string", message);
}
}
6.客戶端測試類編寫
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitDemoApplicationTests {
@Autowired
private SendStringRqService sendStringRqService;
@Test
public void SendStringService() {
sendStringRqService.sendEmail(new Date(System.currentTimeMillis())+" ------> hello world");
}
}
我們再編寫消費端程式碼
1.首先配置依賴和yml檔案。這裡跟客戶端配置一樣。可以直接看客戶端配置就好了。
2.java配置項
@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {
//這裡是接受訊息,進行處理的類
@Autowired
ReceiverService receiverService;
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// factory.setPrefetchCount(5);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
}
3.最後就是接受配置的類的編寫,ReceiverService的編寫
@Component
public class ReceiverService {
//監聽佇列的名稱,也就是客戶端宣告的queue.string
@RabbitListener(queues = "queue.string")
public void receiveStringQueue(String message) {
System.out.println("Received string<" + message + ">");
System.out.println("Received string<" + new Date(System.currentTimeMillis()) + ">");
}
}
配置好之後直接執行消費者端程式碼。這個時候檢視RabbitMQManage的介面,會有個連線可以看到。
再執行客戶端的測試用例
執行成功後看到消費者端的控制檯輸出:
這樣基礎的rabbitmq就用spring boot整合完成了。如果需要更多功能,可以根據這個示例去完成。