1. 程式人生 > >RabbitMQ延遲消費和重複消費

RabbitMQ延遲消費和重複消費

轉載自 https://blog.csdn.net/quliuwuyiz/article/details/79301054

使用RabbitMQ實現延遲任務
場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設定成超時。

場景二:訂單下單之後30分鐘後,如果使用者沒有付錢,則系統自動取消訂單。

延遲任務的模型如下圖:

 

基於 RabbitMQ 實現的分散式延遲重試佇列
場景一:在消費該訊息的時候,發現條件不滿足,需要等待30分鐘,重新消費該訊息,再次判斷是否滿足條件,如果滿足則消費該訊息,如果不滿足,則再等待30分鐘。這樣的場景通過mq佇列來實現。

在訊息佇列的監聽過程中,先判斷條件是否滿足,滿足,則直接消費。不滿足,則將該訊息傳送到上圖的死信佇列,但是在死信佇列失效之後,需要重新轉發到當前佇列進行消費就可以實現該功能。

基本概念如下: 訊息的TTL ( Time to Live ) 和 DLX (Dead Letter Exchange)

 

訊息的TTL就是訊息的存活時間。RabbitMQ可以對佇列和訊息分別設定TTL。對佇列設定就是佇列沒有消費者連著的保留時間,也可以對每一個單獨的訊息做單獨的設定。超過了這個時間,我們認為這個訊息就死了,稱之為死信。如果佇列設定了,訊息也設定了,那麼會取小的。所以一個訊息如果被路由到不同的佇列中,這個訊息死亡的時間有可能不一樣(不同的佇列設定)。這裡單講單個訊息的TTL,因為它才是實現延遲任務的關鍵。

可以通過設定訊息的expiration欄位或者x-message-ttl屬性來設定時間,兩者是一樣的效果。只是expiration欄位是字串引數,所以要寫個int型別的字串:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
當上面的訊息扔到佇列中後,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個訊息後面的,沒有“死掉”的訊息對頂上來,被消費者消費。死信在佇列中並不會被刪除和釋放,它會被統計到佇列的訊息數中去。單靠死信還不能實現延遲任務,還要靠Dead Letter Exchange。

Dead Letter Exchanges
Exchage的概念在這裡就不在贅述,可以從這裡進行了解。一個訊息在滿足如下條件下,會進死信路由,記住這裡是路由而不是佇列,一個路由可以對應很多佇列。

1. 一個訊息被Consumer拒收了,並且reject方法的引數裡requeue是false。也就是說不會被再次放在佇列裡,被其他消費者使用。

2. 上面的訊息的TTL到了,訊息過期了。

3. 佇列的長度限制滿了。排在前面的訊息會被丟棄或者扔到死信路由上。

Dead Letter Exchange其實就是一種普通的exchange,和建立其他exchange沒有兩樣。只是在某一個設定Dead Letter Exchange的佇列中有訊息過期了,會自動觸發訊息的轉發,傳送到Dead Letter Exchange中去。

package com.test.sender.delay;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;


import javax.annotation.Resource;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;


import com.drools.model.MQPushErrorFlow;


@Component
@PropertySource(value = "classpath:riskConfigMq.properties")
public class LifsInCompleteDataOneSend {
private static final Log log = LogFactory.getLog(LifsInCompleteDataOneConfig.class);


private static final String DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST = "delay_queue_per_queue_lifs_ttl"; // TTL配置在佇列上的緩衝佇列。
private static final String DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST = "delay_queue_per_queue_lifs_routing_key"; // TTL配置在佇列上的緩衝佇列。


private static final Integer QUEUE_EXPIRATION_FIRST = 30000;


/**
* 訊息佇列業務名稱
*/
@Value("${lifs.consumer.pushServiceName}")
private String pushServiceName;


/**
* 訂閱平臺名稱
*/
@Value("${lifs.consumer.platformName}")
private String platformName;


/**
* 訊息佇列一個業務使用的佇列的數量
*/
@Value("${lifs.consumer.queueShardingCount}")
private Integer queueShardingCount;


/**
* 交換機的名稱,共用lifs監聽的交換機
*/
@Value("${lifs.consumer.exchangeName}")
private String exchangeName;
/**
* 重發次數
*/
@Value("${rabbitmq.resend.times}")
private Integer resendTimes;


/**
* 底層需要使用的真實發送物件,每個傳送物件都需要對應一個
*/
@Resource(name = "lnCompleteDataOneRabbitTemplate")
private RabbitTemplate rabbitTemplate;

@Bean
public Queue delayQueueFirstTTL() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", exchangeName);
arguments.put("x-dead-letter-routing-key", getDirectRoutingKey(pushServiceName, 0, platformName));
arguments.put("x-message-ttl", QUEUE_EXPIRATION_FIRST);
Queue queue = new Queue(DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST, true, false, false, arguments);
log.info("第一次延遲佇列名稱: " + DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST + "  延期之後的轉發的routingKey: " + getDirectRoutingKey(pushServiceName, 0, platformName) + "  exchange: " + exchangeName);
/*
* Queue queue = QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL) //
* delay_queue_per_queue_ttl .withArgument("x-dead-letter-exchange",DELAY_EXCHANGE_NAME)
* .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME)
* .withArgument("x-message-ttl",QUEUE_EXPIRATION).build(); 
* Queue queue =new Queue(DELAY_QUEUE_PER_QUEUE_TTL,true);
*/
return queue;
}


@Bean
public Binding lnCompleteDataOneBinding() {
return BindingBuilder.bind(delayQueueFirstTTL()).to(lnCompleteDataOneExchange()).with(DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST);
}


@Bean(name = "lnCompleteDataOneExchange")
public DirectExchange lnCompleteDataOneExchange() {
return new DirectExchange(exchangeName);
}


    private String getDirectRoutingKey(String pushServiceName, int shardingIndex, String platformName) {
        return String.format("%s.%d.%s", pushServiceName, shardingIndex, platformName);
    }
    
    @Bean(name = "delayQueueFirstListenerContainer")
    public String delayQueueFirstListenerContainer(@Qualifier("lnCompleteDataOneConnectionFactory") ConnectionFactory connectionFactory) {
    Queue queue = delayQueueFirstTTL();
    RabbitAdmin ra = new RabbitAdmin(connectionFactory);
        ra.declareExchange(lnCompleteDataOneExchange());
        ra.declareQueue(queue);
        ra.declareBinding(lnCompleteDataOneBinding());
        log.info("delayQueueFirstListenerContainer: queueName" + queue.getName() + "  exchangeName: " + lnCompleteDataOneExchange().getName() + " routingKey: " + DELAY_QUEUE_PER_QUEUE_ROUTING_KEY_FIRST);
        return "";
    }
/**
* 自動生成uuid呼叫傳送方法

* @param dto
* @param routingId
*/
public String send(String message) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("延遲半分鐘的佇列中接受訊息的時間: " + df.format(new Date()) + "\n訊息的內容:" + message);


rabbitTemplate.convertAndSend(DELAY_QUEUE_PER_QUEUE_TTL_NAME_FIRST, message); // 向佇列裡面傳送訊息,第一個引數是佇列名稱,第二個引數是內容


return "sender delay";
}

}

package com.test.sender.delay;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import com.framework.mq.common.RabbitConfig;

@Component
@PropertySource(value = "classpath:riskConfigMq.properties")
public class LifsInCompleteDataOneConfig {

/**
* MQ服務地址和埠號
*/
@Value("${rabbitmq.addresses}")
private String addresses;
/**
* MQ使用者名稱
*/
@Value("${rabbitmq.username}")
private String username;
/**
* MQ密碼
*/
@Value("${rabbitmq.password}")
private String password;
/**
* MQ的虛擬主機
*/
@Value("${rabbitmq.virtualHost}")
private String virtualHost;
/**
* MQ是否使用傳送確認模式(必須開啟)
*/
@Value("${rabbitmq.publisherConfirms}")
private boolean publisherConfirms;
/**
* 快取的channel的數量
*/
@Value("${rabbitmq.channelCacheSize}")
private Integer channelCacheSize;
/**
* 快取的連線數量
*/
@Value("${rabbitmq.connectionCacheSize}")
private Integer connectionCacheSize;


/**
* 交換機的名稱,共用lifs監聽的交換機
*/
@Value("${lifs.consumer.exchangeName}")
private String exchangeName;


/**
* 注入RabbitConfig物件
* @return
*/
@Bean(name = "lnCompleteDataOneRabbitConfig")
public RabbitConfig rabbitConfig() {
return new RabbitConfig(addresses, username, password, virtualHost, publisherConfirms, channelCacheSize,
connectionCacheSize, exchangeName);
}


/**
* 注入連線工廠物件

* @param rabbitConfig 之前注入的 @RabbitConfig 物件
* @return
*/
@Bean(name = "lnCompleteDataOneConnectionFactory")
public ConnectionFactory connectionFactory(
@Qualifier(value = "lnCompleteDataOneRabbitConfig") RabbitConfig rabbitConfig) {
return rabbitConfig.getConnectionFactory();
}

/**
* 注入的 @RabbitTemplate 物件

* @param connectionFactory
* @return
*/
@Bean(name = "lnCompleteDataOneRabbitTemplate")
RabbitTemplate rabbitTemplate(
@Qualifier("lnCompleteDataOneConnectionFactory") ConnectionFactory connectionFactory) {

return new RabbitTemplate(connectionFactory);
}
}

在初次監聽訊息佇列的地方

在業務程式碼中,判斷條件是否滿足,如果不滿足,賦值incompleteDataFlagResult=1,在第二次重試的時候,如果還不滿足,則賦值incompleteDataFlagResult=2,如果滿足,則賦值incompleteDataFlagResult=200,直接消費,併發送回調的mq。

if(incompleteDataFlagResult==1){ //推進到等待30秒過期的佇列
lifsInCompleteDataOneSend.send(JSONObject.toJSONString(request));

}else if(incompleteDataFlagResult==2){  //推進到等待60秒過期的佇列

lifsInCompleteDataTwoSend.send(JSONObject.toJSONString(request));

}else if(incompleteDataFlagResult==3){ // 進行儲存,需要手工處理
InstallmentRequestFailure installmentRequestFailure = new InstallmentRequestFailure();
installmentRequestFailureService.save(installmentRequestFailure);
}else if(incompleteDataFlagResult==200){
lifsPushSender.send(request, customerId);
}

 
--------------------- 
作者:quliuwuyiz 
來源:CSDN 
原文:https://blog.csdn.net/quliuwuyiz/article/details/79301054 
版權宣告:本文為博主原創文章,轉載請附上博文連結!