1. 程式人生 > >七、消費訊息與效能權衡(讀數筆記與個人實踐)

七、消費訊息與效能權衡(讀數筆記與個人實踐)

摘要

主要介紹消費訊息時的幾種方式:

  • 平衡訊息消費的可靠性與效能;
  • 死信交換器;
  • 設定自動刪除佇列、持久化佇列、TTL等;

消費效能

新增一個圖

使用no-ack模式 

在消費訊息時,負責消費的應用程式會發送一個Basic.Consumer請求,與該請求一起傳送的還有一個no-ack標誌。當這個標誌啟用時,它會告訴RabbitMQ消費者在接收到訊息時不會進行確認,RabbitMQ只管儘快的傳送訊息。

使用no-ack標誌消費訊息是讓RabbitMQ將消費投遞給消費者的最快方式,但這也是最不可靠的方式。

如果使用no-ack,那麼當有新的可用訊息時,RabbitMQ將會發送該訊息給消費者,而不用等待。實際上,如果有可用訊息,RabbitMQ會持續向消費者傳送它們,直到套接字緩衝區被填滿為止。

目前沒有找到RabbitTemplate如何開啟no-ack的方法,如果有用過的朋友,請留言告訴我,謝謝。

訊息確認模式

訊息確認模式要求每次消費訊息時,向RabbitMQ返回一個Basic.Ack,告知RabbitMQ訊息已經成功消費,可以在伺服器中刪除該訊息。

訊息確認有三種確認方式:

  • Ack;
  • Reject;
  • Nack;

基於RabbitTemplate,下面這段程式碼,有對這幾種確認方式的實現,在配置檔案中開啟手動確認模式,acknowledge-mode屬性為manual(預設為自動確認):

spring:
  #訊息佇列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000ms
    listener:
      simple:
        acknowledge-mode: manual
/**
 * 消費者監聽訊息佇列
 */
@Component
@Slf4j
@RabbitListener(queues = "DIRECT_QUEUE")
public class DirectQueueListener {

    @RabbitHandler
    public void process(String message,
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException {
        log.info("消費訊息成功: {}", message);
        Thread.sleep(1000);
        switch (message) {
            case "nack":
                channel.basicNack(tag, true, false); // 第二個引數控制是否開啟批量拒絕,第三個引數表示是否requeue
                break;
            case "nack-requeue":
                channel.basicNack(tag, true, true);
                break;
            case "reject":
                channel.basicReject(tag, false);
                break;
            case "reject-requeue": // 啟用了requeue,如果只有一個消費者,容易造成死迴圈
                channel.basicReject(tag, true);
                break;
            default:
                channel.basicAck(tag, true);
                break;
        }
    }

}

channel.basicAck:當正常消費訊息時,呼叫該方法。

我們看到除了basicAck,還有basicReject和basicNack。這兩種,顧名思義,是用來拒絕消費的。

channel.basicReject:從協議層面上,reject是傳送一個Basic.Reject響應,告知RabbitMQ無法對這條訊息進行處理,當拒絕時,可以指定是否丟棄訊息或使用requeue標誌重新發送訊息。當啟用requeue時,RabbitMQ將會把這條訊息重新放回到佇列中。

不能使用basicReject一次拒絕多個訊息。

channel.basicNack:Basic.Nack實現與Basic.Reject相同的行為,但添加了批量拒絕的功能。

設定multiple或requeue如圖所示:

服務質量確認模式

AMQP規範要求通道要有服務質量設定,即在確認訊息接收之前可以預先接收一定數量的訊息。可以設定一個預取數量來實現高效的傳送訊息。

如果消費者應用程式在確認訊息之前崩潰,在套接字關閉時,所有預取的訊息將返回到佇列。

如果設定了no-ack,那麼預取大小將被忽略。

使用RabbitTemplate時,可以在消費者應用程式的配置檔案中配置預取大小:

spring:
  #訊息佇列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000ms
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1000

其中prefetch就是預取大小,消費者應用程式執行後,可以在RabbitMQ的控制檯看到這個設定:

如果熟悉抓包軟體的朋友,可以試著抓包看看:

我預先發送了2條訊息到RabbitMQ,可以看到上圖中最後兩行是兩個Ack。

有一種方式可以一次確認多個訊息,Basic.Ack響應具有一個multiple屬性,當把它設定為true時就能確認以前未確認的訊息。

如果使用multiple,當成功的接收了一些訊息,並且應用程式在回覆Ack之前就發生了異常,則所有為確認的訊息將返回佇列。

死信交換器

RabbitMQ的死信交換器是一種可以拒絕已投遞訊息的可選行為,一般有三種情況的訊息會進入死信佇列:

  • 當拒絕了一個不重新發送的訊息時,會進入死信;
  • 當訊息的TTL到期時,會進入死信;
  • 當佇列已滿時,會進入死信;
死信與備用交換器不同,過期或被拒絕的訊息通過死信交換器進行投遞,而備用交換器則路由那些無法由RabbitMQ路由的訊息。

在RabbitMQ中,在宣告佇列時,指定死信交換器:

    /**
     * 宣告佇列。
     * 同時指定死信佇列。
     *
     * @return Queue物件。
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

死信交換器還允許使用預先指定的值覆蓋路由鍵,這樣可以允許同一個交換器同時處理死信和非死信訊息,但需要確保死信訊息不被投遞到相同的佇列。設定預定義路由鍵的關鍵字是:x-dead-letter-routing-key。

測試死信佇列,當消費者拒絕時,檢視訊息是否會進入死信佇列:

圖。

控制佇列

定義佇列時,有多個設定可以確定佇列的行為:

  • 自動刪除自己;
  • 只允許一個消費者進行消費;
  • 自動過期訊息;
  • 保持有限數量的訊息;
  • 將舊訊息推出堆疊;

更改佇列的設定,必須刪除佇列並重新建立它。

臨時佇列

也可以叫做自動刪除的佇列。

一旦消費者完成連線和檢索訊息,在斷開連線時佇列將被刪除。

    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").autoDelete()
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

佇列只會在沒有消費者監聽的時候自行刪除。

只允許單個消費者

在需要確保只有單個消費者能夠消費佇列中的訊息時,在建立佇列時設定exclusive屬性,啟用後在消費者斷開連線後,佇列也會自動刪除。

    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").exclusive()
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

宣告exclusive的佇列,只能被宣告時所指定的同一個連線和通道所消費,當建立佇列的通道關閉時,獨佔佇列也將會刪除。在通道關閉之間,可以多次宣告和取消exclusive佇列的消費者。

自動過期佇列

如果一段時間沒有使用該佇列,就刪除它。

建立一個自動過期的佇列非常簡單,要做的事情就是使用x-expires引數宣告一個佇列。該引數以毫秒為單位設定佇列的生存時間(Time To Live,TTL)。

自動過期佇列有一些嚴格的約定:

  • 佇列只有在沒有消費者的情況下才會過期。如果有連線消費者,則只有發出Basic.Cancel或斷開連線之後才自動刪除;
  • 佇列只有在TTL週期之內沒有收到Basic.Get請求時才會到期。一旦一個Basic.Get請求中已經包含了一個具有過期值的佇列,那麼過期設定無效,佇列不會被自動刪除(不要使用Get);
  • 不能重新宣告或更改x-expires屬性;
  • RabbitMQ不保證過期刪除佇列這一過程的時效性;

永久佇列

使用durable標誌告訴RabbitMQ希望佇列配置被儲存在伺服器:

    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

訊息級別的TTL

訊息級別的TTL設定允許伺服器對訊息的最大生存時間進行限制。宣告佇列的同時指定死信交換器和TTL值將導致該佇列中已到期的訊息成為死信訊息。

可以使用x-message-ttl設定佇列的訊息TTL時間。

最大長度佇列

從RabbitMQ3.1.0開始,可以在宣告佇列時指定最大長度。如果在佇列上設定列x-max-length引數,一旦訊息到達最大值,RabbitMQ會在新增新訊息時刪除位於佇列前端的訊息,如果宣告佇列時候,指定列死信交換器,則從佇列前端刪除的訊息會