1. 程式人生 > >中介軟體系列十 RabbitMQ之消費者端的訊息確認機制

中介軟體系列十 RabbitMQ之消費者端的訊息確認機制

概述

在RabbitMQ中,即使將queue,exchange, message等都設定了持久化之後,還是不能保證100%保證資料不丟失了。為了實現訊息不丟失,我們需要從Consumer端和Productor端同時進行處理。本篇文章先介紹Consumer端,在AMPQ-0-9-1中有定義從消費者到RabbitMQ的訊息確認機制,通過此機制可以保證訊息能夠從RabbitMQ正確到達消費者端。本文介紹在RabbitMQ中如何實現消費者端的訊息確認機制,包括如下內容

  • 1 消費者的實現機制
  • 2 消費者端的程式碼實現
  • 3 使用wireshark對訊息確認的關鍵包進行轉包,並進行分析
  • 4 在使用訊息確認機制的注意點

消費者端投遞確認機制

在消費者端確認的方式

RabbitMQ中的兩種確認方式:

  • 1 自動確認方式:RabbitMQ成功將訊息發出(即將訊息成功寫入TCP Socket)中立即認為本次投遞已經被正確處理,不管消費者端是否成功處理本次投遞

    在自動確認模式下,訊息傳送後即被認為成功投遞,又稱為”fire-and-forget”
    優點:這種模式下吞吐量非常高。
    缺點:A. 有可能出現投遞丟失的情況,不同於手動確認模式,如果消費者的TCP連線或通道在訊息成功互動之前關閉,則此訊息會丟失 B. 消費者端過載的問題。在手動確認模式中,可以設定一次最多同時處理多少訊息,而自動模式不能設定此值。因此,消費者有可能因為訊息無法及時處理,堆積中記憶體中,記憶體耗盡而奔潰 C. 此種模式只推薦在消費者可以快速且穩定處理投遞的訊息的場景中使用

  • 2 手動處理方式:消費者收到訊息後,手動呼叫basic.ack/basic.nack/basic.reject後,RabbitMQ收到這些訊息後,才認為本次投遞成功

    手動訊息確認方法有:
    § basic.ack用於肯定確認
    § basic.nack用於否定確認(注意:這是AMQP 0-9-1的RabbitMQ擴充套件)
    § basic.reject用於否定確認,但與basic.nack相比有一個限制:一次只能拒絕單條訊息
    消費者端以上的3個方法都表示訊息已經被正確投遞,但是basic.ack表示訊息已經被正確處理,但是basic.nack,basic.reject表示沒有被正確處理,但是RabbitMQ中仍然需要刪除這條訊息。
    手動的確認模式的投遞效率略低於自動,但是可以彌補自動確認模式的不足。

批量手動投遞確認

訊息手動除了一次確認一條,也可以一次確認多條。為了減少網路流量,可以批量手動確認。在應答時,設定basic.nack的multiple 欄位為true,可以同時對delivery_tag和比delivery_tag值小的投遞訊息進行確認
例如,假設在通道上沒有確認訊息的delivery_tag是5,6,7和8,當basic.nack中delivery_tag被設定為8並且multiple 被設定為true時,方法執行成功後,從5到8的所有訊息將被確認。 如果multiple 設定為false,那麼交貨5,6和7仍然是未確認的。

投遞唯一碼: Delivery Tags

當消費者向RabbitMQ註冊後,RabbitMQ使用basic.deliver向消費者投遞訊息時,訊息體上會帶上delivery tag,這個值會唯一標識本次投遞,在同一通道上,此值是唯一的。delivery tag值有64位長度,值從1開始,每傳送一次訊息值遞增1,最大值為9223372036854775807。消費者端在應答訊息時,帶上此引數,告訴RabbitMQ某次投遞已經正確應答。

消費者端訊息投遞確認程式碼

工程:

工程:
程式碼路徑:

消費者程式碼:

程式碼:關鍵點
a. channel.basicConsume設定接收非自動確認
b. 在處理完訊息後,呼叫channel.basicAck進行手動訊息確認

// 預設消費者實現
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [ConsumerConfirmRecv] Received '" + message + "'");
        // 訊息正向確認
        channel.basicAck(envelope.getDeliveryTag(),true);
         // 訊息否定確認: 如果設定multiple=false,requeue值啟作用,如果設定multiple=true,則requeue無論設定什麼值,後臺統一處理成true
        // channel.basicNack(envelope.getDeliveryTag(),false, false);
    }
};
// 接收訊息:設定非自動確認
channel.basicConsume(QUEUE_NAME, false, consumer);

消費者端抓包分析

正向確認-Basic.ack

和一般的訊息包不同點時,一般的消費者端的包分析見本文,只是多了Basic.ack包
這裡寫圖片描述
131-132幀 RabbitMQ向消費者推送訊息,消費者端進行否定確認
131幀 RabbitMQ向消費者推送訊息

Frame 131: 269 bytes on wire (2152 bits), 269 bytes captured (2152 bits) on interface 0
Ethernet II, Src: PcsCompu_48:72:ad (08:00:27:48:72:ad), Dst: Giga-Byt_bf:ae:ce (40:8d:5c:bf:ae:ce)
Internet Protocol Version 4, Src: 10.240.80.147, Dst: 10.240.80.99
Transmission Control Protocol, Src Port: 5672, Dst Port: 49877, Seq: 600, Ack: 642, Len: 215
# 返回Consume的呼叫結果
Advanced Message Queueing Protocol
    Type: Method (1)
    Channel: 1
    Length: 36
    Class: Basic (60)
    Method: Consume-Ok (21)
    Arguments
    # 此訊息的消費者的編號
        Consumer-Tag: amq.ctag-AhhpMhEMC5VuWcny42rObg
# 要投遞的訊息引數
Advanced Message Queueing Protocol
    Type: Method (1)
    Channel: 1
    Length: 87
    Class: Basic (60)
    Method: Deliver (60)
    Arguments
    # 要投遞的訊息者標誌
        Consumer-Tag: amq.ctag-AhhpMhEMC5VuWcny42rObg
    # 訊息的編號,從1開始
        Delivery-Tag: 1        
    # 是否是重新投遞的訊息,如果訊息是第一次投遞,則此值是false,如果此訊息是重新投遞的,則此值為true
        .... ...0 = Redelivered: False
    # 來源的交換機
        Exchange: consumerconfirm-exchange
    # 路由鍵
        Routing-Key: consumer-confirm
# 要投遞的訊息的頭
Advanced Message Queueing Protocol
    Type: Content header (2)
    Channel: 1
    Length: 27
    Class ID: Basic (60)
    Weight: 0
    Body size: 33
    Property flags: 0x9800
    # 訊息的屬性 
    Properties
        Content-Type: text/plain
        Delivery-Mode: 2
        Priority: 0
# 要投遞的訊息的內容
Advanced Message Queueing Protocol
    Type: Content body (3)
    Channel: 1
    Length: 33
    Payload: 436f6e73756d6572436f6e6669726d53656e642131353137...

132幀 消費者端對訊息的確認
這裡寫圖片描述

否定確認-Basic.Nack

和一般的訊息包不同點時,一般的消費者端的包分析見本文,只是多了Basic.Nack包
這裡寫圖片描述

275-276幀 RabbitMQ向消費者推送訊息,消費者端進行否定確認
275幀 RabbitMQ向消費者推送訊息,和上節內容相似,這裡略
276幀 消費者端進行否定確認,比basic.ack多了Requeue屬性
這裡寫圖片描述

其他消費者端的為了保證正確處理資料的機制

在消費者端為了正確處理資料,還可以設定Qos和投遞失敗

通道預取設定(Channel Prefetch Setting (QoS))

只能在訊息手動確認模式中啟作用。
為了避免消費者端一次同時處理過多的訊息,可以通過basic.qos設定最大的預取值。該值定義了通道上允許的最大未確認訊息,一旦未確認訊息的數量達到配置值,RabbitMQ將停止在通道上傳送更多訊息,直到至少有一個未被確認的訊息被確認。
備註:通道預取設定在basic.get (“pull API”)中是不啟作用,即使在訊息手動確認模式中

消費者確認模式,預取和吞吐量

在RabbitMQ中影響吞吐量最大的引數是:訊息確認模式和Qos預取值
自動訊息確認模式或設定Qos預取值為無限雖然可以最大的提高訊息的投遞速度,但是在消費者端未及時處理的訊息的數量也將增加,從而增加消費者RAM消耗,使用消費者端奔潰。所以以上兩種情況需要謹慎使用。

RabbitMQ官方推薦Qos預取值設定在 100到300範圍內的值通常提供最佳的吞吐量,並且不會有使消費者奔潰的問題

消費者失敗或失去連線時:自動重新排隊

在訊息手動確認模式中,如果發生以下情況投遞訊息所有的通道或連線被突然關閉(包括消費者端丟失TCP連線、消費者應用程式(程序)掛掉、通道級別的協議異常)任何已經投遞的訊息但是沒有被消費者端確認的訊息會自動重新排隊。
請注意,連線檢測不可用客戶端需要一段時間才會發現,所以會有一段時間內的所有訊息會重新投遞
因為訊息的可能重新投遞,所有必須保證消費者端的介面的冪等。

多次確認和對未知delivery_tag進行確認

如果消費者對同一delivery_tag進行多次確認,則丟擲通道異常PRECONDITION_FAILED。如果對
未知的delivery_tag進行確認,也會丟擲通道異常。

程式碼

--------------------- 本文來自 hry2015 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/hry2015/article/details/79416540?utm_source=copy