1. 程式人生 > >RabbitMQ 訊息中介軟體如何保證消費者customer能夠成功處理訊息?

RabbitMQ 訊息中介軟體如何保證消費者customer能夠成功處理訊息?

一、確保消費者customer處理訊息成功

預設情況下消費者C1接收到訊息1無論是否正常接受和處理都會立即應答rabbit伺服器,然後訊息1就會從佇列中被刪除,假如C1突然出現異常狀況導致訊息1沒有被處理完畢,那麼訊息1就處理失敗了,也不會有其他消費者去處理訊息1。事實上我們希望的是訊息1如果沒有被C1正確處理完畢,那麼就傳送給其他消費者處理,為了達到這個目的,只需要做兩件事情,第一關閉rabbitMq的自動應答機制,第二消費者正確處理完訊息後手動應答。

RabbitMQ應答機制:

  • 自動確認,預設是自動確認,即獲取訊息後,直接確認。
  • 手動確認,給當前訊息設定狀態,當手動ack後服務端才會刪除該訊息,如果返回nack,重新入隊。

customer在監聽佇列接收訊息的時候,申明取消自動應答,手動返回完成。

channel.basicConsume(QUEUE_NAME, false, consumer);

在完成消費操作時,返回確認狀態。

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  • delivery.getEnvelope().getDeliveryTag(): 訊息id
  • true: 這裡true或者false都代表已經應答

二、存在兩個大問題

問題: 1)重複消費 2) 訊息堆積

1. 訊息重發導致訊息重複消費的問題(訊息中介軟體冪等性)

如果在消費方customer在完成消費了之後,由於網路問題沒有及時應答,就會存在大量訊息堆積在MQ伺服器。
由於RabbitMQ有訊息重新發送的機制,如果沒有及時迴應那麼就會繼續重發,重發就會導致訊息重複消費。

(1)在生產者producer產生訊息的時候可以給訊息一個唯一的id。
(2)在執行完畢之後,利用redis快取訊息id,判斷時候消費過。

單個消費者:

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	String value = redis.get("key_"+envelope.getDeliveryTag());
	if (value != null){
		//之前已經執行過了,所以直接應答
		channel.basicAck(deliveryTag, true);
		return ;
	}
	String exchange = envelope.getExchange();//交換
	long deliveryTag = envelope.getDeliveryTag();//訊息id
	String routingKey  = envelope.getRoutingKey();//路由key
	String message = new String(body, "utf-8");
	System.out.println(message);
	//先在redis中放入訊息id,記得加上過期時間
	// 返回確認狀態
}

多個消費者: 分散式鎖解決

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	try{
		//在賦值鎖的時候可以加上過期時間
		boolean flag = redisTemplate.opsForValue().setIfAbsent("key_"+envelope.getDeliveryTag(),envelope.getDeliveryTag());
		//如果沒有被賦值則返回true
		if(flag){
			String exchange = envelope.getExchange();//交換
			long deliveryTag = envelope.getDeliveryTag();//訊息id
			 String routingKey  = envelope.getRoutingKey();//路由key
			String message = new String(body, "utf-8");
			System.out.println(message);
			// 返回確認狀態
		}
	}catch(Exception e){
	//列印日誌
	//刪除redis中的記錄
	//直接return;不應答,等待再次重新發送。
	}
}

2. 訊息堆積解決

1)加大rabbitMQ的記憶體空間