1. 程式人生 > >【Spring Boot】(30)、SpringBoot整合RabbitMQ

【Spring Boot】(30)、SpringBoot整合RabbitMQ

1、安裝

1.1、Erlang:

Erlang下載地址,下載後安裝即可。

1.2、RabbitMQ安裝

RabbitMQ下載地址,下載後安裝即可。

注意:Erlang的版本要與RabbitMQ版本需要匹配才行。

RabbitMQ Minimum required Erlang/OTP Maximum supported Erlang/OTP
3.7.x 19.3 20.3.x
3.6.15 19.3 20.3.x
3.6.14、 3.6.13 、 3.6.12、 3.6.11 R16B03 20.1.x
3.6.10 、 3.6.9、 3.6.8、 3.6.7、 3.6.6、 3.6.5、 3.6.4 R16B03 19.3.x
3.6.3 、 3.6.2、 3.6.1、 3.6.0 R16B03 18.3.x
3.5.x R14B04 17.5.x
3.4.x R13B03 16B03

 

2、視覺化管理介面

  • Erlang和RabbitMQ安裝完成後,通過命令列進入到RabbitMQ的安裝目錄下的sbin目錄,輸入以下命令,等待返回。

    • rabbitmq-plugins enable rabbitmq_management

  • 訪問http://ip:15672/,使用guest/guest或者admin/admin登入。

  • 如果使用的是Linux系統,記得把防火牆的埠15672開放或者把防火牆關閉。

  • 如果程式碼中需要使用新使用者作為測試,需要在Admin標籤頁中新建一個使用者,並同時設定密碼和virtual hosts

     

3、RabbitMQ術語

  • Server(Broker):接收客戶端連線,實現AMQP協議的訊息佇列和路由功能的程序;

  • Virtual Host:

    虛擬主機的概念,類似許可權控制組,一個Virtual Host裡可以有多個Exchange和Queue,許可權控制的最小麗都是Virtual Host;

  • Exchange:交換機,接收生產者傳送的訊息,並根據Routing Key將訊息路由到伺服器中的佇列Queue。

  • ExchangeType:交換機型別決定了路由訊息行為,RabbitMQ中主要有三種類型Exchange,分別是fanout、direct、topic;

  • Message Queue:訊息佇列,用於儲存還未被消費者消費的訊息;

  • Message:由Header和body組成,Header是由生產者新增的各種屬性的集合,包括Message是否被持久化、優先順序是多少、由哪個Message Queue接收等;body是真正需要傳送的資料內容;

  • BindingKey(RouteKey):繫結關鍵字,將一個特定的Exchange和一個特定的Queue繫結起來。

 

4、與Spring Boot的整合(簡單版HelloWorld)

4.1、引入RabbitMQ依賴

首先當然是新增RabbitMQ的依賴啦,從mvn repository找到SpringBoot整合RabbitMQ的整合包。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot-amqp.version}</version>
</dependency>

 來看看其依賴情況:

 

4.2、新增RabbitMQ配置資訊

spring.rabbitmq.host=ip地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=使用者名稱
spring.rabbitmq.password=密碼
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirms=true

這裡配置的資訊是最基礎的,複雜的配置可以自行百度嘗試。

 

4.3、建立訊息生產者

通過注入AmqpTemplate介面的例項來實現訊息的傳送,AmqpTemplate介面定義了一套針對AMQP協議的基礎操作。在Spring Boot中會根據配置來注入其具體實現。

@Component
public class Sender {

	@Autowired
	private AmqpTemplate amqpTemplate;

	public void send(){
		String content = "hello : " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

		amqpTemplate.convertAndSend("hello", content);
	}
}

此時的AmqpTemplate物件其實是RabbitTemplate的例項,因為RabbitTemplateAmqpTemplate的子類:

至於為啥能自動注入這個Bean,後面會講解到,先不急。

 

4.4、建立訊息消費者

通過@RabbitListener註解定義該類對hello佇列的監聽,並用@RabbitHandler註解來指定對訊息的處理方法。所以,該消費者實現了對hello佇列的消費,消費操作為輸出訊息的字串內容。

@RabbitListener(queues = {"hello"})
@Component
public class Receiver {

	@RabbitHandler
	public void handler(String content){
		System.out.println("接收到:" + content);
	}
}

 

4.5、建立RabbitMQ配置類

用來配置佇列、交換器、路由等高階資訊。

@Configuration
public class RabbitConfig {

	@Bean
	public Queue helloQueue(){
		return new Queue("hello");
	}
}

4.6、編寫測試類

注入訊息生產者用於向佇列中傳送訊息

@SpringBootTest
@RunWith(SpringRunner.class)
public class TestRabbitmq {

	@Autowired
	private Sender sender;

	@Test
	public void test(){
		sender.send();
	}
}

 

4.7、測試

  • 先啟動主程式類,用於監聽佇列;

  • 然後執行測試類,使用生產者向佇列中傳送訊息。

 

4.8 、疑問

學過RabbitMQ基礎的童靴肯定知道,要通過RabbitMQ傳送訊息的話,需要建立通道,交換機,佇列,並將通道與交換機、交換機與佇列繫結起來,而上述的簡單例子中,為什麼沒看到通道、交換機的建立,也沒看到繫結的操作呢?其實在RabbitMQ中,在不建立交換機的情況下,RabbitMQ會建立一個預設的交換機,通過RabbitMQ視覺化管理介面可以看到:

而且建立的佇列,預設也就繫結到該交換機上,見下圖:

再仔細看這個預設交換機,能看到這個交換機(Exchange)型別是Direct模式的,至於什麼是Direct模式,後面會講。

 

5、Spring Boot自動配置RabbitMQ

同之前文章一樣,SpringBoot整合RabbitMQ同樣有個自動配置類,只不過RabbitMQ的自動配置類是由SpringBoot官方自行提供,而不像Mybatis是由Mybatis方提供的。這個自動配置類在spring-boot-autoconfigure-xxx.jar包中:

這裡說個題外話,是關於自定義Starter的小知識點:

  • 啟動器(Starter)只用來做依賴匯入;

  • 需要專門寫一個自動配置模組;

  • 啟動器依賴自動配置模組,使用的時候只需要引入啟動器(Starter);

而在命名規範中約定如下:

  • 官方名稱空間:

    • 字首:spring-boot-starter-xxx

    • 模式:spring-boot-starter-模組名

    • 舉例:spring-boot-starter-web、spring-boot-starter-jdbc、...

  • 自定義名稱空間:

    • 字首:xxx-spring-boot-starter

    • 模式:模組名-spring-boot-starter

    • 舉例:mybatis-spring-boot-starter、pagehelper-spring-boot-starter、...

 

進入正題之前,咱們先來看看Spring與RabbitMQ的整合時的配置資訊:

<!-- RabbitMQ start -->
<!-- 連線配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
                           password="${mq.password}" port="${mq.port}"  />

<rabbit:admin connection-factory="connectionFactory"/>

<!-- 訊息佇列客戶端 -->
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory"  />

<!-- queue 佇列宣告 -->
<!-- 
        durable 是否持久化 
        exclusive 僅建立者可以使用的私有佇列,斷開後自動刪除 
        auto-delete 當所有消費端連線斷開後,是否自動刪除佇列 -->
<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />

<!-- Topic交換機定義,其他型別交換機類似 -->
<!-- 
        交換機:一個交換機可以繫結多個佇列,一個佇列也可以繫結到多個交換機上。
        如果沒有佇列繫結到交換機上,則傳送到該交換機上的資訊則會丟失。
     -->
<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <!-- 設定訊息Queue匹配的pattern (direct模式為key) -->
        <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<bean name="rabbitmqService" class="xxx.yyy.zzz"></bean>

<!-- 配置監聽 消費者 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
    <!-- 
            queues 監聽佇列,多個用逗號分隔 
            ref 監聽器 -->
    <rabbit:listener queues="test_queue" ref="rabbitmqService"/>
</rabbit:listener-container>

配合上面的xml配置檔案來看看SpringBoot中RabbitMQ的自動配置類RabbitAutoConfiguration

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    
    @Configuration
	@ConditionalOnMissingBean(ConnectionFactory.class)
	protected static class RabbitConnectionFactoryCreator {
    
        @Bean
        public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
            throws Exception {
            RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
            //...
            return connectionFactory;
        }
    }
    
    @Configuration
	@Import(RabbitConnectionFactoryCreator.class)
	protected static class RabbitTemplateConfiguration {
        
        @Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnMissingBean(RabbitTemplate.class)
		public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
			RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            //...
            return rabbitTemplate;
        }
        
        @Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
		@ConditionalOnMissingBean(AmqpAdmin.class)
		public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
			return new RabbitAdmin(connectionFactory);
		}
    }
}

該自動配置類中自動註冊了三個重要的Bean,分別是rabbitConnectionFactoryrabbitTemplateamqpAdmin,剛好與xml配置檔案中的前三個Bean一一對應。當然RabbitMQ的配置資訊由RabbitProperties類進行匯入:

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
	private String host = "localhost";
	private int port = 5672;
	private String username;
	private String password;
	private final Ssl ssl = new Ssl();
	private String virtualHost;
	
	private String addresses;
	private Integer requestedHeartbeat;
	private boolean publisherConfirms;
	private boolean publisherReturns;
	private Integer connectionTimeout;
	private final Cache cache = new Cache();
	private final Listener listener = new Listener();
	private final Template template = new Template();
	private List<Address> parsedAddresses;
	
	public static class Ssl {
		private boolean enabled;
		private String keyStore;
		private String keyStorePassword;
		private String trustStore;
		private String trustStorePassword;
		private String algorithm;
	}
	public static class Cache {
		private final Channel channel = new Channel();
		private final Connection connection = new Connection();
        
		public static class Channel {
			private Integer size;
			private Long checkoutTimeout;
		}

		public static class Connection {
			private CacheMode mode = CacheMode.CHANNEL;
			private Integer size;
        }
	}

	public static class Listener {
		@NestedConfigurationProperty
		private final AmqpContainer simple = new AmqpContainer();
	}

	public static class AmqpContainer {
		private boolean autoStartup = true;
		private AcknowledgeMode acknowledgeMode;
		private Integer concurrency;
		private Integer maxConcurrency;
		private Integer prefetch;
		private Integer transactionSize;
		private Boolean defaultRequeueRejected;
		private Long idleEventInterval;
		@NestedConfigurationProperty
		private final ListenerRetry retry = new ListenerRetry();
	}

	public static class Template {
		@NestedConfigurationProperty
		private final Retry retry = new Retry();
		private Boolean mandatory;
		private Long receiveTimeout;
		private Long replyTimeout;
	}

	public static class Retry {
		private boolean enabled;
		private int maxAttempts = 3;
		private long initialInterval = 1000L;
		private double multiplier = 1.0;
		private long maxInterval = 10000L;
	}

	public static class ListenerRetry extends Retry {
		private boolean stateless = true;
	}
	
	private static final class Address {
		private static final String PREFIX_AMQP = "amqp://";
		private static final int DEFAULT_PORT = 5672;
		private String host;
		private int port;
		private String username;
		private String password;
		private String virtualHost;
	}
}

大家自行嘗試,這裡就不多描述了。

 

RabbitAutoConfiguration配置類上有個簽名:

@Import(RabbitAnnotationDrivenConfiguration.class)

來看看RabbitAnnotationDrivenConfiguration類:

public class SimpleRabbitListenerContainerFactory
		extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer>
		implements ApplicationContextAware, ApplicationEventPublisherAware {

	private Executor taskExecutor;

	private PlatformTransactionManager transactionManager;

	private Integer txSize;

	private Integer concurrentConsumers;

	private Integer maxConcurrentConsumers;

	private Long startConsumerMinInterval;

	private Long stopConsumerMinInterval;

	private Integer consecutiveActiveTrigger;

	private Integer consecutiveIdleTrigger;

	private Integer prefetchCount;

	private Long receiveTimeout;

	private Boolean defaultRequeueRejected;

	private Advice[] adviceChain;

	private BackOff recoveryBackOff;

	private Boolean missingQueuesFatal;

	private Boolean mismatchedQueuesFatal;

	private ConsumerTagStrategy consumerTagStrategy;

	private Long idleEventInterval;

	private ApplicationEventPublisher applicationEventPublisher;

	private ApplicationContext applicationContext;
	
	@Override
	protected SimpleMessageListenerContainer createContainerInstance() {
		return new SimpleMessageListenerContainer();
	}

	@Override
	protected void initializeContainer(SimpleMessageListenerContainer instance) {
		//other code...
	}
}

它其實是SimpleMessageListenerContainer的工廠類,而SimpleMessageListenerContainer又是<rabbit:listener-container />標籤的具體實現類,剛好又同xml配置檔案的消費監聽容器對應。

至於其他的配置資訊,如佇列和交換機的建立,以及佇列與交換機的繫結就由配置類自行定義了。請往下接著看...

 

6、RabbitMQ交換機及工作模式

RabbitMQ的交換機Exchange有如下幾種型別:

  • Fanout

  • Direct

  • Topic

  • Header

其中header型別的Exchange由於用的相對較少,所以本章主要講述其他三種類型的Exchange。

 

RabbitMQ的工作模式:

  • 釋出/訂閱模式:對應Fanout型別的交換機。

  • 路由模式:對應Direct型別的交換機。

  • 萬用字元模式:對應Topic型別的交換機。

 

6.1、釋出/訂閱模式(Fanout)

任何傳送到Fanout Exchange的訊息都會被轉發到與該Exchange繫結(Binding)的所有Queue上。

  • 可以理解為路由表的模式;

  • 這種模式不需要RouteKey;

  • 這種模式需要提前將Exchange與Queue進行繫結,一個Exchange可以繫結多個Queue,一個Queue可以同多個Exchange進行繫結;

  • 如果接受到訊息的Exchange沒有與任何Queue繫結,則訊息會被拋棄。

 

程式碼示例:

FanoutConfig配置類程式碼,配置了兩個佇列和一個交換機,並繫結:

@Configuration
public class FanoutConfig {

    public static final String FANOUT_QUEUE_NAME_1 = "fanout-queue-1";
    public static final String FANOUT_QUEUE_NAME_2 = "fanout-queue-2";
    public static final String FANOUT_EXCHANGE_NAME = "fanout-exchange";

    @Bean
    public Queue fanoutQueue1() {
//        return new Queue(FANOUT_QUEUE_NAME_1);//預設情況,durable為true,exclusive為false,auto-delete為false
        return QueueBuilder.durable(FANOUT_QUEUE_NAME_1).build();
    }

    @Bean
    public Queue fanoutQueue2() {
//        return new Queue(FANOUT_QUEUE_NAME_1);//預設情況,durable為true,exclusive為false,auto-delete為false
        return QueueBuilder.durable(FANOUT_QUEUE_NAME_2).build();
    }

    @Bean
    public FanoutExchange fanoutExchange() {
//        return new FanoutExchange(FANOUT_EXCHANGE_NAME);//預設情況下,durable為true,auto-delete為false
        return (FanoutExchange) ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

訊息生產者:

@Component(value = "fanout-sender")
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String name){
        String content = "hello : " + name + ",當前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        amqpTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE_NAME, "", content);
    }
}

消費者1號:

@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_NAME_1})
@Component("fanout-receiver1")
public class Receiver1 {

	@RabbitHandler
	public void handler(String content){
		System.out.println("Fanout.Receiver1接收到:" + content);
	}
}

消費者2號:

@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_NAME_2})
@Component("fanout-receiver2")
public class Receiver2 {

	@RabbitHandler
	public void handler(String content){
		System.out.println("Fanout.Receiver2接收到:" + content);
	}
}

Fanout控制器:

@RestController
public class FanoutController {

    @Autowired
    @Qualifier("fanout-sender")
    private Sender sender;

    @RequestMapping("/fanout")
    public String hello(String name){
        sender.send(name);
        return "success";
    }
}

輸入url:http://localhost:8081/fanout?name=Cay 觀看控制檯輸出:

 

6.2、路由模式(Direct)

  • RabbitMQ預設自帶Exchange,該Exchange的名字為空字串,當前也可以自己指定名字;

  • 在預設的Exchange下,不需要將Exchange與Queue繫結, RabbitMQ會自動繫結;而如果使用自定義的Exchange,則需要在將Exchange繫結到Queue的時候需要指定一個RouteKey;

  • 在訊息傳遞時需要一個RouteKey;

  • 所有傳送到Direct Exchange的訊息會被轉發到RouteKey中指定的Queue。

  • 如果vhost中不存在RouteKey中指定的佇列名,則該訊息會被拋棄。

 

程式碼示例:

DirectConfig配置類程式碼,配置兩個佇列,通過兩個不同的routeKey繫結到同一個Exchange上:

@Configuration
public class DirectConfig {

    public static final String DIRECT_QUEUE_NAME_1 = "direct-queue-1";
    public static final String DIRECT_QUEUE_NAME_2 = "direct-queue-2";
    public static final String DIRECT_EXCHANGE_NAME = "direct-exchange";

    public static final String ROUTE_KEY_1 = "direct.route.key.1";
    public static final String ROUTE_KEY_2 = "direct.route.key.2";

    @Bean
    public Queue directQueue1() {
//        return new Queue(DIRECT_QUEUE_NAME_1);//預設情況,durable為true,exclusive為false,auto-delete為false
        return QueueBuilder.durable(DIRECT_QUEUE_NAME_1).build();
    }

    @Bean
    public Queue directQueue2() {
//        return new Queue(DIRECT_QUEUE_NAME_2);//預設情況,durable為true,exclusive為false,auto-delete為false
        return QueueBuilder.durable(DIRECT_QUEUE_NAME_2).build();
    }

    @Bean
    public DirectExchange directExchange() {
//        return new DirectExchange(DIRECT_EXCHANGE_NAME_1);//預設情況下,durable為true,auto-delete為false
        return (DirectExchange) ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public Binding directBinding1(DirectExchange directExchange, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with(ROUTE_KEY_1);
    }

    @Bean
    public Binding directBinding2(DirectExchange directExchange, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with(ROUTE_KEY_2);
    }
}

訊息生產者:

@Component("direct-sender")
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(Integer selector) {
        String content = "hello,我是%d號,當前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        String routeKey = "";
        if (selector.intValue() == 1) {
            content = String.format(content, 1);
            routeKey = DirectConfig.ROUTE_KEY_1;
        } else if (selector.intValue() == 2) {
            content = String.format(content, 2);
            routeKey = DirectConfig.ROUTE_KEY_2;
        }
        amqpTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE_NAME, routeKey, content);
    }
}

消費者1號:

@RabbitListener(queues = {DirectConfig.DIRECT_QUEUE_NAME_1})
@Component("direct-receiver1")
public class Receiver1 {

	@RabbitHandler
	public void handler(String content){
		System.out.println("Direct.Receiver1接收到:" + content);
	}
}

消費者2號:

@RabbitListener(queues = {DirectConfig.DIRECT_QUEUE_NAME_2})
@Component("direct-receiver2")
public class Receiver2 {

	@RabbitHandler
	public void handler(String content){
		System.out.println("Direct.Receiver2接收到:" + content);
	}
}

Direct控制器:

@RestController
public class DirectController {
    private static final Logger logger = LoggerFactory.getLogger(DirectController.class);

    @Autowired
    @Qualifier("direct-sender")
    private Sender sender;

    @RequestMapping("/direct")
    public String hello(@RequestParam(defaultValue = "1") int selector){
        logger.info("引數selector:" + selector);
        sender.send(selector);
        return "success";
    }
}

輸入兩次不同的引數selector:

 

6.3、萬用字元模式(Topic)

任何傳送到Topic Exchange的訊息都會被轉發到所有關心RouteKey中指定的Queue上。

  • 這種模式較為複雜,簡單來說,就是每個佇列都有其關心的主題,所有的訊息都帶有一個“標題”(RouteKey),Exchange會將訊息轉發到所有關注主題能與RouteKey模糊匹配的佇列。

  • 這種模式需要RouteKey,也需要提前繫結Exchange與Queue。

  • 在進行繫結時,要提供一個該佇列關心的主題,如“#.log.#”表示該佇列關心所有涉及log的訊息(一個RouteKey為”MQ.log.error”的訊息會被轉發到該佇列)。

  • “#”表示0個或若干個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,但是無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。

  • 同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此訊息。

 

程式碼示例:

TopicConfig配置類,聲明瞭兩個佇列,分別對應兩個routeKey:topic.#和topic.*

@Configuration
public class TopicConfig {

	public static final String TOPIC_QUEUE_NAME_1 = "topic-queue-1";
	public static final String TOPIC_QUEUE_NAME_2 = "topic-queue-2";
	public static final String TOPIC_EXCHANGE_NAME = "topic-exchange";

	public static final String ROUTE_KEY_1 = "topic.#";
	public static final String ROUTE_KEY_2 = "topic.*";

	@Bean
	public TopicExchange topicExchange() {
//        return new TopicExchange(TOPIC_EXCHANGE_NAME);//預設情況下,durable為true,auto-delete為false
		return (TopicExchange) ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();
	}
	
	@Bean
	public Queue topicQueue1() {
		return new Queue(TOPIC_QUEUE_NAME_1);
	}

	@Bean
	public Queue topicQueue2() {
		return new Queue(TOPIC_QUEUE_NAME_2);
	}

	@Bean
	public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {
		return BindingBuilder.bind(topicQueue1).to(topicExchange).with(ROUTE_KEY_1);
	}

	@Bean
	public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {
		return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ROUTE_KEY_2);
	}
}

訊息生產者:

@Component("topic-sender")
public class Sender {

    private static final String TOPIC_PREFIX = "topic.";

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String selector){
        String content = "hello,當前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        amqpTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, TOPIC_PREFIX + selector, content);
    }
}

消費者1號:

@RabbitListener(queues = {TopicConfig.TOPIC_QUEUE_NAME_1})
@Component("topic-receiver1")
public class Receiver1 {

	@RabbitHandler
	public void handler(String content){
		System.out.println("Topic.Receiver1接收到:" + content);
	}
}

消費者2號:

@RabbitListener(queues = {TopicConfig.TOPIC_QUEUE_NAME_2})
@Component("topic-receiver2")
public class Receiver2 {

	@RabbitHandler
	public void handler(String content){
		System.out.println("Topic.Receiver2接收到:" + content);
	}
}

Topic控制器:

@RestController
@RequestMapping("/topic")
public class TopicController {

    @Autowired
    @Qualifier("topic-sender")
    private Sender sender;

    @RequestMapping("/send")
    public String send(String routeKey){
        sender.send(routeKey);
        return "success";
    }
}

通過url訪問不同的routeKey來檢視結果:

 

 

可以看到,如果引數為message的時候即routeKey為topic.message,兩個佇列都能接收到訊息,而如果引數為message.a的時候即routeKey為topic.message.a,只有佇列1能接收到訊息,而佇列2不能接收到訊息。

 

7、訊息確認與回撥

預設情況下,RabbitMQ傳送訊息以及接收訊息是自動確認的,意思也就是說,訊息傳送方傳送訊息的時候,認為訊息已經成功傳送到了RabbitMQ伺服器,而當訊息傳送給消費者後,RabbitMQ伺服器就立即自動確認,然後將訊息從佇列中刪除了。而這樣的自動機制會造成訊息的丟失,我們常常聽到“丟訊息”的字眼。

為了解決訊息的丟失,RabbitMQ便產生了手動確認的機制:

  • 傳送者:

    • 當訊息不能路由到任何佇列時,會進行確認失敗操作,如果傳送方設定了mandatory=true模式,則先會呼叫basic.return方法,然後呼叫basic.ack方法;

    • 當訊息可以路由時,訊息被髮送到所有繫結的佇列時,進行訊息的確認basic.ack

  • 接收者:

    • 當訊息成功被消費時,可以進行訊息的確認basic.ack

    • 當訊息不能正常被訊息時,可以進行訊息的反確認basic.nack 或者拒絕basic.reject

當設定mandatory=true時,其中basic.ackbasic.nack會呼叫自定義的RabbitTemplate.ConfirmCallback介面的confirm方法。

public interface ConfirmCallback {
    /**
		 * Confirmation callback.
		 * @param correlationData correlation data for the callback.
		 * @param ack true for ack, false for nack
		 * @param cause An optional cause, for nack, when available, otherwise null.
		 */
    void confirm(CorrelationData correlationData, boolean ack, String cause);

}

而傳送者傳送訊息時無法路由後,會呼叫baisc.return方法,其確認機制由RabbitTemplate.ReturnCallback介面的returnedMessage方法實現。

public interface ReturnCallback {
    /**
		 * Returned message callback.
		 * @param message the returned message.
		 * @param replyCode the reply code.
		 * @param replyText the reply text.
		 * @param exchange the exchange.
		 * @param routingKey the routing key.
		 */
    void returnedMessage(Message message, int replyCode, String replyText,
                         String exchange, String routingKey);
}

 

7.1、使用Spring配置RabbitMQ的確認機制

  • 修改publisher-confirms為true
<!--連線工廠 -->
 <rabbit:connection-factory id="connectionFactory" host="{ip地址}" port="{埠}" username="{使用者名稱}" password="{密碼}" publisher-confirms="true"/>
  • 修改訊息回撥方法confirm-callback和return-callback為bean的id
<!-- mandatory必須設定true,return callback才生效 -->
<rabbit:template id="amqpTemplate"	connection-factory="connectionFactory" 
                 confirm-callback="confirmCallBackListener"
                 return-callback="returnCallBackListener" 
                 mandatory="true" 
/>
  • 訊息回撥類

@Service
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback{

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		//other code...
    }
}

@Service
class returnCallBackListener implements RabbitTemplate.ReturnCallback{
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
		//other code...
    }
}
  • 修改訊息確認機制改成手動確認manual:
<rabbit:listener-container
		connection-factory="connectionFactory" acknowledge="manual" >
		<!-- other xml  -->
</rabbit:listener-container>

 

7.2、使用SpringBoot配置RabbitMQ的確認機制

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    //other code...
    
    //傳送者是否確認
    cachingConnectionFactory.setPublisherConfirms(true);
    return cachingConnectionFactory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(){
    SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();

    //other code...

    //修改成手動確認
    rabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    return rabbitListenerContainerFactory;
}

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());

    //重點
    rabbitTemplate.setMandatory(true);
    //訊息回撥
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

    });

    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

    });
    return rabbitTemplate;
}

 現在SpringBoot整合RabbitMQ的基礎寫得差不多了,還有很多功能需要大家去研究了。下期再見...

 

====================打個廣告,歡迎關注====================

QQ:

412425870

微信公眾號:Cay課堂

csdn部落格:

http://blog.csdn.net/caychen

碼雲:

https://gitee.com/caychen/

github:

https://github.com/caychen

點選群號或者掃描二維碼即可加入QQ群:

328243383(1群)

 

點選群號或者掃描二維碼即可加入QQ群:
180479701(2群)