學習之路-RabbitMQ(三):SpringBoot整合RabbitMQ
阿新 • • 發佈:2018-12-29
一:引入RabbitMQ的相關jar包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
二:配置application.yml
配置rabbitmq的yml:
server:
  port: 44000
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /
三:建立rabbitMQ的配置類
配置RabbitMQ配置類,主要有交換機、佇列,以及兩者的繫結
本案例配置Topic交換機
package com.xuecheng.test.config;
import org.springframework.amqp.core.*;
import org. springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration declaring the RabbitMQ topology used by this example:
 * one topic exchange, an e-mail queue, an SMS queue, and one binding per queue.
 *
 * @author 星仔
 * @since 2018/12/29
 */
@Configuration
public class RabbitMQConfig {

    // Queue names (also used as the bean names of the queue beans).
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    // Exchange name (also used as the bean name of the exchange bean).
    public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

    // Topic routing-key patterns. In AMQP topic routing '#' stands for zero or
    // more dot-separated words, so a key such as "inform.sms.email" matches both.
    private static final String ROUTINGKEY_SMS = "inform.#.sms.#";
    private static final String ROUTINGKEY_EMAIL = "inform.#.email.#";

    /**
     * Declares the topic exchange.
     *
     * <p>{@code ExchangeBuilder} also offers fanout, direct and headers exchanges;
     * this example uses a topic exchange.
     *
     * @return the exchange
     */
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM() {
        // durable(true): the exchange still exists after the broker restarts.
        Exchange informExchange =
                ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
        return informExchange;
    }

    /**
     * Declares the e-mail queue.
     *
     * @return the queue named {@link #QUEUE_INFORM_EMAIL}
     */
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL() {
        Queue emailQueue = new Queue(QUEUE_INFORM_EMAIL);
        return emailQueue;
    }

    /**
     * Declares the SMS queue.
     *
     * @return the queue named {@link #QUEUE_INFORM_SMS}
     */
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS() {
        Queue smsQueue = new Queue(QUEUE_INFORM_SMS);
        return smsQueue;
    }

    /**
     * Binds the e-mail queue to the topic exchange using the e-mail routing pattern.
     *
     * @param queue the e-mail queue bean
     * @param exchange the topic exchange bean
     * @return the binding
     */
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(
            @Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
            @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return bindWithKey(queue, exchange, ROUTINGKEY_EMAIL);
    }

    /**
     * Binds the SMS queue to the topic exchange using the SMS routing pattern.
     *
     * @param queue the SMS queue bean
     * @param exchange the topic exchange bean
     * @return the binding
     */
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(
            @Qualifier(QUEUE_INFORM_SMS) Queue queue,
            @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return bindWithKey(queue, exchange, ROUTINGKEY_SMS);
    }

    /** Builds an argument-less binding of {@code queue} to {@code exchange} under {@code routingKey}. */
    private static Binding bindWithKey(Queue queue, Exchange exchange, String routingKey) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
    }
}
四:配置生產端傳送訊息
package com.xuecheng.rabbitmq.producer;
import com.xuecheng.test.config.RabbitMQConfig;
import com.xuecheng.test.config.RabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Producer-side test: publishes a single text message to the topic exchange
 * named by {@link RabbitMQConfig#EXCHANGE_TOPICS_INFORM}.
 *
 * @author 星仔
 * @since 2018/12/24
 */
@SpringBootTest(classes = RabbitmqApplication.class)
@RunWith(SpringRunner.class)
public class ProducerTest05 {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Sends the text {@code "send message"} with routing key {@code "inform.sms.email"}.
     * The test only publishes; it makes no assertions.
     */
    @Test
    public void rabbitMQTest() {
        final String exchange = RabbitMQConfig.EXCHANGE_TOPICS_INFORM;
        final String routingKey = "inform.sms.email";
        final String payload = "send message";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
}
五:配置消費端接收訊息
package com.xuecheng.test.receive;
import com.rabbitmq.client.Channel;
import com.xuecheng.test.config.RabbitMQConfig;
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer-side component: listens on the SMS and e-mail queues declared in
 * {@link RabbitMQConfig} and prints every delivery to standard output.
 *
 * @author 星仔
 * @since 2018/12/29
 */
@Component
public class ReceiveHandler {

    /**
     * Handles deliveries from the SMS queue.
     *
     * @param msg the converted message payload
     * @param message the raw AMQP message
     * @param channel the channel the message arrived on (unused)
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_SMS})
    public void receive_sms(String msg, Message message, Channel channel) {
        printDelivery(msg, message);
    }

    /**
     * Handles deliveries from the e-mail queue.
     *
     * @param msg the converted message payload
     * @param message the raw AMQP message
     * @param channel the channel the message arrived on (unused)
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_EMAIL})
    public void receive_email(String msg, Message message, Channel channel) {
        printDelivery(msg, message);
    }

    /**
     * Prints the converted payload followed by the raw body decoded as text.
     *
     * <p>Passing the {@code byte[]} straight to {@code println} would print the
     * array's identity (e.g. {@code [B@1b2c3d}) rather than its content, so the
     * body is decoded first.
     */
    private static void printDelivery(String msg, Message message) {
        System.out.println(msg);
        byte[] body = message.getBody();
        // NOTE(review): decodes as UTF-8; confirm this matches the producer's message converter.
        String bodyText = (body == null) ? null : new String(body, StandardCharsets.UTF_8);
        System.out.println(bodyText);
    }
}