1. 程式人生 > >Springboot2(28)整合rabbitmq實現延時訊息

Springboot2(28)整合rabbitmq實現延時訊息

原始碼地址

springboot2教程系列

rabbitmq實現訊息的確認機制和延時訊息的傳送

訊息生產者程式碼實現的主要配置

@Configuration
@Slf4j
public class PrividerRabbitmqConfig {
	 
    @Resource
    private RabbitTemplate rabbitTemplate;


    /**
	     * 定製化amqp模版      可根據需要定製多個
	     * 
	     * 
	     * 此處為模版類定義 Jackson訊息轉換器
	     * ConfirmCallback介面用於實現訊息傳送到RabbitMQ交換器後接收ack回撥   即訊息傳送到exchange  ack
	     * ReturnCallback介面用於實現訊息傳送到RabbitMQ 交換器,但無相應佇列與交換器繫結時的回撥  即訊息傳送不到任何一個佇列中  ack
	     *
	     * @return the amqp template
	     */
@Bean public AmqpTemplate amqpTemplate() { //使用jackson 訊息轉換器 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setEncoding("UTF-8"); //開啟returncallback yml 需要 配置 publisher-returns: true rabbitTemplate.setMandatory
(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("訊息:{} 傳送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", replyCode, replyText, exchange, routingKey); }); //訊息確認 yml 需要配置 publisher-returns: true rabbitTemplate.
setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.debug("訊息傳送到exchange成功,id: {}", correlationData.getId()); } else { log.info("訊息傳送到exchange失敗,原因: {}", cause); } }); return rabbitTemplate; } /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */ /** * 宣告Direct交換機 支援持久化. * * @return the exchange */ @Bean("directExchange") public Exchange directExchange() { return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); } /** * 宣告一個佇列 支援持久化. * * @return the queue */ @Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").build(); } /** * 通過繫結鍵 將指定佇列繫結到一個指定的交換機 . * * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); } /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */ /** * 宣告 fanout 交換機. * * @return the exchange */ @Bean("fanoutExchange") public FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build(); } /** * Fanout queue A. * * @return the queue */ @Bean("fanoutQueueA") public Queue fanoutQueueA() { return QueueBuilder.durable("FANOUT_QUEUE_A").build(); } /** * Fanout queue B . * * @return the queue */ @Bean("fanoutQueueB") public Queue fanoutQueueB() { return QueueBuilder.durable("FANOUT_QUEUE_B").build(); } /** * 繫結佇列A 到Fanout 交換機. * * @param queue the queue * @param fanoutExchange the fanout exchange * @return the binding */ @Bean public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } /** * 繫結佇列B 到Fanout 交換機. * * @param queue the queue * @param fanoutExchange the fanout exchange * @return the binding */ @Bean public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } /** * 超時佇列 * 訊息被拒絕(basic.reject/ basic.nack)並且requeue=false * 訊息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間)) * 佇列達到最大長度 * @return */ @Bean("deadLetterQueue") public Queue deadLetterQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "DIRECT_EXCHANGE"); arguments.put("x-dead-letter-routing-key", "DIRECT_ROUTING_KEY"); Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments); return queue; } @Bean public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY2").noargs(); } /** * 優先順序佇列 * @return */ /* @Bean("priorityQueue")*/ public Queue priorityQueue(){ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-max-priority",10); //佇列的屬性引數 有10個優先級別 Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments); return queue; } /* @Bean public Binding priorityBinding(@Qualifier("priorityQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); }*/ }

訊息消費者程式碼實現的主要配置

@Configuration
public class ConsumerRabbitmqConfig {

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //開啟手動 ack
        return factory;
    }

}

1 傳送確認

application.yml新增配置

spring.rabbitmq.publisher-confirms: true #將channel通道設定成confirm模式

設定訊息確認會影響併發效能,導致訊息掉失。因為每個connection最多支援2048個channel,當channel達到2048時,會報錯org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later

@Bean
public AmqpTemplate amqpTemplate() {
    //使用jackson 訊息轉換器
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.setEncoding("UTF-8");
    //開啟returncallback     yml 需要 配置    publisher-returns: true
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        log.info("訊息:{} 傳送失敗, 應答碼:{} 原因:{} 交換機: {}  路由鍵: {}", replyCode, replyText, exchange, routingKey);
    });
    //訊息確認  yml 需要配置   publisher-returns: true
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            log.debug("訊息傳送到exchange成功,id: {}", correlationData.getId());
        } else {
            log.info("訊息傳送到exchange失敗,原因: {}", cause);
        }
    });
    return rabbitTemplate;
}

rabbitTemplate.setConfirmCallback()

訊息傳送到 Broker 後觸發回撥,確認訊息是否到達 Broker 伺服器,也就是隻確認是否正確到達 Exchange 中

rabbitTemplate.setReturnCallback()

通過實現 ReturnCallback 介面,啟動訊息失敗返回,比如路由不到佇列時觸發回撥

同一個連線不同channel使用事務和釋出確認

2 消費確認

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

RabbitListenerContainerFactory 中進行開啟手動 ack

@Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //開啟手動 ack
        return factory;
    }

確認訊息

 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

3 傳送訊息

CorrelationData correlationData = new CorrelationData( UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "",new UserData(userName),correlationData);

4 超時佇列

/**
* 超時佇列
* 訊息被拒絕(basic.reject/ basic.nack)並且requeue=false
* 訊息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間))
* 佇列達到最大長度
* @return
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", "DIRECT_EXCHANGE");
    arguments.put("x-dead-letter-routing-key", "DIRECT_ROUTING_KEY");
    Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);
    return queue;
}
   /**
     * 傳送超時訊息
     * @return
     */
@RequestMapping("/deadLetter")
@ResponseBody
public R deadLetterMsg(){
    MessagePostProcessor processor = new MessagePostProcessor(){
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration("10000" );//10秒後超時
            message.getMessageProperties().setPriority(1);
            return message;
        }
    };
    rabbitTemplate.convertAndSend("DIRECT_EXCHANGE","DIRECT_ROUTING_KEY2", "333",  processor);
    return R.ok();
}

通過超時佇列達到訊息延時傳送的效果。當訊息傳送到超時佇列deadLetterQueue經過一定時間超時後,訊息會路由到繫結的exchange(DIRECT_EXCHANGE)。而繫結exchange的佇列會得到訊息。

測試

請求地址:http://127.0.0.1:8080/deadLetter

訊息傳送

在這裡插入圖片描述

到收到訊息

在這裡插入圖片描述