1. 程式人生 > >學習之路-RabbitMQ(三):SpringBoot整合RabbitMQ

學習之路-RabbitMQ(三):SpringBoot整合RabbitMQ

一:引入RabbitMQ的相關jar包:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</
artifactId
>
</dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency>

二:配置application.yml
配置rabbitmq的yml:

server:
     port: 44000
   spring:
     application:
       name
: test‐rabbitmq‐producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: /

三:建立rabbitMQ的配置類
配置RabbitMQ配置類,主要有交換機、佇列、已經兩者的繫結
本案例配置Topic交換機

package com.xuecheng.test.config;


import org.springframework.amqp.core.*;
import org.
springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Auther: 星仔 * @Date: 2018/12/29 10:54 * @Description: */ @Configuration public class RabbitMQConfig { //宣告佇列名稱 public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; //宣告交換機的名稱 public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; //宣告RoutingKey private static final String ROUTINGKEY_SMS = "inform.#.sms.#"; private static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; /** * 交換機配置 * ExchangeBuilder提供了fanout、direct、topic、header交換機型別的配置 * * @return the exchange */ @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ //durable(true)持久化,訊息佇列重啟後交換機仍然存在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //宣告佇列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } //宣告佇列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } /** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#"); * * 繫結佇列到交換機 * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }

四:配置生產端傳送訊息

package com.xuecheng.rabbitmq.producer;


import com.xuecheng.test.config.RabbitMQConfig;
import com.xuecheng.test.config.RabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @Auther: 星仔
 * @Date: 2018/12/24 21:31
 * @Description:
 */
@SpringBootTest(classes = RabbitmqApplication.class)
@RunWith(SpringRunner.class)
public class ProducerTest05 {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void rabbitMQTest(){
        String message = "send message";
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message);
    }
}

五:配置消費端接受訊息

package com.xuecheng.test.receive;

import com.rabbitmq.client.Channel;
import com.xuecheng.test.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Auther: 星仔
 * @Date: 2018/12/29 11:26
 * @Description:
 */
@Component
public class ReceiveHandler {
    //監聽佇列
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_SMS})
    public void receive_sms(String msg, Message message, Channel channel){
        System.out.println(msg);
        System.out.println(message.getBody());
    }

    //監聽佇列
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_EMAIL})
    public void receive_email(String msg, Message message, Channel channel){
        System.out.println(msg);
        System.out.println(message.getBody());
    }
}