1. 程式人生 > >kafka官網示例說明--KafkaConsumer

kafka官網示例說明--KafkaConsumer

Kafka client會消費kafka cluster中的記錄。

它將透明地處理Kafka叢集中的伺服器故障,並透明地適應它在叢集內遷移的資料分割槽。該客戶機還與伺服器互動,以允許使用者組使用消費者組來負載平衡消費(如下所述)。

消費者維護TCP連線到必要的代理來獲取資料。使用後未能關閉消費者會洩漏這些連線。消費者不是執行緒安全的。更多細節見多執行緒處理。

偏移量

Kafka為分割槽中的每個記錄保留一個數值偏移量。這個偏移量充當該分割槽中記錄的唯一識別符號,並且表示該分割槽中的消費者的位置。也就是說,擁有5號位置的消費者使用偏移值為0到4的記錄,並將使用偏移量5記錄下一個記錄。實際上,與使用者的使用者有關的位置有兩個概念。

消費者的位置將提供下一個記錄的偏移量。它將比消費者在該分割槽中看到的最高偏移量大一個。它在消費者每次接收資料呼叫poll(long)和接收訊息時自動地前進。
已提交的位置是安全儲存的最後一個偏移量。如果程序失敗並重新啟動,這將恢復到它將恢復的偏移量。消費者可以定期自動提交補償;或者,它可以選擇通過呼叫commitSync來手動控制這個提交的位置,它會阻塞,直到在提交過程中成功提交了補償或致命錯誤,或者提交了非阻塞的commitAsync,並將觸發OffsetCommitCallback,要麼成功提交,要麼失敗。

消費者組和訂閱主題

Kafka使用了消費者組(Consumer Groups)的概念來允許一個過程池來劃分消費和處理記錄的工作。
這些程序可以在同一臺機器上執行,或者,更有可能的是,它們可以分佈在許多機器上,從而為處理提供額外的可伸縮性和容錯。
每個Kafka消費者可以配置一個消費者團體,它屬於,並且可以動態設定主題希望通過訂閱訂閱列表(列表,Consumer Rebalance Listener),或通過訂閱訂閱所有主題匹配特定的模式(模式,ConsumerRebalanceListener)。Kafka將在每個使用者組中的一個流程中傳遞訂閱主題中的每個訊息。這是通過在每個組的消費者過程中平衡主題中的分割槽來實現的。因此,如果有一個帶有四個分割槽的主題,以及一個有兩個程序的消費者組,每個程序將從兩個分割槽中消耗。這個組成員是動態維護的:如果程序失敗,分配給它的分割槽將被重新分配給同一組中的其他程序,如果一個新程序加入這個組,那麼分割槽將從現有的消費者轉移到這個新的程序。
因此,如果兩個程序訂閱一個主題同時指定不同的組,則它們將在該主題中獲得所有的記錄;如果他們都指定相同的組,他們將會得到大約一半的記錄。從概念上來說,您可以將消費者組看作是由多個程序組成的單個邏輯訂閱者。作為一個多使用者系統,Kafka自然支援在沒有重複資料的情況下為給定的主題提供任意數量的使用者組(額外的消費者實際上相當便宜)。這是對訊息傳遞系統中常見的功能的輕微概括。 為了獲得與傳統訊息傳遞系統中的佇列相似的語義,所有流程都將是單個消費者組的一部分,因此記錄交付將在組中與佇列相同。與傳統的訊息傳遞系統不同,您可以擁有多個這樣的組。要獲得類似於傳統訊息傳遞系統中的pub-sub的語義,每個流程都有自己的消費者組,因此每個程序將訂閱釋出到該主題的所有記錄。 此外,自動組分配時,消費者可以通過Consumer Rebalance Listener通知,這允許他們完成清理等必要的應用程式級的邏輯狀態,手動抵消提交(注意,抵消總是承諾對於一個給定的消費者團體),等等的詳細資訊,請參閱儲存抵消外部卡夫卡對消費者也可以手動指定分割槽分配給它分配(列表),禁用這個動態分割槽分配。

用法示例

1. 自動確認Offset

Properties props = new Properties();
/* 定義kakfa 服務的地址,不需要將所有broker指定上 */
props.put("bootstrap.servers", "localhost:9092");
/* 制定consumer group */
props.put("group.id", "test");
/* 是否自動確認offset */
props.put("enable.auto.commit", "true");
/* 自動確認offset的時間間隔 */
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
/* key的序列化類 */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化類 */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 /* 定義consumer */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 消費者訂閱的topic, 可同時訂閱多個 */
consumer.subscribe(Arrays.asList("foo", "bar"));

 /* 讀取資料,讀取超時時間為100ms */
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

說明: 
1. bootstrap.servers 只是代表kafka的連線入口,只需要指定叢集中的某一broker; 
2. 一旦consumer和kakfa叢集建立連線,consumer會以心跳的方式來高速叢集自己還活著,如果session.timeout.ms 內心跳未到達伺服器,伺服器認為心跳丟失,會做rebalence。

2. 手工控制Offset

如果consumer在獲得資料後需要加入處理,資料完畢後才確認offset,需要程式來控制offset的確認。舉個栗子: 
consumer獲得資料後,需要將資料持久化到DB中。自動確認offset的情況下,如果資料從kafka叢集讀出,就確認,但是持久化過程失敗,就會導致資料丟失。我們就需要控制offset的確認。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
/* 關閉自動確認選項 */
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    /* 資料達到批量要求,就寫入DB,同步確認offset */
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}

還可以精細的控制對具體分割槽具體offset資料的確認:

try {
    while(running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            /* 同步確認某個分割槽的特定offset */
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
  consumer.close();
}

說明:確認的offset為已接受資料最大offset+1。

3. 分割槽訂閱

可以向特定的分割槽訂閱訊息。但是會失去partion的負載分擔。有幾種場景可能會這麼玩: 
1. 只需要獲取本機磁碟的分割槽資料; 
2. 程式自己或者外部程式能夠自己實現負載和錯誤處理。例如YARN/Mesos的介入,當consumer掛掉後,再啟動一個consumer。

String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

說明: 
1. 此種情況用了consumer Group,也不會做負載均衡。 
2. topic的訂閱和分割槽訂閱不可以在同一consumer中混用。

4. 外部儲存offset

消費者可以自定義kafka的offset儲存位置。該設計的主要目的是讓消費者將資料和offset進行原子性的儲存。這樣可以避免上面提到的重複消費問題。舉慄說明: 
訂閱特定分割槽。儲存所獲得的記錄時,將每條記錄的offset一起儲存。保證資料和offset的儲存是原子性的。當非同步儲存被異常打斷時,凡已經儲存的資料,都有有相應的offset記錄。這種方式可以保證不會有資料丟失,也不會重複的從服務端讀取。 
如何配置實現: 
1. 去使能offset自動確認:enable.auto.commit=false; 
2. 從ConsumerRecord中獲取offset,儲存下來; 
3. Consumer重啟時,呼叫seek(TopicPartition, long)重置在服務端的消費記錄。

如果消費分割槽也是自定義的,這種方式用起來會很爽。如果分割槽是自動分配的,當分割槽發生reblance的時候,就要考慮清楚了。如果因為升級等原因,分割槽漂移到一個不會更新offset的consumer上,那就日了狗了。 
該情況下: 
1. 原consumer需要監聽分割槽撤銷事件,並在撤銷時確認好offset。介面:ConsumerRebalanceListener.onPartitionsRevoked(Collection); 
2. 新consumer監聽分割槽分配事件,獲取當前分割槽消費的offset。介面:ConsumerRebalanceListener.onPartitionsAssigned(Collection); 
3. consumer監聽到 ConsumerRebalance事件,還沒有處理或者持久化的快取資料flush掉。

5. 控制消費位置

大多數情況下,服務端的Consumer的消費位置都是由客戶端間歇性的確認。Kafka允許Consumer自己設定消費起點,達到的效果: 
1. 可以消費已經消費過的資料; 
2. 可以跳躍性的消費資料; 
看下這樣做的一些場景: 
1. 對Consumer來說,資料具備時效性,只需要獲取最近一段時間內的資料,就可以進行跳躍性的獲取資料; 
2. 上面自己存offset的場景,重啟後就需要從指定的位置開始消費。 
介面上面已經提到過了,用seek(TopicPartition, long)。、 
麻蛋,說指標不就好了,這一小節就是多餘的叨叨。

6. 控制消費流Consumption Flow Control

如果一個consumer同時消費多個分割槽,預設情況下,這多個分割槽的優先順序是一樣的,同時消費。Kafka提供機制,可以讓暫停某些分割槽的消費,先獲取其他分割槽的內容。場景舉慄: 
1. 流式計算,consumer同時消費兩個Topic,然後對兩個Topic的資料做Join操作。但是這兩個Topic裡面的資料產生速率差距較大。Consumer就需要控制下獲取邏輯,先獲取慢的Topic,慢的讀到資料後再去讀快的。 
2. 同樣多個Topic同時消費,但是Consumer啟動是,本地已經存有了大量某些Topic資料。此時就可以優先去消費下其他的Topic。

調控的手段:讓某個分割槽消費先暫停,時機到了再恢復,然後接著poll。介面:pause(TopicPartition…),resume(TopicPartition…)

7. 多執行緒處理模型 Multi-threaded Processing

Kafka的Consumer的介面為非執行緒安全的。多執行緒共用IO,Consumer執行緒需要自己做好執行緒同步。 
如果想立即終止consumer,唯一辦法是用呼叫介面:wakeup(),使處理執行緒產生WakeupException。

public class KafkaConsumerRunner implements Runnable {
    /* 注意,這倆貨是類成員變數 */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

說明: 
1. KafkaConsumerRunner是runnable的,請自覺補腦多執行緒執行; 
2. 外部執行緒控制KafkaConsumerRunner執行緒的停止; 
3. 主要說的是多執行緒消費同一topic,而不是消費同一分割槽;

比較一下兩種模型:

Consumer單執行緒模型

優點:實現容易; 
沒有執行緒之間的協作。通常比下面的那種更快; 
單分割槽資料的順序處理; 
缺點:多個TCP連線,但是關係不大,kafka對自己的server自信滿滿; 
太多的Request可能導致server的吞吐降低一丟丟; 
consumer數量受到分割槽數量限制,一個consumer一個分割槽;

Consumer多執行緒模型

優點:一個consumer任意多的執行緒,執行緒數不用受到分割槽數限制; 
缺點:如果有保序需求,自己要加控制邏輯; 
該模型中如果手動offset,自己要加控制邏輯; 
一種可行的解決辦法:為每個分割槽分配獨立的儲存,獲取的資料根據資料所在分割槽進行hash儲存。這樣可以解決順序消費,和offset的確認問題。

相關推薦

kafka示例說明--KafkaConsumer

Kafka client會消費kafka cluster中的記錄。 它將透明地處理Kafka叢集中的伺服器故障,並透明地適應它在叢集內遷移的資料分割槽。該客戶機還與伺服器互動,以允許使用者組使用消費者組來負載平衡消費(如下所述)。 消費者維護TCP連線到必要的代理來獲

Leaflet_創建地圖(示例

位置 custom phone world! -c setview art locate ipa 官網:http://leafletjs.com/examples.html 快速啟動指南 http://leafletjs.com/examples/quick-start/e

Dubbo入門-示例dubbo-demo的使用

2.進入剛clone下來的incubator-dubbo目錄,使用mvn編譯安裝【mvn install -Dmaven.test.skip=true】,等待安裝結束看到BuildSuccess即可。 3.目錄不變,設定專案使用idea編輯器開啟(mvn自己

Elasticsearch.Net 示例的坑

  經過昨天的ElasticSearch 安裝,服務以及可以啟動了,接下來就可以開發了,找到了官網提供的API以及示例,Es 官方提供的.net 客戶端有兩個版本一個低階版本: 【Elasticsearch.Net.dll】這個dll檔案官方解釋無依賴關係的客戶端,對於您如何構建和表示您的請求和相應沒有任何意

Kafka介紹

本文內容來自:http://kafka.apache.org/documentation.html#quickstart Kafka is a distributed, partitioned, replicated commit log service. It prov

FineUI(開源版)v4.2.2釋出(8年125個版本,示例突破300個)!

開源版是 FineUI 的基石,從 2008 年至今已經持續釋出了 120 多個版本,擁有會員 15,000 多位,捐贈會員達到 1,200 多位。 FineUI(開源版)v4.2.2 是 8 年來的第 125 個版本,對錶單、表格進行底層結構的重要調整,使其更簡單更易於擴充套件,同時官網示例數也突破

kindeditor異步加載示例無效,解決無法通過方法初始化編輯器

har cdd kxml yep adf tr1 ket 沒有效果 iba 官網示例:http://kindeditor.net/ke4/examples/dynamic-load.html 項目中發現一個問題,kindeditor官網是通過 初始化編輯器,但是現在有

VuetodoMVC示例

這個示例是模仿官網示例樣式和功能用我自己的方式寫的,基本上沒有看官網的原始碼,只參考自定義指令。讓我們一步步來探討一下。官網demo 要實現的功能 單條新增todo 單條刪除todo 雙擊編輯todo 單條todo已完成相應樣式狀態改變 全部todo是已完成相應樣式狀態改變 清除

基於Kafka 入門小案例-學習

首先Maven引入 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <v

mongoDB中Storing Log Data的python示例程式碼有誤,

如題,    在這篇文章中的python程式碼中, 構造一個記錄的程式碼如下:     >>> event = { ... _id: bson.ObjectId(), ... host: "127.0.0.1"

ReduxCounter最基本示例的思考

1.不使用redux實現 如果不使用redux,僅僅依靠react去實現Counter功能是極其簡單的。程式碼如下: index.js import React from 'react'; import ReactDOM from 'react-dom'; import Cou

使用java的MultipartFile實現layui檔案上傳實現全部示例,java檔案上傳

layui(諧音:類UI) 是一款採用自身模組規範編寫的前端 UI 框架,遵循原生 HTML/CSS/JS 的書寫與組織形式,門檻極低,拿來即用。 layui檔案上傳示例地址:https://www.layui.com/demo/upload.html 本次教程是基於springboot2.0的。 測試

【Spark深入學習 -16】學習SparkSQL

客戶 .com pmu 參考資料 一行 uap lsa bmi orb ----本節內容-------1.概覽 1.1 Spark SQL 1.2 DatSets和DataFrame2.動手幹活 2.1 契入點:SparkSessi

vue2.0實踐 —— Node + vue 實現移動

縮放 one fix show htm cati 接口 簡介 tac 簡介 使用 Node + vue 對公司的官網進行了一個簡單的移動端的實現。 源碼 https://github.com/wx1993/node-vue-fabaocn 效果 組件 輪

關註PHPthinking微信公眾號——紅包來襲

技術 text font 微信訂閱號 微信紅包 water fonts pac think 歡迎大家掃描關註PHPthinking官方微信訂閱號,我們將給您定期發送質量博文、新聞趣事、站點公告等等,同一時候還有PHPthinking準備的每日微信紅包(金額不等,已發出百

Android學習 多讀,故意健康---手勢

same str ces 12px lis assume extend current -- 官網地址 ttp://developer.android.com/training/gestures/detector.html: 一、能夠直接覆蓋Activity的on

比特幣新技術|比特幣中國

利潤 需要 公司 影響 投資者 網站 社區 聯網 勝利 親愛的朋友們! 改善您生活最好方式之一,是每天在不斷地改善自己的所想法,感情,話與決定!如果您對額外的收入來源有興趣的話(類似互聯網交易,交易所交易,投資)那,歡迎您參與國際儲備體IRS公司。由於我們一直在努力為了

Python自學筆記-map和reduce函數(來自廖雪峰的Python3)

求和 rabl style 序列 list port lambda char att 感覺廖雪峰的官網http://www.liaoxuefeng.com/裏面的教程不錯,所以學習一下,把需要復習的摘抄一下。 以下內容主要為了自己復習用,詳細內容請登錄廖雪峰的官網查看。

入侵拿下DVBBS php詳細過程(圖)

sta 電話 subst wget 團隊 sim 不遠 cls 接下來 幾 個月前,DVBBS php2.0暴了一個可以直接讀出管理員密碼的sql註入漏洞,當時這個漏洞出來的時候,我看的心癢,怎麽還會有這麽弱智的漏洞,DVBBS php2.0這套代碼我還沒仔細看過,於是5月

做sxy的一點經驗

offset true document container fse dev cnblogs 可用 滾輪 jquery2及以上不再支持IE8;IE不支持document.body.scrollTop, 也不支持$().scrollTop(), 用 var top = wi