1. 程式人生 > >SpringBoot使用訊息中介軟體RabbitMQ

SpringBoot使用訊息中介軟體RabbitMQ

首先在docker中安裝rabbitmq,pull 帶有web介面的

docker pull rabbitmq:3-management

5672為客戶端,15672為web介面埠

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq01 映象id

簡要介紹一下rabbitmq的三個Exchange

Fanout( 廣播)
繫結queue時,不用設定routkey
釋出訊息時,不用設定routkey

Direct (點對點,單播)
繫結queue時,設定routkey
釋出訊息時,設定routkey,單播

Topic
繫結queue時,使用包含* 和#的表示式
#代表一個或多個單詞
*代表一個單詞
釋出訊息時,繫結queue的表示式中,匹配到的queue 則收到訊息

接下來在springboot專案中使用rabbitmq
在pom.xml中引入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId
>
<artifactId>spring-boot-starter-web</artifactId> </dependency>

配置

spring.rabbitmq.host=120.87.111.104
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

開啟註解

@EnableRabbit
@SpringBootApplication
public class RabbitmqApplication
{
public static void main(String[] args) { SpringApplication.run(RabbitmqApplication.class, args); } }

初始化

    @Autowired
    AmqpAdmin admin;

    //初始化
    @Test
    public void test1(){
        //建立direct
        admin.declareExchange(new DirectExchange("admin.direct"));
        //建立fanout
        admin.declareExchange(new FanoutExchange("admin.fanout"));
        //建立topic
        admin.declareExchange(new TopicExchange("admin.topic"));
        //建立3個訊息佇列
        admin.declareQueue(new Queue("admin.lhc1"));
        admin.declareQueue(new Queue("admin.lhc2"));
        admin.declareQueue(new Queue("admin.lhc3"));
        //設定繫結關係
        //設定direct與三個訊息佇列的關係
        admin.declareBinding(new Binding("admin.lhc1", Binding.DestinationType.QUEUE,"admin.direct","admin.lhc1",null));
        admin.declareBinding(new Binding("admin.lhc2", Binding.DestinationType.QUEUE,"admin.direct","admin.lhc2",null));
        admin.declareBinding(new Binding("admin.lhc3", Binding.DestinationType.QUEUE,"admin.direct","admin.lhc3",null));
        //設定fanout與三個訊息佇列的關係
        admin.declareBinding(new Binding("admin.lhc1",Binding.DestinationType.QUEUE,"admin.fanout","lhc1",null));
        admin.declareBinding(new Binding("admin.lhc2",Binding.DestinationType.QUEUE,"admin.fanout","lhc2",null));
        admin.declareBinding(new Binding("admin.lhc3",Binding.DestinationType.QUEUE,"admin.fanout","lhc3",null));
        //設定topic與三個訊息佇列的關係
        admin.declareBinding(new Binding("admin.lhc1", Binding.DestinationType.QUEUE,"admin.topic","admin.lhc.com.#",null));
        admin.declareBinding(new Binding("admin.lhc2", Binding.DestinationType.QUEUE,"admin.topic","admin.lhc.#",null));
        admin.declareBinding(new Binding("admin.lhc3", Binding.DestinationType.QUEUE,"admin.topic","admin.lhc.*",null));
    }

設定MessageConverter

@Configuration
public class RabbitMQConfig {

    //預設使用java的序列化機制,需要修改為Jackson2
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

進行相關測試

    //單播
    @Test
    public void test3(){
     User user= new User("1","jackson","123456");
        rabbitTemplate.convertAndSend("admin.direct","admin.lhc1",user);
    }
    //廣播
    @Test
    public void test5(){
        User user=  new User("1","jerry","123456");
        //不需要傳遞routKey,但必需傳空的字串,否則訊息佇列接收不到
        rabbitTemplate.convertAndSend("admin.fanout","",user);
    }

    //topic
    @Test
    public void test6(){
        User user=  new User("1","jack","123456");
        //admin.topic與三個佇列的繫結關係如下,故可確定admin.lhc1和admin.lhc2可以收到
        //"admin.lhc.com.#"
        //"admin.lhc.#"
        //"admin.lhc.*"
        rabbitTemplate.convertAndSend("admin.topic","admin.lhc.com.topic",user);
    }

    //接收
    @Test
    public void test4(){
        Object o=rabbitTemplate.receiveAndConvert("admin.lhc1");
        System.out.println(o.getClass());
        System.out.println(o);
    }

還可以在Service中進行監聽


@Service
public class  UserService {

    @RabbitListener(queues = "admin.lhc2")
    public void receive2(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }

    @RabbitListener(queues = "admin.lhc1")
    //自動型別轉化
    public void receive(User user){
        System.out.println(user);
    }

}