1. 程式人生 > >Kafka(七)消費者偏移量

Kafka(七)消費者偏移量

sof () 取模 失敗 data 兩種方法 保存 庫存 num

在Kafka0.9版本之前消費者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消費者不在保存偏移量到zookeeper中,而是保存在Kafka的一個內部主題中“__consumer_offsets”,該主題默認有50個分區,每個分區3個副本,分區數量有參數offset.topic.num.partition設置。通過消費者組ID的哈希值和該參數取模的方式來確定某個消費者組已消費的偏移量保存到__consumer_offsets主題的哪個分區中。


Kafka消費者API提供兩種方法用來查詢偏移量。一個是committed(TopicPartition partition)方法,這個方法返回一個OffsetAndMetadata對象,通過這個對象可以獲取指定分區已提交的偏移量;另外一個方法position(TopicPartition partition)返回的是下一次拉取位置。

同時Kafka消費者還提供了重置消費偏移量的方法,seek(TopicPartition partition, long offset),該方法用於指定消費起始位置,另外還有seekToBeginning()和seekToEnd(),從名字就能看出來是幹嘛的。


偏移量提交有自動和手動,默認是自動(enable.auto.commit = true)。自動提交的話每隔多久自動提交一次呢?這個由消費者協調器參數auto.commit.interval.ms 毫秒執行一次提交。有些場景我們需要手動提交偏移量,尤其是在一個長事務中並且保證消息不被重復消費以及消息不丟失,比如生產者一個訂單提交消息,消費者拿到後要扣減庫存,扣減成功後該消息才能提交,所以在這種場景下需要手動提交,因為庫存扣減失敗這個消息就不能消費,同時客戶這個訂單狀態也不能是成功。手動提交也有兩種一個是同步提交一個是異步提交,其區別就是消費者線程是否阻塞。如果使用手動提交就要關閉自動提交,因為自動提交默認是開啟的。

Kafka(七)消費者偏移量