1. 程式人生 > >Kafka單執行緒Consumer及引數詳解

Kafka單執行緒Consumer及引數詳解

請使用0.9以後的版本:

示例程式碼

 Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        props.put("auto.offset.reset","earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);

2、用這些Properties構建consumer物件(KafkaConsumer還有其他構造,可以把序列化傳進去);

3、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")

使用正則必須指定一個listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫這個介面來實現 分割槽變更時的邏輯。如果設定了enable.auto.commit = true 就不用理會這個邏輯。

4、然後迴圈poll訊息(這裡的1000是超時設定,如果沒有很多資料,也就等一秒);

5、處理訊息(列印了offset key value 這裡寫處理邏輯)。

6、關閉KafkaConsumer(可以傳一個timeout值 等待秒數 預設是30)。

引數詳解

bootstrap.server(最好用主機名不用ip kafka內部用的主機名 除非自己配置了ip)

deserializer 反序列化consumer從broker端獲取的是位元組陣列,還原回物件型別。

預設有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定義:定義serializer格式 建立自定義deserializer類實現Deserializer 介面 重寫邏輯

除了四個必傳的 bootstrap.server group.id key.deserializer value.deserializer

還有session.timeout.ms "coordinator檢測失敗的時間"

是檢測consumer掛掉的時間 為了可以及時的rebalance 預設是10秒 可以設定更小的值避免訊息延遲。

max.poll.interval.ms "consumer處理邏輯最大時間"

處理邏輯比較複雜的時候 可以設定這個值 避免造成不必要的 rebalance ,因為兩次poll時間超過了這個引數,kafka認為這個consumer已經跟不上了,會踢出組,而且不能提交offset,就會重複消費。預設是5分鐘。

auto.offset.reset "無位移或者位移越界時kafka的應對策略"

所以如果啟動了一個group從頭消費 成功提交位移後 重啟後還是接著消費 這個引數無效

所以3個值的解釋是:

earliset 當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從最早的位移消費

latest 當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料 none topic各分割槽都存在已提交的offset時,從offset後開始消費;只要有一個分割槽不存在已提交的offset,則丟擲異常

(注意kafka-0.10.1.X版本之前: auto.offset.reset 的值為smallest,和,largest.(offest儲存在zk中) 、

我們這是說的是新版本:kafka-0.10.1.X版本之後: auto.offset.reset 的值更改為:earliest,latest,和none (offest儲存在kafka的一個特殊的topic名為:__consumer_offsets裡面))

enable.auto.commit 是否自動提交位移

true 自動提交 false需要使用者手動提交 有隻處理一次需要的 最近設定為false自己控制。

fetch.max.bytes consumer單次獲取最大位元組數

max.poll.records 單次poll返回的最大訊息數

預設500條 如果消費很輕量 可以適當提高這個值 增加消費速度。

hearbeat.interval.ms consumer其他組員感知rabalance的時間

該值必須小於 session.timeout.ms 如果檢測到 consumer掛掉 也就根本無法感知rabalance了

connections.max.idle.ms 定期關閉連線的時間

預設是9分鐘 可以設定為-1 永不關閉

更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算

相關推薦

Kafka執行Consumer引數

請使用0.9以後的版本: 示例程式碼 Properties props = new Properties(); props.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); props.put("group.id",

java執行(上)——執行狀態屬性

在作業系統中,我們通過分時的方法在CPU上不斷地切換處理多個程序任務,給人並行處理的感覺,這種方法在作業系統中叫做多工。多工在較低層次上擴展出多執行緒的概念,也就是指一個程式同時執行多個執行緒。這種可以同時執行一個以上的執行緒的程式,我們叫做多執行緒程式。

pthread_create函式的詳細講解(包括向執行函式傳遞引數)

pthread_create是UNIX環境建立執行緒函式 標頭檔案   #include<pthread.h> 函式宣告   int pthread_create(pthread_t*restrict tidp,const pthread_attr_t

Python多程序與多執行程式設計GIL

介紹如何使用python的multiprocess和threading模組進行多執行緒和多程序程式設計。 Python的多程序程式設計與multiprocess模組 python的多程序程式設計主要依靠multiprocess模組。我們先對比兩段程式碼,看看多程序程式設計的優勢。我們模擬了一個非常耗時的任

kafka執行消費處理和手動提交處理方案設計

kafka與其他訊息佇列不同的是, kafka的消費者狀態由外部( 消費者本身或者類似於Zookeeper之類的外部儲存 )進行維護, 所以kafka的消費就更加靈活, 但是也帶來了很多的問題, 因為客戶端消費超時被判定掛掉而消費者重新分配分割槽, 導致重複消費

UI的執行問題:執行原因更新UI的四種方式

1、UI執行緒為什麼設計為單執行緒?  UI控制元件的操作不是執行緒安全的,對於多執行緒併發訪問的時候,如果使用加鎖機制會導致:  UI控制元件的操作變得很複雜。  加鎖的操作必定會導致效率下降。  所以android系統在UI操作上使用單執行緒機制。  2、更新UI有四種方

Python程序、執行、協程執行效能、效率(tqdm)

多程序實踐——multiprocessing 筆者最近在實踐多程序發現multiprocessing,真心很好用,不僅加速了運算,同時可以GPU呼叫,而且互相之間無關聯,這樣可以很放心的進行計算。 譬如(參考:多程序): from multiprocessing import Pool

Java執行池Executor框架

Java的執行緒既是工作單元,也是執行機制。從JDK 5開始,把工作單元與執行機制分離開來。工作單元包括Runnable和Callable,而執行機制由Executor框架提供。 Executor框架簡介在HotSpot VM的執行緒模型中,Java執行緒(java.lang.Thread)被一對一對映為本

執行之Future使用

什麼是Future Future是一個未來物件,裡面儲存這執行緒處理結果,它像一個提貨憑證,拿著它你可以隨時去提取結果 什麼時候使用 在兩種情況下,離開Future幾乎很難辦。 一種情況是拆分訂單,比如你的應用收到一個批量訂單,此時如果要求最快的處理訂單,那麼需要併發

晶振工作原理引數

晶振工作原理及引數詳解(最透徹) 晶振是石英晶體諧振器(quartz crystal oscillator)的簡稱,也稱有源晶振,它能夠產生中央處理器(CPU)執行指令所必須的時鐘頻率訊號,CPU一切指令的執行都是建立在這個基礎上的,時鐘訊號頻率越高,通常CPU的執行速度也就越快。 只要是包

美團面試題:Java-執行池 ThreadPool 專題

去美團面試,問到了什麼是執行緒池,如何使用,為什麼要用,以下做個總結。關於執行緒之前也寫過一篇文章《高階面試題總結—執行緒池還能這麼玩?》 1、什麼是執行緒池:  java.util.concurrent.Executors提供了一個 java.util.conc

Android 多執行之HandlerThread 完全

  之前對執行緒也寫過幾篇文章,不過倒是沒有針對android,因為java與android線上程方面大部分還是相同,不過本篇我們要介紹的是android的專屬類HandlerThread,因為HandlerThread在設定思想上還是挺值得我們學習的,那麼我們下面來

Java多執行Condition介面原理

Condition介面提供了類似Object的監視器方法,與Lock配合可以實現等待/通知模式,但是這兩者在使用方式以及功能特性上還是有差別的 Condition介面詳解 Condition定義了等待/通知兩種型別的方法,當前執行緒呼叫這些方法時,需要提前獲

[Kafka] Apache Kafka 簡介、叢集搭建配置

前言 kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 Kafk

rsync用法引數

rsync rsync的目的是實現本地主機和遠端主機上的檔案同步(包括本地推到遠端,遠端拉到本地兩種同步方式),也可以實現本地不同路徑下檔案的同步,但不能實現遠端路徑1到遠端路徑2之間的同步(scp可以實現)。 不考慮rsync的實現細節,就檔案同步而言,涉及了原始檔和目

執行之重排序

重排序 重排序是指編譯器和處理器為了優化程式效能而對指令序列進行重新排序的一種手段。 資料依賴性 如果兩個操作訪問同一個變數,且這兩個操作中有一個為寫操作,此時這兩個操作之間就存在資料的依賴性。資料依賴分為3中型別,如下表所示: 上面3中情況,只要重排序兩個操作的順序。程式的結

C#基礎系列:多執行的常見用法

前言:此篇就主要從博主使用過的幾種多執行緒的用法從應用層面大概介紹下。文中觀點都是博主個人的理解,如果有不對的地方望大家指正~~ 1、多執行緒:使用多個處理控制代碼同時對多個任務進行控制處理的一種技術。據博主的理解,多執行緒就是該應用的主執行緒任命其他多個執行緒去協

Android 多執行程式設計之 HandlerThread

    HandlerThread有那些特點: HandlerThread本質上是一個執行緒類,它繼承了Thread; HandlerThread有自己的內部Looper物件,可以進行looper迴圈; 通過獲取HandlerThread的looper物件傳

hadoop的三種執行模式區別配置

       基於hadoop進行開發時,有時候,會被hadoop的三種執行模式搞混,也會被hadoop叢集有哪些配置弄得暈頭轉向,因為看不同的文件有不同的配置方法。所以要先弄明白hadoop的執行模

Java多執行同步和非同步

1. 多執行緒併發時,多個執行緒同時請求同一資源,必然導致此資源的資料不安全。 2. 執行緒池 在WEB服務中,對於web伺服器的響應速度必須儘可能的快,這就容不得在使用者提交請求按鈕後,再建立執行緒提供服務。為了減少使用者的等待時間,執行緒必須預先建立,放線上程池中,執行