1. 程式人生 > >分散式訊息系統:Kafka(五)偏移量

分散式訊息系統:Kafka(五)偏移量

5、偏移量提交

5.1 偏移量

(1)新舊版本偏移量的變化

  在Kafka0.9版本之前消費者儲存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消費者不在儲存偏移量到zookeeper中,而是儲存在Kafka的一個內部主題中“consumer_offsets”,該主題預設有50個分割槽,每個分割槽3個副本,分割槽數量有引數offset.topic.num.partition設定。通過消費者組ID的雜湊值和該引數取模的方式來確定某個消費者組已消費的偏移量儲存到consumer_offsets主題的哪個分割槽中。

(2)查詢偏移量

  Kafka消費者API提供兩種方法用來查詢偏移量。

  一個是committed(TopicPartition partition)方法,這個方法返回一個OffsetAndMetadata物件,通過這個物件可以獲取指定分割槽已提交的偏移量;
  另外一個方法position(TopicPartition partition)返回的是下一次拉取位置。

(3)重置消費偏移量
  Kafka消費者還提供了重置消費偏移量的方法,seek(TopicPartition partition, long offset),該方法用於指定消費起始位置,另外還有seekToBeginning()和seekToEnd(),從名字就能看出來是幹嘛的。

  偏移量提交有自動和手動,預設是自動(enable.auto.commit = true)。自動提交的話每隔多久自動提交一次呢?這個由消費者協調器引數auto.commit.interval.ms 毫秒執行一次提交。有些場景我們需要手動提交偏移量,尤其是在一個長事務中並且保證訊息不被重複消費以及訊息不丟失,比如生產者一個訂單提交訊息,消費者拿到後要扣減庫存,扣減成功後該訊息才能提交,所以在這種場景下需要手動提交,因為庫存扣減失敗這個訊息就不能消費,同時客戶這個訂單狀態也不能是成功。手動提交也有兩種一個是同步提交一個是非同步提交,其區別就是消費者執行緒是否阻塞。如果使用手動提交就要關閉自動提交,因為自動提交預設是開啟的。

5.2 偏移量提交

  消費者提交偏移量的主要是消費者往一個名為_consumer_offset的特殊主題傳送訊息,訊息中包含每個分割槽的偏移量。
  如果消費者一直執行,偏移量的提交併不會產生任何影響。但是如果有消費者發生崩潰,或者有新的消費者加入消費者群組的時候,會觸發 Kafka 的再均衡。這使得 Kafka 完成再均衡之後,每個消費者可能被會分到新分割槽中。為了能夠繼續之前的工作,消費者就需要讀取每一個分割槽的最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。

5.2.1 提交偏移量小於客戶端處理的偏移量

這裡寫圖片描述
  如果提交的偏移量小於客戶端處理的最後一個訊息的偏移量,那麼處於兩個偏移量之間的訊息就會被重複處理。

5.2.2 提交偏移量大於客戶端處理的偏移量
這裡寫圖片描述
  如果提交的偏移量大於客戶端處理的最後一個訊息的偏移量,那麼處於兩個偏移量之間的訊息將會丟失。

5.2.3 自動提交

  當 enable.auto.commit 屬性被設為 true,那麼每過 5s,消費者會自動把從 poll()方法接收到的最大偏移量提交上去。這是因為提交時間間隔由 auto.commit.interval.ms 控制,預設值是 5s。與消費者裡的其他東西一樣,自動提交也是在輪詢裡進行的。消費者每次在進行輪詢時會檢查是否該提交偏移量了,如果是,那麼就會提交從上一次輪詢返回的偏移量。

  但是使用這種方式,容易出現提交的偏移量小於客戶端處理的最後一個訊息的偏移量這種情況的問題。假設我們仍然使用預設的 5s 提交時間間隔,在最近一次提交之後的 3s 發生了再均衡,再均衡之後,消費者從最後一次提交的偏移量位置開始讀取訊息。這個時候偏移量已經落後了 3s(因為沒有達到5s的時限,並沒有提交偏移量),所以在這 3s 的資料將會被重複處理。

  雖然可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重複訊息的時間窗的時間跨度,不過這種情況是無法完全避免的。

  在使用自動提交時,每次呼叫輪詢方法都會把上一次呼叫返回的偏移量提交上去,它並不知道具體哪些訊息已經被處理了,所以在再次呼叫之前最好確保所有當前呼叫返回的訊息都已經處理完畢(在呼叫 close() 方法之前也會進行自動提交)。一般情況下不會有什麼問題,不過在處理異常或提前退出輪詢時要格外小心。

5.2.4 手動提交

  消費者 API 提供了另一種提交偏移量的方式,開發者可以在必要的時候提交當前偏移量,而不是基於時間間隔。

  這是我們需要把把 auto.commit.offset 設為 false,讓應用程式決定何時提交偏移量。

5.2.4.1同步提交

  使用 commitSync() 提交偏移量最簡單也最可靠。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功後馬上返回,如果提交失敗就丟擲異常。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset =
          %d, customer = %s, country = %s\n",
             record.topic(), record.partition(),
                  record.offset(), record.key(), record.value()); 
        }
        try {
          consumer.commitSync(); 
        } catch (CommitFailedException e) {
            log.error("commit failed", e) 
        }
}

  commitSync() 將會提交由 poll() 返回的最新偏移量,所以在處理完所有記錄後要確保呼叫了 commitSync(),否則還是會有丟失訊息的風險。如果發生了再均衡,從最近一批訊息到發生再均衡之間的所有訊息都將被重複處理。

  同時在這個程式中,只要沒有發生不可恢復的錯誤,commitSync() 方法會一直嘗試直至提交成功。如果提交失敗,我們也只能把異常記錄到錯誤日誌裡。

5.2.4.2 非同步提交

  同步提交有一個不足之處,在 broker 對提交請求作出迴應之前,應用程式會一直阻塞,這樣會限制應用程式的吞吐量。我們可以通過降低提交頻率來提升吞吐量,但如果發生了再均衡,會增加重複訊息的數量。

  這個時候可以使用非同步提交 API。我們只管傳送提交請求,無需等待 broker 的響應。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(); 
}

  在成功提交或碰到無法恢復的錯誤之前,commitSync() 會一直重試,但是 commitAsync() 不會,這也是 commitAsync() 不好的一個地方。

  它之所以不進行重試,是因為在它收到伺服器響應的時候,可能有一個更大的偏移量已經提交成功。假設我們發出一個請求用於提交偏移量 2000,這個時候發生了短暫的通訊問題,伺服器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批訊息,併成功提交了偏移量 3000。如果 commitAsync() 重新嘗試提交偏移量 2000,它有可能在偏移量 3000 之後提交成功。這個時候如果發生再均衡,就會出現重複訊息。

  commitAsync() 也支援回撥,在 broker 作出響應時會執行回撥。回撥經常被用於記錄提交錯誤或生成度量指標。如果要用它來進行重試,則一定要注意提交的順序。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition,
        OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        }
      }); 
}

5.2.4.3 同步和非同步混合提交

  一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。

  但如果這是發生在關閉消費者或再均衡前的最後一次提交,就要確保能夠提交成功。因此在這種情況下,我們應該考慮使用混合提交的方法:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("topic = %s, partition = %s, offset = %d,
            customer = %s, country = %s\n",
            record.topic(), record.partition(),
            record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(); 
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(); 
    } finally {
        consumer.close();
    }
}
  1. 在程式正常執行過程中,我們使用 commitAsync 方法來進行提交,這樣的執行速度更快,而且就算當前提交失敗,下次提交成功也可以。
  2. 如果直接關閉消費者,就沒有所謂的“下一次提交”了,因為不會再呼叫poll()方法。使用 commitSync() 方法會一直重試,直到提交成功或發生無法恢復的錯誤。

5.2.4.4 提交特定的偏移量

  如果 poll() 方法返回一大批資料,為了避免因再均衡引起的重複處理整批訊息,想要在批次中間提交偏移量該怎麼辦?這種情況無法通過呼叫 commitSync() 或 commitAsync() 來實現,因為它們只會提交最後一個偏移量,而此時該批次裡的訊息還沒有處理完。

  這時候需要使用一下的兩個方法:

/**
 * Commit the specified offsets for the specified list of topics and partitions.
 */
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)


/**
 * Commit the specified offsets for the specified list of topics and partitions to Kafka.
 */
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

  消費者 API 允許在呼叫 commitSync() 和 commitAsync() 方法時傳進去希望提交的分割槽和偏移量的 map。
  假設處理了半個批次的訊息,最後一個來自主題“customers”分割槽 3 的訊息的偏移量是 5000,你可以呼叫 commitSync() 方法來提交它。不過,因為消費者可能不只讀取一個分割槽,你需要跟蹤所有分割槽的偏移量,所以在這個層面上控制偏移量的提交會讓程式碼變複雜。

  程式碼如下:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>(); 
int count = 0;

...

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d,
        customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value()); 
        currentOffsets.put(new TopicPartition(record.topic(),
        record.partition()), new
        OffsetAndMetadata(record.offset()+1, "no metadata")); 
        if (count % 1000 == 0) 
            consumer.commitAsync(currentOffsets,null); 
        count++;
    }
}

這裡呼叫的是 commitAsync(),不過呼叫commitSync()也是完全可以的。在提交特定偏移量時,仍然要處理可能發生的錯誤。

5.3 監聽再均衡

  如果 Kafka 觸發了再均衡,我們需要在消費者失去對一個分割槽的所有權之前提交最後一個已處理記錄的偏移量。如果消費者準備了一個緩衝區用於處理偶發的事件,那麼在失去分割槽所有權之前,需要處理在緩衝區累積下來的記錄。可能還需要關閉檔案控制代碼、資料庫連線等。

  在為消費者分配新分割槽或移除舊分割槽時,可以通過消費者 API 執行一些應用程式程式碼,在呼叫 subscribe() 方法時傳進去一個 ConsumerRebalanceListener 例項就可以了。 ConsumerRebalanceListener 有兩個需要實現的方法。

  1. public void onPartitionsRevoked(Collection partitions) 方法會在再均衡開始之前和消費者停止讀取訊息之後被呼叫。如果在這裡提交偏移量,下一個接管分割槽的消費者就知道該從哪裡開始讀取了。
  2. public void onPartitionsAssigned(Collection partitions) 方法會在重新分配分割槽之後和消費者開始讀取訊息之前被呼叫。

下面的例子將演示如何在失去分割槽所有權之前通過 onPartitionsRevoked() 方法來提交偏移量。

private Map<TopicPartition, OffsetAndMetadata> currentOffsets=
  new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener { 
    public void onPartitionsAssigned(Collection<TopicPartition>
      partitions) { 
    }

    public void onPartitionsRevoked(Collection<TopicPartition>
      partitions) {
        System.out.println("Lost partitions in rebalance.
          Committing current
        offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets); 
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance()); 

    while (true) {
        ConsumerRecords<String, String> records =
          consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            System.out.println("topic = %s, partition = %s, offset = %d,
             customer = %s, country = %s\n",
             record.topic(), record.partition(), record.offset(),
             record.key(), record.value());
             currentOffsets.put(new TopicPartition(record.topic(),
             record.partition()), new
             OffsetAndMetadata(record.offset()+1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // 忽略異常,正在關閉消費者
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
}

  如果發生再均衡,我們要在即將失去分割槽所有權時提交偏移量。要注意,提交的是最近處理過的偏移量,而不是批次中還在處理的最後一個偏移量。因為分割槽有可能在我們還在處理訊息的時候被撤回。我們要提交所有分割槽的偏移量,而不只是那些即將失去所有權的分割槽的偏移量——因為提交的偏移量是已經處理過的,所以不會有什麼問題。呼叫 commitSync() 方法,確保在再均衡發生之前提交偏移量。