1. 程式人生 > >SpringBoot2.0系列--09--訊息佇列(Rabbit)

SpringBoot2.0系列--09--訊息佇列(Rabbit)

SpringBoot2.0系列–09–訊息佇列(Rabbit)

前言

JDK出11了,SpringBoot出2.0了,還沒有系統的學習過,剛好最近專案中有使用到,就把一些關鍵的東西列出來,避免忘記
SpringBoot2.0系列–00–目錄

介紹

當專案需要拆分,分散式的時候一般需要使用訊息佇列,Rabbit作為一個訊息中介軟體,在實際專案中使用的比重還是挺大的。

訊息中介軟體最主要的作用是解耦,中介軟體最標準的用法是生產者生產訊息傳送到佇列,消費者從佇列中拿取訊息並處理,生產者不用關心是誰來消費,消費者不用關心誰在生產訊息,從而達到解耦的目的。

這邊主要springboot和RabbitMQ的結合使用。具體介紹可以檢視這篇:

// todo 某連結

總流程

  1. 安裝rabbit,啟動
  2. 配置引用
  3. 簡單(一對一下訊息)例項
  4. 一對多訊息
  5. 多對多訊息
  6. 傳送物件
  7. Topic Exchange
  8. Fanout Exchange

具體操作

安裝rabbit,啟動

可以檢視

// todo 某連結

配置引用

pom檔案

<!-- rabbitmq需要的包-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties檔案

# 這幾個是預設的配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

簡單(一對一下訊息)例項

建立佇列配置

@Configuration
public class RabbitConfig {

    // 測試一對一
    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }

    @Bean
    public Queue fooQueue() {
        return new Queue("foo");
}

建立生產者

/*
 * Copyright (C), 2015-2018
 * FileName: Sender
 * Author:   zhao
 * Date:     2018/11/14 15:28
 * Description: 傳送者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.easy;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 〈一句話功能簡述〉<br>
 * 〈傳送者〉
 *
 * @author zhao
 * @date 2018/11/14 15:28
 * @since 1.0.1
 */
@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        rabbitTemplate.convertAndSend("hello", context);
    }

    public void sendFoo() {
        String context = "foo " + new Date();
        System.out.println("Foo Sender  : " + context);
        rabbitTemplate.convertAndSend("foo", context);
    }

}

建立消費者

/*
 * Copyright (C), 2015-2018
 * FileName: Receiver
 * Author:   zhao
 * Date:     2018/11/14 15:28
 * Description: 消費者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.easy;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈消費者〉
 *
 * @author zhao
 * @date 2018/11/14 15:28
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "hello")
public class Receiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver : " + hello);
    }

}

測試類及結果

這邊發出去一條,接收到了一條

// 測試一對一
@Test
public void easy() {

    //  結果
    //        Sender : hello Wed Nov 14 19:33:16 GMT+08:00 2018
    //        Receiver : hello Wed Nov 14 19:33:16 GMT+08:00 2018
    sender.send();
    // sender.sendFoo();
}

一對多訊息

建立佇列配置

    // 測試一對多
    @Bean
    public Queue multimapQueue1() {
        return new Queue("OneToMany");
    }

建立生產者

/*
 * Copyright (C), 2015-2018
 * FileName: one2ManySender
 * Author:   zhao
 * Date:     2018/11/14 16:57
 * Description: 一對多翻譯
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.multimap;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 〈一句話功能簡述〉<br>
 * 〈一對多翻譯〉
 *
 * @author zhao
 * @date 2018/11/14 16:57
 * @since 1.0.1
 */
@Component
public class OneToManySender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "OneToMany " + new Date();
        System.out.println("OneToManySender : " + context);
        rabbitTemplate.convertAndSend("OneToMany", context);
    }

}

建立消費者

/*
 * Copyright (C), 2015-2018
 * FileName: OneToManyReceiver1
 * Author:   zhao
 * Date:     2018/11/14 16:59
 * Description: 一對多接收者1
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.multimap;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈一對多接收者1〉
 *
 * @author zhao
 * @date 2018/11/14 16:59
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "OneToMany")
public class OneToManyReceiver1 {

    @RabbitHandler
    public void process(String foo) {
        System.out.println("OneToManyReceiver1 : " + foo);
    }

}

/*
 * Copyright (C), 2015-2018
 * FileName: OneToManyReceiver2
 * Author:   zhao
 * Date:     2018/11/14 16:59
 * Description: 一對多接收者2
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.multimap;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈一對多接收者2〉
 *
 * @author zhao
 * @date 2018/11/14 16:59
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "OneToMany")
public class OneToManyReceiver2 {

    @RabbitHandler
    public void process(String foo) {
        System.out.println("OneToManyReceiver2 : " + foo);
    }

}

測試類、說明及結果

一個傳送端,2個接收端,可以看到結果是平均分佈的

// 測試一對多訊息
@Test
public void testOneToMany() throws Exception {
    // 一個傳送端,2個接收端,可以看到結果是平均分佈的
    // 結果
    // OneToManySender : OneToMany Wed Nov 14 19:37:17 GMT+08:00 2018
    //OneToManyReceiver2 : OneToMany Wed Nov 14 19:37:17 GMT+08:00 2018
    //OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManyReceiver1 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManyReceiver2 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManyReceiver1 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManyReceiver2 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManyReceiver1 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    //OneToManyReceiver2 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
    for (int i = 0; i < 100; i++) {
        oneToManySender.send();
        Thread.sleep(100);
    }
}

多對多訊息

建立佇列配置


// 測試多對多
@Bean
public Queue multimapQueue3() {
    return new Queue("manyToMany");
}

建立生產者

/*
 * Copyright (C), 2015-2018
 * FileName: ManyToManySender
 * Author:   zhao
 * Date:     2018/11/14 16:57
 * Description: 多對多翻譯
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.multimap;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 〈一句話功能簡述〉<br>
 * 〈多對多翻譯〉
 *
 * @author zhao
 * @date 2018/11/14 16:57
 * @since 1.0.1
 */
@Component
public class ManyToManySender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "ManyToManySender1 " + new Date();
        System.out.println("ManyToManySender1 : " + context);
        rabbitTemplate.convertAndSend("manyToMany", context);
    }

}

/*
 * Copyright (C), 2015-2018
 * FileName: ManyToManySender
 * Author:   zhao
 * Date:     2018/11/14 16:57
 * Description: 多對多翻譯
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.multimap;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 〈一句話功能簡述〉<br>
 * 〈多對多翻譯〉
 *
 * @author zhao
 * @date 2018/11/14 16:57
 * @since 1.0.1
 */
@Component
public class ManyToManySender2 {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "ManyToManySender2 " + new Date();
        System.out.println("ManyToManySender2 : " + context);
        rabbitTemplate.convertAndSend("manyToMany", context);
    }

}

建立消費者

/*
 * Copyright (C), 2015-2018
 * FileName: ManyToManyReceiver1
 * Author:   zhao
 * Date:     2018/11/14 16:59
 * Description: 多對多接收者1
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.multimap;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈多對多接收者1〉
 *
 * @author zhao
 * @date 2018/11/14 16:59
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "manyToMany")
public class ManyToManyReceiver1 {

    public int count = 0;

    @RabbitHandler
    public void process(String foo) {
        System.out.println("ManyToManyReceiver1 : " + foo);
        count++;
    }

}

/*
 * Copyright (C), 2015-2018
 * FileName: ManyToManyReceiver2
 * Author:   zhao
 * Date:     2018/11/14 16:59
 * Description: 多對多接收者2
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.multimap;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈多對多接收者2〉
 *
 * @author zhao
 * @date 2018/11/14 16:59
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "manyToMany")
public class ManyToManyReceiver2 {

    public int count = 0;
    @RabbitHandler
    public void process(String foo) {
        System.out.println("ManyToManyReceiver2 : " + foo);
        count++;
    }

}

/*
 * Copyright (C), 2015-2018
 * FileName: ManyToManyReceiver3
 * Author:   zhao
 * Date:     2018/11/14 16:59
 * Description: 多對多接收者1
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.multimap;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈多對多接收者3〉
 *
 * @author zhao
 * @date 2018/11/14 16:59
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "manyToMany")
public class ManyToManyReceiver3 {

    public int count = 0;
    @RabbitHandler
    public void process(String foo) {
        System.out.println("ManyToManyReceiver3 : " + foo);
        count++;
    }

}

測試類、說明及結果

這裡是2對3的關係,結果看上去好像不是平均的,我們加上一個count,來統計各個接收者執行的次數,最後發現是66.67.67,所以是平均的

// 測試多對多訊息
@Test
public void testManyToMany() throws Exception {
    //這裡是2對3的關係,結果看上去好像不是平均的,
    // 我們加上一個count,來統計各個接收者執行的次數,最後發現是66.67.67,
    // 所以是平均的
    // 結果
    //        manyToManyReceiver1.count: 67
    //        manyToManyReceiver2.count: 66
    //        manyToManyReceiver3.count: 67
    //        ManyToManyReceiver3 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManyReceiver1 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManySender1 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManySender2 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManyReceiver2 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManyReceiver3 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManySender1 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManySender2 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManyReceiver2 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManyReceiver1 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManySender1 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManySender2 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManyReceiver1 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
    //        ManyToManyReceiver3 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018

    for (int i = 0; i < 100; i++) {
        manyToManySender.send();
        manyToManySender2.send();
        Thread.sleep(100);
    }
    System.out.println(
            "manyToManyReceiver1.count: " + manyToManyReceiver1.count + "\n" + "manyToManyReceiver2.count: "
                    + manyToManyReceiver2.count + "\n" + "manyToManyReceiver3.count: " + manyToManyReceiver3.count
                    + "\n");

}

傳送物件

建立佇列配置

// 測試傳送物件
@Bean
public Queue entityQueue() {
    return new Queue("entity");
}

物件

/*
 * Copyright (C), 2015-2018
 * FileName: User
 * Author:   zhao
 * Date:     2018/11/14 18:25
 * Description: 實體--使用者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.entity;

import java.io.Serializable;

/**
 * 〈一句話功能簡述〉<br>
 * 〈實體--使用者〉
 *
 * @author zhao
 * @date 2018/11/14 18:25
 * @since 1.0.1
 */
public class User implements Serializable {


    private int id;
    private String name;

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "User{" + "id=" + id + ", name='" + name + '\'' + '}';
    }
}

建立生產者

/*
 * Copyright (C), 2015-2018
 * FileName: Sender
 * Author:   zhao
 * Date:     2018/11/14 15:28
 * Description: 傳送者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.entity;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 〈一句話功能簡述〉<br>
 * 〈傳送者〉
 *
 * @author zhao
 * @date 2018/11/14 15:28
 * @since 1.0.1
 */
@Component
public class EntitySender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        User user = new User(1, "小王");
        System.out.println("Sender : " + user);
        rabbitTemplate.convertAndSend("entity", user);
    }

}

建立消費者

/*
 * Copyright (C), 2015-2018
 * FileName: Receiver
 * Author:   zhao
 * Date:     2018/11/14 15:28
 * Description: 消費者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.entity;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈消費者〉
 *
 * @author zhao
 * @date 2018/11/14 15:28
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "entity")
public class EntityReceiver {

    @RabbitHandler
    public void process(User user) {
        System.out.println("Receiver : " + user.toString());
    }

}

測試類、說明及結果

讓實體實現Serializable介面,就能直接傳送了

// 測試傳送實體
@Test
public void entity() {
    // 讓實體實現Serializable介面,就能直接傳送了
    // 結果
    // Sender : User{id=1, name='小王'}
    // Receiver : User{id=1, name='小王'}
    entitySender.send();
}

Topic Exchange

建立佇列配置

// 測試topic
final static String message = "topic.message";
final static String messages = "topic.messages";

@Bean
public Queue queueMessage() {
    return new Queue(RabbitConfig.message);
}

@Bean
public Queue queueMessages() {
    return new Queue(RabbitConfig.messages);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange("exchange");
}

@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
    return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}

@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
    return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}

建立生產者

/*
 * Copyright (C), 2015-2018
 * FileName: Sender
 * Author:   zhao
 * Date:     2018/11/14 15:28
 * Description: 傳送者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.topic;

import com.lizhaobolg.message.rabbit.advance.entity.User;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈傳送者〉
 *
 * @author zhao
 * @date 2018/11/14 15:28
 * @since 1.0.1
 */
@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
    }
}

建立消費者

/*
 * Copyright (C), 2015-2018
 * FileName: Receiver
 * Author:   zhao
 * Date:     2018/11/14 15:28
 * Description: 消費者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈消費者〉
 *
 * @author zhao
 * @date 2018/11/14 15:28
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("TopicReceiver1 : " + hello);
    }

}

/*
 * Copyright (C), 2015-2018
 * FileName: Receiver
 * Author:   zhao
 * Date:     2018/11/14 15:28
 * Description: 消費者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈消費者〉
 *
 * @author zhao
 * @date 2018/11/14 15:28
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("TopicReceiver2 : " + hello);
    }

}

測試類及結果

// topic -- 測試根據key來繫結佇列
@Test
public void topic() {
    // 這裡queueMessages這個佇列,可以被2個key匹配,
    // 本身topic.messages會繫結過一個佇列
    // 所以會執行2次topic.messages的訊息

    // 結果
    // Sender : hi, i am message 1
    // Sender : hi, i am messages 2
    // TopicReceiver2 : hi, i am message 1
    // TopicReceiver1 : hi, i am message 1
    // TopicReceiver2 : hi, i am messages 2

    topicSender.send1();
    topicSender.send2();
}

Fanout Exchange

建立佇列配置

// 測試fanout
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

建立生產者

/*
 * Copyright (C), 2015-2018
 * FileName: Sender
 * Author:   zhao
 * Date:     2018/11/14 15:28
 * Description: 傳送者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈傳送者〉
 *
 * @author zhao
 * @date 2018/11/14 15:28
 * @since 1.0.1
 */
@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    }

}

建立消費者

/*
 * Copyright (C), 2015-2018
 * FileName: FanoutReceiver
 * Author:   zhao
 * Date:     2018/11/14 18:58
 * Description: fanout消費者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.fanout;

import com.lizhaobolg.message.rabbit.advance.entity.User;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈fanout消費者〉
 *
 * @author zhao
 * @date 2018/11/14 18:58
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverA : " + msg);
    }

}

/*
 * Copyright (C), 2015-2018
 * FileName: FanoutReceiver
 * Author:   zhao
 * Date:     2018/11/14 18:58
 * Description: fanout消費者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈fanout消費者〉
 *
 * @author zhao
 * @date 2018/11/14 18:58
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverB : " + msg);
    }

}

/*
 * Copyright (C), 2015-2018
 * FileName: FanoutReceiver
 * Author:   zhao
 * Date:     2018/11/14 18:58
 * Description: fanout消費者
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改時間           版本號              描述
 */
package com.lizhaobolg.message.rabbit.advance.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 〈一句話功能簡述〉<br>
 * 〈fanout消費者〉
 *
 * @author zhao
 * @date 2018/11/14 18:58
 * @since 1.0.1
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverC : " + msg);
    }

}

測試類及結果

這裡屬於廣播,所以傳送一次,三個客戶端都能接收到

// fanout -- 廣播機制
@Test
public void fanout() {
    // 這裡屬於廣播,所以傳送一次,三個客戶端都能接收到
    // 結果
    // Sender : hi, fanout msg
    // FanoutReceiverB : hi, fanout msg
    // FanoutReceiverC : hi, fanout msg
    // FanoutReceiverA : hi, fanout msg
    fanoutSender.send1();
}

參考連結

聯絡方式

聯絡方式:QQ3060507060

檢視下一篇或者其他文章,可點選目錄或者專欄檢視

相關推薦

SpringBoot2.0系列--09--訊息佇列(Rabbit)

SpringBoot2.0系列–09–訊息佇列(Rabbit) 前言 JDK出11了,SpringBoot出2.0了,還沒有系統的學習過,剛好最近專案中有使用到,就把一些關鍵的東西列出來,避免忘記 SpringBoot2.0系列–00–目錄 介紹 當專案需要拆

訊息佇列(MQ)系列1.0 為啥需要訊息佇列

什麼場景會需要訊息佇列(MQ)? message queue: 主要原因是由於在高併發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達資料庫, 直接導致無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many

大型網站架構系列訊息佇列(二)(轉)

本文是大型網站架構系列:訊息佇列(二),主要分享JMS訊息服務,常用訊息中介軟體(Active MQ,Rabbit MQ,Zero MQ,Kafka)。【第二篇的內容大部分為網路資源的整理和彙總,供大家學習總結使用,最後有文章來源】 本次分享大綱 訊息佇列概述(見第一篇:大型網站架構系列:分散式訊息

SpringBoot2.0系列--07--熱部署

SpringBoot2.0系列–07–熱部署 文章目錄 SpringBoot2.0系列--07--熱部署 前言 介紹 總流程 具體操作 聯絡方式 前言 JDK出11了,SpringBoot

SpringBoot2.0系列--08--打包jar和war包

SpringBoot2.0系列–08–打包jar和war包 文章目錄 SpringBoot2.0系列--08--打包jar和war包 前言 介紹 總流程 具體操作 聯絡方式 前言 JDK出

Cris 玩轉大資料系列訊息佇列神器 Kafka

Cris 玩轉大資料系列之訊息佇列神器 Kafka Author:Cris 文章目錄 Cris 玩轉大資料系列之訊息佇列神器 Kafka Author:Cris 1. Kafka 概述

SpringBoot2.0系列--01--HelloWorld

SpringBoot2.0系列–01–HelloWorld 前言 JDK出11了,SpringBoot出2.0了,還沒有系統的學習過,剛好最近專案中有使用到,就把一些關鍵的東西列出來,避免忘記吧 準備工具 IntelliJ IDEA 2018.2.4 Mav

SpringBoot2.0系列--04--最簡單的Mybatis連線資料庫

SpringBoot2.0系列–04–最簡單的Mybatis連線資料庫 前言 JDK出11了,SpringBoot出2.0了,還沒有系統的學習過,剛好最近專案中有使用到,就把一些關鍵的東西列出來,避免忘記 SpringBoot2.0系列–00–目錄 介紹 寫完

叢集與負載均衡系列(4)——訊息佇列之Rabbitmq的搭建

        前面的三篇文章介紹了共享session,從這篇文章開始介紹訊息佇列,這裡用的是Rabbitmq。對於Rabbitmq的一些基本概念,不打算在這裡總結了。因為網上有大把總結的不錯的文章,比如點選開啟連結         這篇文章介紹Rabbitmq的安裝。  

SpringBoot2.0整合MQTT訊息推送功能

        這幾天在弄後端管理系統向指定的Android客戶端推送訊息的功能模組,查閱了網上很多部落格介紹的許多方式,最終選擇基於MQTT協議來實現,MQTT是一個輕量級的訊息釋出/訂閱協議,它是實現基於手機客戶端的訊息推送伺服器的理想解決方案。         實現M

叢集與負載均衡系列(7)——訊息佇列之分散式事務

         XA協議:                為了解決分散式事務,各大廠家資料庫都提供了xa協議介面。什麼是XA協議,就是通過多階段提交,確保資料一致性。以兩階段提交為例                                  第一階段為準備階段,

大型網站架構系列——分散式訊息佇列

     轉載自:https://www.cnblogs.com/itfly8/p/5155983.html  訊息佇列概述 訊息佇列使用場景 1、訊息佇列概述: 訊息佇列中介軟體是分散式系統中重要的元件,主要解決 應用耦合,非同步訊息,流量削鋒 等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型

SpringBoot熱部署devtool和配置檔案自動注入(SpringBoot2.0系列-二)

1、SpringBoot2.x使用Dev-tool熱部署 簡介:什麼是熱部署,使用springboot結合dev-tool工具,快速載入啟動應用 核心依賴包: <dependency> <groupId>o

XXL-MQ v1.2.0,分散式訊息佇列

Release Notes 1、client端與Broker長鏈初始化優化,防止重複建立連線。 2、POM多項依賴升級; 3、UI元件升級; 4、規範專案目錄結構; 6、超時控制; 5、通訊遷移至 xxl-rpc; 6、除了springboot型別示例;新增無框架

網際網路面試開小灶系列訊息佇列(一)

目錄 背景 為什麼使用訊息佇列 訊息佇列有什麼優缺點 訊息佇列的選型 重複消費你們是怎麼解決的? @(目錄) 背景 程式設計師不懂點訊息佇列的知識,怎麼能證明你

高併發架構系列:如何從0到1設計一個MQ訊息佇列

訊息佇列作為系統解耦,流量控制的利器,成為分散式系統核心元件之一。 如果你對訊息佇列背後的實現原理關注不多,其實瞭解訊息佇列背後的實現非常重要。 不僅知其然還要知其所以然,這才是一個優秀的工程師需要具備的特徵。 今天,我們就一起來探討設計一個訊息佇列背後的技術。 訊息佇列整體設計思路 主要是設計

SpringBoot2.0高階案例(07) 整合:Redis叢集 ,實現訊息佇列場景

本文原始碼 GitHub地址:知了一笑 https://github.com/cicadasmile/middle-ware-pa

大型網站架構系列:分散式訊息佇列(一)(轉)

以下是訊息佇列以下的大綱,本文主要介紹訊息佇列概述,訊息佇列應用場景和訊息中介軟體示例(電商,日誌系統)。 本次分享大綱 訊息佇列概述 訊息佇列應用場景 訊息中介軟體示例 JMS訊息服務(見第二篇:大型網站架構系列:分散式訊息佇列(二)) 常用訊息佇列(見第二篇:大型網站架構系列:分

springboot2.0 快速使用教程系列

1 springboot2.0專案搭建篇 (一)springboot2.0快速專案搭建和專案的配置(Spring Tool Suite(STS)) (二)springboot2.0快速專案搭建和專案的配置(IntellJ IDEA) (三)springboot2.0快速專案搭建和專

springboot2.x簡單詳細教程--訊息佇列介紹及整合ActiveMQ (第十三章)

一、JMS介紹和使用場景及基礎程式設計模型     簡介:講解什麼是小寫佇列,JMS的基礎知識和使用場景     1、什麼是JMS: Java訊息服務(Java Message Service),Java平臺中關於面向訊息中介