1. 程式人生 > >高可用服務 AHAS 在消息隊列 MQ 削峰填谷場景下的應用

高可用服務 AHAS 在消息隊列 MQ 削峰填谷場景下的應用

ktr current record 線程池 blog ignore messages pic amp

在消息隊列中,當消費者去消費消息的時候,無論是通過 pull 的方式還是 push 的方式,都可能會出現大批量的消息突刺。如果此時要處理所有消息,很可能會導致系統負載過高,影響穩定性。但其實可能後面幾秒之內都沒有消息投遞,若直接把多余的消息丟掉則沒有充分利用系統處理消息的能力。我們希望可以把消息突刺均攤到一段時間內,讓系統負載保持在消息處理水位之下的同時盡可能地處理更多消息,從而起到“削峰填谷”的效果:

?技術分享圖片

上圖中紅色的部分代表超出消息處理能力的部分。

我們可以看到消息突刺往往都是瞬時的、不規律的,其後一段時間系統往往都會有空閑資源。我們希望把紅色的那部分消息平攤到後面空閑時去處理,這樣既可以保證系統負載處在一個穩定的水位,又可以盡可能地處理更多消息,這時候我們就需要一個能夠控制消費端消息勻速處理的利器 —AHAS 流控降級,來為消息隊列削峰填谷,保駕護航。

AHAS 是如何削峰填谷的

AHAS 的流控降級是面向分布式服務架構的專業流量控制組件,主要以流量為切入點,從流量控制、熔斷降級、系統保護等多個維度來幫助您保障服務的穩定性,同時提供強大的聚合監控和歷史監控查詢功能。

AHAS 專門為這種場景提供了勻速排隊的控制特性,可以把突然到來的大量請求以勻速的形式均攤,以固定的間隔時間讓請求通過,以穩定的速度逐步處理這些請求,起到“削峰填谷”的效果,從而避免流量突刺造成系統負載過高。同時堆積的請求將會排隊,逐步進行處理;當請求排隊預計超過最大超時時長的時候則直接拒絕,而不是拒絕全部請求。

比如在 RocketMQ 的場景下配置了勻速模式下請求 QPS 為 5,則會每 200 ms 處理一條消息,多余的處理任務將排隊;同時設置了超時時間,預計排隊時長超過超時時間的處理任務將會直接被拒絕。示意圖如下圖所示:

技術分享圖片

RocketMQ Consumer 接入示例

本部分將引導您快速在 RocketMQ 消費端接入 AHAS 流控降級 Sentinel。

1. 開通 AHAS

首先您需要到AHAS 控制臺開通 AHAS 功能(免費)。可以根據 開通 AHAS 文檔 裏面的指引進行開通。

2. 代碼改造

在結合阿裏雲 RocketMQ Client 使用 Sentinel 時,用戶需要引入 AHAS Sentinel 的依賴 ahas-sentinel-client (以 Maven 為例):

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>ahas-sentinel-client</artifactId>
    <version>1.1.0</version>
</dependency>

由於 RocketMQ Client 未提供相應攔截機制,而且每次收到都可能是批量的消息,因此用戶在處理消息時需要手動進行資源定義(埋點)。我們可以在處理消息的邏輯處手動進行埋點,資源名可以根據需要來確定(如 groupId + topic 的組合):

    private static Action handleMessage(Message message, String groupId, String topic) {
        Entry entry = null;
        try {
            // 資源名稱為 groupId 和 topic 的組合,便於標識,同時可以針對不同的 groupId 和 topic 配置不同的規則
            entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic);
          
            // 在此處編寫真實的處理邏輯
            System.out.println(System.currentTimeMillis() + " | handling message: " + message);
            return Action.CommitMessage;
        } catch (BlockException ex) {
            // 在編寫處理被流控的邏輯
            // 示例:可以在此處記錄錯誤或進行重試
            System.err.println("Blocked, will retry later: " + message);
            return Action.ReconsumeLater; // 會觸發消息重新投遞
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    }

消費者訂閱消息的邏輯示例:

Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(topic, "*", (message, context) -> {
    return handleMessage(message);
});
consumer.start();

更多關於 RocketMQ SDK 的信息可以參考 消息隊列 RocketMQ 入門文檔。

3. 獲取 AHAS 啟動參數

註意:若在本地運行接入 AHAS Sentinel 控制臺需要在頁面左上角選擇 公網 環境,若在阿裏雲 ECS 環境則在頁面左上角選擇對應的 Region 環境。

我們可以進入 AHAS 控制臺,點擊左側側邊欄的 流控降級,進入 AHAS 流控降級控制臺應用總覽頁面。在頁面右上角,單擊添加應用,選擇 SDK 接入頁簽,到 配置啟動參數 頁簽拿到需要的啟動參數(詳情請參考 SDK 接入文檔),類似於:

-Dproject.name=AppName -Dahas.license=<License>

其中 project.name 配置項代表應用名(會顯示在控制臺,比如 MqConsumerDemo),ahas.license 配置項代表自己的授權 license(ECS 環境不需要此項)。

4. 啟動 Consumer,配置規則

接下來我們添加獲取到的啟動參數,啟動修改好的 Consumer 應用。由於 AHAS 流控降級需要進行資源調用才能觸發初始化,因此首先需要向對應 group/topic 發送一條消息觸發初始化。消費端接收到消息後,我們就可以在 AHAS Sentinel 控制臺上看到我們的應用了。點擊應用卡片,進入詳情頁面後點擊左側側邊欄的“機器列表”。我們可以在機器列表頁面看到剛剛接入的機器,代表接入成功:

技術分享圖片

點擊“請求鏈路”頁面,我們可以看到之前定義的資源。點擊右邊的“流控”按鈕添加新的流控規則:

技術分享圖片

我們在“流控方式”中選擇“排隊等待”,設置 QPS 為 10,代表每 100ms 勻速通過一個請求;並且設置最大超時時長為 2000ms,超出此超時時間的請求將不會排隊,立即拒絕。配置完成後點擊新建按鈕。

5. 發送消息,查看效果

下面我們可以在 Producer 端批量發送消息,然後在 Consumer 端的控制臺輸出處觀察效果。可以看到消息消費的速率是勻速的,大約每 100 ms 消費一條消息:

1550732955137 | handling message: Hello MQ 2453
1550732955236 | handling message: Hello MQ 9162
1550732955338 | handling message: Hello MQ 4944
1550732955438 | handling message: Hello MQ 5582
1550732955538 | handling message: Hello MQ 4493
1550732955637 | handling message: Hello MQ 3036
1550732955738 | handling message: Hello MQ 1381
1550732955834 | handling message: Hello MQ 1450
1550732955937 | handling message: Hello MQ 5871

同時不斷有排隊的處理任務完成,超出等待時長的處理請求直接被拒絕。註意在處理請求被拒絕的時候,需要根據需求決定是否需要重新消費消息。

我們也可以點擊左側側邊欄的“監控詳情”進入監控詳情頁面,查看處理消息的監控曲線:

技術分享圖片

對比普通限流模式的監控曲線(最右面的部分):

技術分享圖片

如果不開啟勻速模式,只是普通的限流模式,則只會同時處理 10 條消息,其余的全部被拒絕,即使後面的時間系統資源充足多余的請求也無法被處理,因而浪費了許多空閑資源。兩種模式對比說明勻速模式下消息處理能力得到了更好的利用。

Kafka 接入代碼示例

Kafka 消費端接入 AHAS 流控降級的思路與上面的 RocketMQ 類似,這裏給出一個簡單的代碼示例:

private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) {
    pool.submit(() -> {
        Entry entry = null;
        try {
            // 資源名稱為 groupId 和 topic 的組合,便於標識,同時可以針對不同的 groupId 和 topic 配置不同的規則
            entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic);

            // 在此處理消息.
            System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString());
        } catch (BlockException ex) {
            // Blocked.
            // NOTE: 在處理請求被拒絕的時候,需要根據需求決定是否需要重新消費消息
            System.err.println("Blocked: " + record.toString());
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    });
}

消費消息的邏輯:

while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // 必須在下次 poll 之前消費完這些數據, 且總耗時不得超過 SESSION_TIMEOUT_MS_CONFIG
        // 建議開一個單獨的線程池來消費消息,然後異步返回結果
        for (ConsumerRecord<String, String> record : records) {
            handleMessage(record, groupId, topic);
        }
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (Throwable ignore) {
        }
        e.printStackTrace();
    }
}

其它

以上介紹的只是 AHAS 流控降級的其中一個場景 —— 請求勻速,它還可以處理更復雜的各種情況,比如:

  • 流量控制:可以針對不同的調用關系,以不同的運行指標(如 QPS、線程數、系統負載等)為基準,對資源調用進行流量控制,將隨機的請求調整成合適的形狀(請求勻速、Warm Up 等)。
  • 熔斷降級:當調用鏈路中某個資源出現不穩定的情況,如平均 RT 增高、異常比例升高的時候,會使對此資源的調用請求快速失敗,避免影響其它的資源導致級聯失敗。
  • 系統負載保護:對系統的維度提供保護。當系統負載較高的時候,提供了對應的保護機制,讓系統的入口流量和系統的負載達到一個平衡,保證系統在能力範圍之內處理最多的請求。

您可以參考 AHAS 流控降級文檔 來挖掘更多的場景。

原文鏈接
更多技術幹貨 請關註阿裏雲雲棲社區微信號 :yunqiinsight

高可用服務 AHAS 在消息隊列 MQ 削峰填谷場景下的應用