SpringBoot使用訊息中介軟體RabbitMQ
阿新 • • 發佈:2019-02-16
首先在docker中安裝rabbitmq,pull 帶有web介面的
docker pull rabbitmq:3-management
5672為客戶端,15672為web介面埠
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq01 映象id
簡要介紹一下rabbitmq的三個Exchange
Fanout( 廣播)
繫結queue時,不用設定routkey
釋出訊息時,不用設定routkey
Direct (點對點,單播)
繫結queue時,設定routkey
釋出訊息時,設定routkey,單播
Topic
繫結queue時,使用包含* 和#的表示式
#代表一個或多個單詞
*代表一個單詞
釋出訊息時,繫結queue的表示式中,匹配到的queue 則收到訊息
接下來在springboot專案中使用rabbitmq
在pom.xml中引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId >
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
配置
spring.rabbitmq.host=120.87.111.104
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
開啟註解
@EnableRabbit
@SpringBootApplication
public class RabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
}
初始化
@Autowired
AmqpAdmin admin;
//初始化
@Test
public void test1(){
//建立direct
admin.declareExchange(new DirectExchange("admin.direct"));
//建立fanout
admin.declareExchange(new FanoutExchange("admin.fanout"));
//建立topic
admin.declareExchange(new TopicExchange("admin.topic"));
//建立3個訊息佇列
admin.declareQueue(new Queue("admin.lhc1"));
admin.declareQueue(new Queue("admin.lhc2"));
admin.declareQueue(new Queue("admin.lhc3"));
//設定繫結關係
//設定direct與三個訊息佇列的關係
admin.declareBinding(new Binding("admin.lhc1", Binding.DestinationType.QUEUE,"admin.direct","admin.lhc1",null));
admin.declareBinding(new Binding("admin.lhc2", Binding.DestinationType.QUEUE,"admin.direct","admin.lhc2",null));
admin.declareBinding(new Binding("admin.lhc3", Binding.DestinationType.QUEUE,"admin.direct","admin.lhc3",null));
//設定fanout與三個訊息佇列的關係
admin.declareBinding(new Binding("admin.lhc1",Binding.DestinationType.QUEUE,"admin.fanout","lhc1",null));
admin.declareBinding(new Binding("admin.lhc2",Binding.DestinationType.QUEUE,"admin.fanout","lhc2",null));
admin.declareBinding(new Binding("admin.lhc3",Binding.DestinationType.QUEUE,"admin.fanout","lhc3",null));
//設定topic與三個訊息佇列的關係
admin.declareBinding(new Binding("admin.lhc1", Binding.DestinationType.QUEUE,"admin.topic","admin.lhc.com.#",null));
admin.declareBinding(new Binding("admin.lhc2", Binding.DestinationType.QUEUE,"admin.topic","admin.lhc.#",null));
admin.declareBinding(new Binding("admin.lhc3", Binding.DestinationType.QUEUE,"admin.topic","admin.lhc.*",null));
}
設定MessageConverter
@Configuration
public class RabbitMQConfig {
//預設使用java的序列化機制,需要修改為Jackson2
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
進行相關測試
//單播
@Test
public void test3(){
User user= new User("1","jackson","123456");
rabbitTemplate.convertAndSend("admin.direct","admin.lhc1",user);
}
//廣播
@Test
public void test5(){
User user= new User("1","jerry","123456");
//不需要傳遞routKey,但必需傳空的字串,否則訊息佇列接收不到
rabbitTemplate.convertAndSend("admin.fanout","",user);
}
//topic
@Test
public void test6(){
User user= new User("1","jack","123456");
//admin.topic與三個佇列的繫結關係如下,故可確定admin.lhc1和admin.lhc2可以收到
//"admin.lhc.com.#"
//"admin.lhc.#"
//"admin.lhc.*"
rabbitTemplate.convertAndSend("admin.topic","admin.lhc.com.topic",user);
}
//接收
@Test
public void test4(){
Object o=rabbitTemplate.receiveAndConvert("admin.lhc1");
System.out.println(o.getClass());
System.out.println(o);
}
還可以在Service中進行監聽
@Service
public class UserService {
@RabbitListener(queues = "admin.lhc2")
public void receive2(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
@RabbitListener(queues = "admin.lhc1")
//自動型別轉化
public void receive(User user){
System.out.println(user);
}
}