1. 程式人生 > >kafka同步非同步消費和訊息的偏移量(四)

kafka同步非同步消費和訊息的偏移量(四)

1. 消費者位置(consumer position)

因為kafka服務端不儲存訊息的狀態,所以消費端需要自己去做很多事情。我們每次呼叫poll()方法他總是返回已經儲存在生產者佇列中還未被消費者消費的訊息。訊息在每一個分割槽中都是順序的,那麼必然可以通過一個偏移量去確定每一條訊息的位置。

偏移量在消費訊息的過程中處於重要的作用。如果是自動提交訊息,那麼poll()方法會去在每次獲取訊息的時候自動提交獲取最後一條訊息的偏移量,告訴伺服器我們已經消費到這個位置,下次從下一個位置開始消費。

我們把更新分割槽當前位置的操作叫做提交。消費者是如何提交偏移量的呢?kafka最新的api是這樣做的:建立一個叫做_consumer_offset的特殊主題用來儲存訊息的偏移量。消費者每次消費都會往這個主題傳送訊息,訊息包含每個分割槽的偏移量。

如果消費者一直處於執行的狀態那麼這個偏移量沒有什麼用。不過如果這個消費者崩潰或者有新的消費者加入群組觸發再均衡策略,那麼再均衡之後該分割槽的消費者如果不是之前的那一位,那麼新的小夥伴怎麼知道之前的夥計消費到哪裡呢。所以提交他自己的offset就發揮作用了。

Consumer讀取partition中的資料是通過呼叫發起一個fetch請求來執行的。而從KafkaConsumer來看,它有一個poll方法。但是這個poll方法只是可能會發起fetch請求。原因是:Consumer每次發起fetch請求時,讀取到的資料是有限制的,通過配置項max.partition.fetch.bytes來限制的。而在執行poll方法時,會根據配置項個max.poll.records來限制一次最多pool多少個record。

那麼就可能出現這樣的情況: 在滿足max.partition.fetch.bytes限制的情況下,假如fetch到了100個record,放到本地快取後,由於max.poll.records限制每次只能poll出15個record。那麼KafkaConsumer就需要執行7次才能將這一次通過網路發起的fetch請求所fetch到的這100個record消費完畢。其中前6次是每次pool中15個record,最後一次是poll出10個record。

在consumer中,還有另外一個配置項:max.poll.interval.ms ,它表示最大的poll資料間隔,如果超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認為該consumer處於 livelock狀態。就會將該consumer退出consumer group。所以為了不使Consumer 自己被退出,Consumer 應該不停的發起poll(timeout)操作。而這個動作 KafkaConsumer Client是不會幫我們做的,這就需要自己在程式中不停的呼叫poll方法了。

當一個consumer因某種原因退出Group時,進行重新分配partition後,同一group中的另一個consumer在讀取該partition時,怎麼能夠知道上一個consumer該從哪個offset的message讀取呢?也是是如何保證同一個group內的consumer不重複消費訊息呢?上面說了一次走網路的fetch請求會拉取到一定量的資料,但是這些資料還沒有被訊息完畢,Consumer就掛掉了,下一次進行資料fetch時,是否會從上次讀到的資料開始讀取,而導致Consumer消費的資料丟失嗎?

為了做到這一點,當使用完poll從本地快取拉取到資料之後,需要client呼叫commitSync方法(或者commitAsync方法)去commit 下一次該去讀取 哪一個offset的message。

而這個commit方法會通過走網路的commit請求將offset在coordinator中保留,這樣就能夠保證下一次讀取(不論是進行rebalance)時,既不會重複消費訊息,也不會遺漏訊息。

對於offset的commit,Kafka Consumer Java Client支援兩種模式:由KafkaConsumer自動提交,或者是使用者通過呼叫commitSync、commitAsync方法的方式完成offset的提交。

2. 位移管理(offset management)

2.1 自動提交

Kafka預設是定期幫你自動提交位移的(enable.auto.commit = true),使用這種簡單的方式之前你需要知道可能會帶來什麼後果。

假設我們仍然使用預設的5s提交時間間隔,在最近一次提交之後的3s發生了再均衡,再均衡之後,消費者從最後一次提交的偏移量位置開始讀取訊息。這個時候偏移量已經落後
了3s,所以在這3s內到達的訊息會被重複處理。可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重複悄息的時間窗,不過這種情況是無也完全避免的。

在使用自動提交時,每次呼叫輪詢方告都會把上一次呼叫返回的偏移量提交上去,它並不知道具體哪些訊息已經被處理了,所以在再次呼叫之前最好確保所有當前呼叫返回的訊息都已經處理完畢(在呼叫close()方法之前也會進行自動提交)。一般情況下不會有什麼問題,不過在處理異常或提前退出輪詢時要格外小心。

2.2 手動提交

在多partition多consumer的場景下自動提交總會發生一些不可控的情況。所以消費者API也為我們提供了另外一種提交偏移量的方式。開發者可以在程式中自己決定何時提交,而不是基於時間間隔。

在使用手動提交之前我們需要先將:

properties.put("enable.auto.commit", "false");

然後使用:

consumer.commitSync();

來提交。

commitSync()方法會提交由poll()方法返回的最新偏移量,提交成功後馬上返回,否則跑出異常。

我們處理訊息的邏輯可以變成這樣:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        try {
           consumer.commitSync();
        } catch (Exception e) {
            System.out.println("commit failed");
        }

    }
}

每處理一次訊息我們提交一次offset。

非同步手動提交

上面我們使用commitSync()的方式提交資料,每次提交都需要等待broker返回確認結果。這樣沒提交一次等待一次會限制我們的吞吐量。
如果採用降低提交頻率來保證吞吐量,那麼則有增加訊息重複消費的風險。所以kafka消費者提供了非同步提交的API。我們只管傳送提交請求無需等待broker返回。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitAsync();
}

commitAsync()方法提交最後一個偏移量。在成功提交或碰到無怯恢復的錯誤之前,commitAsync()會一直重試,但是commitAsync()不會,這也是commitAsync()不好的一個地方。它之所以不進行重試,是因為在它收到伺服器響應的時候, 可能有一個更大的偏移量已經提交成功。假設我們發出一個請求用於提交偏移量2000,這個時候發生了短暫的通訊問題,伺服器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批訊息,併成功提交了偏移量3000。如果commitAsync()重新嘗試提交偏移量2000 ,它有可能在偏移量3000之後提交成功。這個時候如果發生再均衡,就會出現重複訊息。

當然使用手動提交最大的好處就是如果發生了錯誤我們可以記錄下來。commitAsync()也支援回撥方法,提交offset發生錯誤我們可以記下當前的偏移量。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e != null){
                System.out.println("commit failed"+map);
            }
        }
    });
}

同步和非同步組合提交

一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。但如果這是發生在關閉消費者或再均衡前的最後一次提交,就要確保能夠提交成功。因此,在消費者關閉前一般會組合使用commitAsync()和commitSync()。

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }
} catch (Exception e) {
    System.out.println("commit failed");
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

如果一切正常我們使用commitAsync()來提交。如果直接關閉消費者,就沒有所謂的下一次提交了。使用commitSync()會一直重試,直到提交成功。

2.3 提交特定偏移量

上面我們手動提交使用的commitAsync()和commitSync()都是提交每一次消費最後一條訊息的偏移量,那麼如果我們一次拉取了很多訊息但是沒有消費完,想提交我們消費完成的位置該怎麼處理呢?kafka也有相應的對策。

Map<TopicPartition,OffsetAndMetadata> currentOffset = new HashMap<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset(),"metadata"));
        try {
            System.out.println("模擬訊息處理失敗的情況");
        } catch (Exception e) {
            consumer.commitAsync(currentOffset,null);
        }
    }
}

這裡呼叫的是commitAsync(),呼叫commitSync()也是可以的。程式碼中模擬我們在處理訊息的過程中可能會出錯的情況,每次讀訊息都把當前的offset存入map中,如果出錯就提交當前已經消費到的偏移量。

2.4 再均衡監聽器

前面我們說過當發生consumer退出或者新增,partition新增的時候會觸發再均衡。那麼發生再均衡的時候如果某個consumer正在消費的任務沒有消費完該如何提交當前消費到的offset呢?kafka提供了再均衡監聽器,在發生再均衡之前監聽到,當前consumer可以在失去分割槽所有權之前處理offset關閉控制代碼等。

消費者API中有一個()方法:

subscribe(Collection<TopicPartition> var1, ConsumerRebalanceListener var2);

ConsumerRebalanceListener物件就是監聽器的介面物件,我們需要實現自己的監聽器繼承該介面。接口裡面有兩個方法需要實現:

void onPartitionsRevoked(Collection<TopicPartition> var1);

void onPartitionsAssigned(Collection<TopicPartition> var1);

第一個方法會在再均衡開始之前和消費者停止讀取訊息之後被呼叫。如果在這裡提交偏移量,下一個接管分割槽的消費者就知道該從哪裡開始讀取了。

第二個會在重新分配分割槽之後和消費者開始讀取訊息之前被呼叫。、

我們來模擬一下再均衡的場景:

final Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("page_visits"));


final Map<TopicPartition,OffsetAndMetadata> currentOffset = new HashMap<>();
class HandleRebance implements ConsumerRebalanceListener{

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        System.out.println("partition is rebanlance");
        consumer.commitAsync(currentOffset,null);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {

    }
}

consumer.subscribe(topic,new HandleRebance());
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset(),"metadata"));
        try {
            System.out.println("模擬訊息處理失敗的情況");
        } catch (Exception e) {
            consumer.commitAsync(currentOffset,null);
        }
    }
}

首先實現了ConsumerRebalanceListener介面,實現方法裡面如果監聽到發生再均衡我們提交當前處理過的偏移量。

2.5 從特定偏移量處開始處理

前面都是consumer.poll()之後讀取該批次的訊息,kafka還提供了從分割槽的開始或者末尾讀訊息的功能:

seekToEnd(Collection<TopicPartition> partitions)
seekToBeginning(Collection<TopicPartition> partitions)

另外kafka還提供了從指定偏移量處讀取訊息,可以通過seek()方法來處理:

seek(TopicPartition partition, long offset)

提交當前分割槽和當前消費位置資訊。

2.6 獨立消費者–不屬於群組的消費者

到目前為止我們討論的都是消費者群組,分割槽被自動分配給群組的消費者,群組的消費者有變動會觸發再均衡。那麼是不是可以迴歸到別的訊息佇列的方式:不需要群組消費者也可以自己訂閱主題?

kafka也提供了這樣的案例,因為kafka的主題有分割槽的概念,那麼如果沒有群組就意味著你的自己訂閱到特定的一個分割槽才能消費內容。如果是這樣的話,就不需要訂閱主題,而是為自己分配分割槽。一個消費者可以訂閱主題(井加入消費者群組),或者為自己分配分割槽,但不能同時做這兩件事情。

下面的例子演示如何為自己分配分割槽並讀取訊息的:

final Consumer<String, String> consumer = new KafkaConsumer<>(props);
List<PartitionInfo> partitionInfoList = consumer.partitionsFor("page_visits");
List<TopicPartition> topicPartitionList = new ArrayList<>();
if(partitionInfoList != null){
    for(PartitionInfo partitionInfo : partitionInfoList){
        topicPartitionList.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
        consumer.assign(topicPartitionList);
    }
}

final Map<TopicPartition,OffsetAndMetadata> currentOffset = new HashMap<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset(),"metadata"));
        try {
            System.out.println("模擬訊息處理失敗的情況");
        } catch (Exception e) {
            consumer.commitAsync(currentOffset,null);
        }
    }
}
  1. consumer.partitionsFor(“主題”)方法允許我們獲取某個主題的分割槽資訊。
  2. 知道想消費的分割槽後使用assign()手動為該消費者分配分割槽。

除了不會發生再均衡,也不需要手動查詢分割槽,其他的看起來一切正常。不過要記住,如果主題增加了新的分割槽,消費者並不會收到通知。所以,要麼週期性地呼叫consumer.partitionsFor()方法來檢查是否有新分割槽加入,要麼在新增新分割槽後重啟應用程式。

相關推薦

kafka同步非同步消費訊息偏移

1. 消費者位置(consumer position) 因為kafka服務端不儲存訊息的狀態,所以消費端需要自己去做很多事情。我們每次呼叫poll()方法他總是返回已經儲存在生產者佇列中還未被消費者消費的訊息。訊息在每一個分割槽中都是順序的,那麼必然可以通過一

如何管理Spark Streaming消費Kafka偏移

上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接著聊聊上次說升級失敗的案例。 事情發生一個月前,由於當時我們想提高spark streaming程式的並行處理效能,於是需要增加kafka分割槽個數,,這裡需要說下,在新版本sp

如何管理Spark Streaming消費Kafka偏移

前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。 在sp

** 無窮InF非數值NaN**

Matlab中: InF 代表正無窮量; -InF 代表負無窮量; NaN代表非數值量;(Not-a-Number.) 正負無窮量一般由運算溢位產生,產生了超出雙精度浮點數數值範圍的結果; 非數值量由於0/0或InF/InF型別非正常運算產生,這兩個NaN不相等。 除異常運算外,

Apache Kafka入門教程輕鬆學-第Kafka核心元件流程-設計-原理副本管理器

本入門教程,涵蓋Kafka核心內容,通過例項和大量圖表,幫助學習者理解,任何問題歡迎留言。 目錄: 本章簡單介紹了副本管理器,副本管理器負責分割槽及其副本的管理。副本管理器具體的工作流程可以參考牟大恩所著的《Kafka入門與實踐》。 副本管理器 副本機制使得kafka

分散式訊息中介軟體——Flume+Kafka+Storm+Redis生態架構實戰

一、Kafka專案應用架構分析 1、Kafka生態架構        資料收集的速度,跟處理的速度不一定一致,故使用Kafka中介軟體作為資料收集和資料處理的一個Access入口,接收flume收集的資料,並通過kafkaSpout提交給Storm進行處理。 2、kafka

如何匯出zk中的偏移offsets

我們在使用kafka的過程中,可能需要把某個group中的偏移量offset匯出來檢視。下面提供一種把偏移量從zookeeper中匯出來的方法。 執行命令 bin/kafka-run-cl

基於Ubuntu Server 16.04 LTS版本安裝部署Django之:安裝MySQL數據庫

ins cli 遠程訪問 lib root 版本 連接 str ibm 1.安裝mysql以及插件: sudo apt-get install mysql-server mysql-client sudo apt-get install libmysqld-devsud

ASP.NET Core MVC Visual Studio入門新增模型

ASP.NET Core MVC 和 Visual Studio入門(四)新增模型   Rick Anderson 和Tom Dykstra   在要節中將新增一些在資料庫中管理電影的類,這些類將成為MVC應用的“Model(模型)”部分。 這些類將與Enti

Fluentd日誌處理-插件使用調試問題

logstash health grep have gin data mdb eth -i fluentd一些插件的使用 geoip的配置模版 <filter request> @type geoip geoip_lookup_keys ip bac

百度地圖openlayers融合封裝之demo展示

2018.5.28 --11:31 wmap的目的是減少地圖開發的成本和時間。  如果你需要進行地圖開發,覺的百度地圖和openlayers的api文件晦澀難懂(主要是openlayers的api很難懂,百度api入門很簡單),可以使用wmap。你不需要去學習百度地圖api

小程式成長之路_引入小程式自帶icon 引用阿里圖示

上篇我們已經成功填加tabBar,那麼我們這篇就講解一下 引用圖示icon,小程式裡有自己的圖示供大家使用,但是圖示有限,有很多都滿足不了我們的需求。這次呢 給大家介紹一下阿里圖示,裡面有大量的圖示供你選擇, 下面我就給大家介紹兩種引用icon的方法 , 引用小程式自帶

【Oracle 叢集】ORACLE DATABASE 11G RAC 知識圖文詳細教程之快取融合技術主要後臺程序

      前面已經介紹了 RAC 的後臺程序,為了更深入的瞭解這些後臺程序的工作原理,先了解一下 RAC 中多節點對共享資料檔案訪問的管理是如何進行的。要了解 RAC 工作原理的中心,需要知道 Cache Fusion 這個重要的概念,要發揮 Cache Fusion 的作用,要有一個前提條件,那就

訊息佇列系列:Rabbitmq常用命令列

來源:http://www.cnblogs.com/gossip/p/4700147.html 列印了一些rabbitmq服務狀態資訊,包括記憶體,硬碟,和使用erlong的版本資訊 rabbitmqctl -q status 各個引數說明:http://www.r

FFMEPG 平臺移植,介面簡化外部模組接入 ffmpeg android移植(ffmpeg 視訊編碼)

FFMPEG 視訊編碼最常見的H264,H265需要X264,X265外部模組支援,可以從我們開源平臺的FFMPEG編譯專案裡面獲取程式碼和配置進行一鍵式編譯:https://github.com/Car-eye-team/Car-eye-FFMPEG,我們下面的程式碼主要是為了簡化程式碼呼叫結構。只需要配置

Spring原始碼深度解析總結6—— 配置檔案的讀取Bean的載入

經過前面的分析,我們終於結束了對XML配置檔案的解析,接下來將會面對更大的挑戰,就是對bean載入的探索。bean載入的功能的實現遠比bean的解析複雜的多,同樣,我們還是以最簡單的示例為基礎,對於bean的功能,在Spring中的呼叫方式為:MyTestBean bean

訊息佇列入門ActiveMQ的應用例項

>>部署和啟動ActiveMQ 我下載的是apache-activemq-5.12.0-bin.tar.gz, 解壓到本地目錄,進入到bin路徑下, 執行activemq啟動ActiveMQ。 執行方式:啟動 ./activemq start ActiveMQ預設使用的TCP連線埠是61616

玩轉extjs5之Ext.data.ModelExt.data.Store

一、Ext.data.Model        (1)Model代表應用程式管理的一些物件。例如,我們可能會為 我們想在系統中建模的現實世界中的一些物體像使用者、產品和汽車等定義一個Model。這些Model在 模型管理器中註冊,被Store使用, 而這些Store又被許多

Kafka 訊息偏移的維護

Kafka是大資料領域常用的訊息佇列,其高效的吞吐量和分散式容錯等特性是其收到青睞的重要原因。 kafka訊息的位置 用好Kafka,維護其訊息偏移量對於避免訊息的重複消費與遺漏消費,確保訊息的Exactly-once是至關重要的。 kafka的訊息所

zookeeper上修改kafka消費組的偏移

[[email protected] bin]$ zookeeper-shell.sh 192.168.0.1:2181 Connecting to 192.168.0.1:2181 Wel