1. 程式人生 > >kafka0.90新版消費者API介紹及使用

kafka0.90新版消費者API介紹及使用

kafka最初時開發時, 所帶的producer和consumer client都是Scala所寫. 我們逐漸發現這些API具有一些限制. high-level的api支援consumer groups和故障轉移, 但是不支援許多複雜的使用場景, 同時還有一個simple consumer client提供全部的控制, 但是需要使用者管理故障轉移和負責錯誤處理. 為了支援一些舊client api很難或者無法做到的用例場景, 我們決定重新設計client, 提供一組能夠長期支援的新api.

第一步是在0.8.1重寫了Producer API, 0.9版本作為第二個步驟引入了新的Consumer API. 新的API基於kafka本身提供的一個新的group coordination 協議構建, 有以下的優點:

  • 清晰統一的API: 新的consumer結合了之前的”simple”和”high-level” consumer客戶端, 不僅提供group coordination還可以進行自己控制消費策略的lower level訪問

  • 減少了依賴: 新的consumer由純Java所寫, 不需要依賴Scala執行時環境, 也不需要Zookeeper, 意味著你的專案所需要的包會減少一些

  • 更好的安全性: Kafka 0.9版本實現的 security extensions在新的consumer中提供支援

  • 新的consumer還添加了一組用於管理容錯的consumer group的協議. 之前這個功能由一個重量級的

    Java client(與Zookeeper進行互動)來實現. 這種複雜的邏輯使得完整功能的客戶端難以由其他語言實現. 由於引進新的協議, 這變得遠比以前容易. 實際上已經還在C客戶端上轉向使用這個協議.

雖然新consumer使用了重新設計的API和一個新的協調協議, 但是和之前舊版本API概念並不是完全不同的, 所以對舊consumer熟悉的使用者理解新的API並不會有什麼困難. 然而還是有一些關於group管理和執行緒模型等的很微妙的細節值得特別注意. 這篇教程的目的就是覆蓋新consumer api的使用以及解釋所有的這些細節.

值得注意的是: 在重寫的過程中, 新的consumer在穩定性方面仍然被視為beta. 我們在0.9.0分支中以及修復了很多重要的bug, 如果你在使用0.9.0.0發行版時遇到了問題, 我們希望你能夠對那個分支進行

測試. 如果仍然有問題, 請反饋到 mail listsJIRA.

Getting Started

在開始編碼之前, 我們先回顧一下一些基本概念. 在Kafka中, 每個topic被分成一組稱為partitions的logs. Producer向這些logs的末尾寫入訊息, consumer則自己按自己的節奏讀取log. Kafka通過在一個consumer group中分配partitions來伸縮topic的消費, consumer group是一組有同樣group id的consumers. 下圖表示的是一個擁有3個分割槽的topic, 以及一個有兩個成員的consumer group. 每一個分割槽都只會分配給組內的一個成員. 
Distributing Kafka Partitions among consumer groups

舊的consumer依賴於Zookeeper來進行group管理, 新的消費者使用kafka內部的group coordination協議. 對於每個group, brokers中的一個被選為group coordinator. 這個coordinator負責管理group的狀態, 它的主要工作是當新的成員加入, 或者原本的成員離開, 或者topic的元資料發生了改變時, 協調partition分配. 重新分配partition這個過程被稱為rebalancing the group.

當group第一次初始化時, consumer通常從分割槽的最開始或者最末尾開始讀取. 每個partition log中的訊息都是按順序讀取. 隨著consumer的讀取, 它將會commit它所成功處理的message的offset. 例如下圖中, consumer的位置位於offset 6, 上一次commit的offset為1. 
consumer's position in the log

當一個partition被重分配給組內的另一個consumer, 其最初位置會被設定成上一次commit的offset. 如果上例中的consumer突然奔潰了, 那麼接管這個partition的另外一個組成員會從offset 1繼續消費. 在這種情況下,它會重新處理上一次提交位置到消費者奔潰的位置6的訊息.

這張圖還有兩個log中比較重要的位置. log end offset是最後一條寫入log的message的offset, 而high watermark是最後一條message成功複製到所有的log replica的message offset. 從consumer的角度來看, 所知道的最主要的事情就是其最多能夠讀到high watermark的message. 這能夠阻止consumer讀取到未被複制到其他broker的, 可能會丟失的message.

Configuration and Initialization 配置和初始化

開始使用consumer之前, 需要將kafka-clients依賴加入到你的專案中.

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.9.0.0-cp1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

consumer使用一個Properties file來進行構建. 以下提供了使用consumer group所需要的最小配置.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

和之前的consumer, producer一樣, 我們需要為consumer提供一個broker的初始化列表, 以用於發現叢集中其他的broker. 這個配置並不需要提供所有的broker, client會從給定的配置中發現所有的當前存活的broker. 這裡我們假設broker執行在本地, 同時consumer還需要知道如何反序列化message key和value. 最後, 為了加入consumer group, 我們需要配置一個group id. 隨著教程的繼續, 我們會介紹更多的配置.

* Topic訂閱*

為了開始消費, 你必須指定你的應用需要讀取的topics, 在下面的例子中, 我們訂閱了topic foo和bar.

consumer.subscribe(Arrays.asList(“foo”, “bar”));
  • 1
  • 1

訂閱之後, consumer會與組內其他consumer協調來獲取其分割槽分配. 這都是在你開始消費資料時自動處理的. 後面我們會說明如何使用assign api來手動分配分割槽, 但是要注意的是, 不能同時混合使用自動和手動的分配.

subscribe方法並不是遞增的: 你必須包含所有你想要消費的topics. 你可以在任何時刻改變你想消費的topic集, 當你呼叫subscribe時, 之前訂閱的topics列表會被新的列表取代.

基本的Poll事件迴圈

consumer需要能夠並行獲取資料, 在眾多brokers中獲取多個topic的許多分割槽訊息. 為了實現這個目的, consumer API的設計成風格類似於unix中的poll或者select呼叫: 一旦topic註冊了, 所有的coordination, rebalancing和data fetch都是由一個處於迴圈中的poll呼叫來驅動. 這樣就提供了一個能在一個執行緒裡處理所有的IO的簡單有效的實現.

在你訂閱一個topic之後, 你需要啟動這個event loop以獲得partition分配和開始獲取資料. 聽起來很複雜, 但是所有你需要做的就只有呼叫poll, 然後consumer客戶端本身負責處理其他的工作. 每一次poll呼叫都會返回從所分配的partition獲取的一組訊息(也許是空的). 下面的例子展示了一個基本的poll迴圈, 列印獲取的records的offset和value.

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
  }
} finally {
  consumer.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

poll API根據當前的位置返回records, 當group第一次建立時, 消費開始的位置會被根據reset policy(一般設定成從每個分割槽的最早的offset或者最新的offset開始)來設定. 只要consumer開始提交offset, 那麼之後的rebalance都會重置消費開始位置到最新的被提交的offset. 傳遞給poll的引數是consumer其在當前位置等待有record返回時需要被阻塞的時間. 一旦有record時, consumer會立即返回, 如果沒有record, 它將會等待直到超時.

consumer被設計成只在自己的執行緒中執行, 在沒有外部同步措施的情況下, 在多執行緒中使用時不安全的, 同時也不建議這樣做. 在這個例子中, 我們使用了一個flag來使得當應用停止時能夠從迴圈中跳出. 當這個flag被另一個執行緒設定成false時, pool返回時迴圈會跳出, 無論返回什麼record, 處理過程都會結束.這個的例子使用了一個相對較少的超時時間, 以使得關閉consumer並不會有太大的延時.

你還可以設定一個較長的timeout, 並且使用wakeup API來使得其從迴圈中跳出.

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + “: ” + record.value());
  }
} catch (WakeupException e) {
  // ignore for shutdown
} finally {
  consumer.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

我們將timeout改為了Long.MAX_VALUE, 意味著consumer會一直阻塞直到有record返回. 和前面設定flag不同, 用於觸發shutdown的執行緒可以呼叫consumer.wakeup()來中斷一次poll, 使其丟擲WakeupExection. 這個API是執行緒安全的. 注意如果當前沒有活躍的poll, 那麼異常會在下一次poll呼叫時丟擲. 在這個例子中, 我們捕捉這個異常, 阻止其繼續傳播.

完整程式碼

在接下來的例子中, 我們將所有的程式碼塊放到一起來構建一個task, 初始化consumer, 訂閱一個topic列表, 並且執行poll呼叫直到外部關閉它.

public class ConsumerLoop implements Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final List<String> topics;
  private final int id;

  public ConsumerLoop(int id,
                      String groupId, 
                      List<String> topics) {
    this.id = id;
    this.topics = topics;
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put(“group.id”, groupId);
    props.put(“key.deserializer”, StringDeserializer.class.getName());
    props.put(“value.deserializer”, StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
  }

  @Override
  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("partition", record.partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out.println(this.id + ": " + data);
        }
      }
    } catch (WakeupException e) {
      // ignore for shutdown 
    } finally {
      consumer.close();
    }
  }

  public void shutdown() {
    consumer.wakeup();
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

為了測試這個例子, 你需要一個執行0.9.0.0的kafka發行版的broker, 以及一個提供一些string資料進行消費的topic. 寫入一些string資料的最簡單的方法就是通過kafka-verifiable-producer.sh指令碼. 你需要確保topic有超過一個partition. 例如對於只有一個執行在localhost的kafka broker和Zookeeper, 你可以在kafka發行包根目錄下執行以下的命令: 
# bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181

# bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092

然後我們可以用以下程式碼來設定一個有3個成員的consumer group, 所有的成員都訂閱我們剛剛建立的topic.

public static void main(String[] args) {
  int numConsumers = 3;
  String groupId = "consumer-tutorial-group"
  List<String> topics = Arrays.asList("consumer-tutorial");
  ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

  final List<ConsumerLoop> consumers = new ArrayList<>();
  for (int i = 0; i < numConsumers; i++) {
    ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
    consumers.add(consumer);
    executor.submit(consumer);
  }

  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      for (ConsumerLoop consumer : consumers) {
        consumer.shutdown();
      } 
      executor.shutdown();
      try {
        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        e.printStackTrace;
      }
    }
  });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

這個例子將三個Runnable consumer提交到一個executor. 每一個執行緒都有一個單獨的id, 所以你可以看出來哪個執行緒正在接收資料. 當你準備結束程序時, shutdown hook會被呼叫, 它會呼叫wakeup來終止三個執行緒, 並且等待它們關閉. 以下是程式執行的資料:

2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消費者存活

每個consumer都是被分配它所訂閱的topics中所有partitions的一部分partitions. 就像是對這些partitions的group lock. 只要這個鎖被持有, 組內其他的成員就無法從這些分割槽中讀取資料. 當consumer還存活著, 這就是你想要的, 這是能夠避免重複消費的方法(加鎖). 但是如果消費者因為機器或者應用的故障掛掉了, 那麼你希望鎖能夠釋放, 並且分割槽能夠分配給其他存活的consumer.

Kafka group coordination協議通過心跳機制來解決這個問題. 在每次重分配之後, 當前所有的成員會週期性傳送心跳給group coordinator. 只要coordinator接收心跳, 它就會假設成員是存活的. 每次接收到心跳, coordinator都會開始(或重置)一個timer. 當timer過期時, 如果還沒有心跳收到, coordinator會將這個成員標記為掛掉, 然後通知其他組內其他成員需要重新分配分割槽. timer的時間被稱為session timeout, 由客戶端的session.timeout.ms進行設定.

props.put(“session.timeout.ms”, “60000”);
  • 1
  • 1

session timeout能夠保證, 當機器或者應用奔潰, 或者網路分割槽導致consumer與coordinator分隔開來時, 鎖會被釋放. 然而應用故障判斷卻有點討巧, 因為有可能consumer一直在傳送心跳給coordinator, 但並意味著應用的狀態是正常的.

consumer的poll api就是設計用於解決這個問題的(傳送心跳), 當你呼叫poll或者其他阻塞API時所有的網路IO都是在前臺處理的, consumer並不使用任何後臺執行緒. 意味著傳送給coordinator的心跳只會在poll呼叫時被髮送( 譯者注: 這個在0.10.1.0版本改成後臺傳送心跳), 如果應用停止poll( 無論是處理record的程式碼丟擲了異常或者系統奔潰了),那麼心跳將會停止傳送, 那麼session timeout就會過期, 組內的分割槽就會重新分配.

唯一一個你需要注意的問題就是: 如果你處理訊息的時間比session timeout要長的話, 那麼會使得coordinator認為consumer已經掛了, 導致partition重新分配. 你應該將session timeout設定的足夠大, 以使得這種情況不會發生. 預設session timeout是30s, 但是將它設定成幾分鐘是不可行的, 這樣就會導致coordinator需要更長的時間來檢測真正的consumer奔潰.

訊息傳遞語義

當一個consumer group被建立時, 最開始消費的offset位置是由auto.offset.reset配置來控制的. 一旦consumer開始消費, 它會根據應用的需要自動提交offset. 每次rebalance, 消費開始的position都會被設定成分割槽最後提交的offset. 如果consumer在為已經成功處理的message提交offset之前奔潰了, 那麼重新分配到這個分割槽的consumer會重複消費這些訊息. 所以, 你提交訊息的頻率越頻繁, 那麼因為奔潰而造成的重複消費就會越少.

到現在,我們都假定開啟了自動提交. 當我們將enable.auto.commit設定成true(預設是true), consumer會根據auto.commit.interval.ms的設定, 來週期性自動觸發offset提交. 通過減少commit間隔, 你可以減少在奔潰之後消費者需要重新處理的message數量.

為了使用consumer的commit API, 你應該首先通過在consumer的配置中設定enable.auto.commit為false來禁止自動commit.

props.put(“enable.auto.commit”, “false”);
  • 1
  • 1

commit API本身用起來不難, 但是重要的是你如何把它放入到程式碼中. 下面使用同步commit API是手動commit最簡單的方法.

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    try {
      consumer.commitSync();
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

使用無參的commitSync方法會提交上次poll返回的最新的offset. 這個呼叫是阻塞式的, 直到commit成功或者因為不可恢復的錯誤而失敗. 你需要擔心的最大的問題就是訊息處理時間可能比session timeout長. 當這種情況發生時, coordinator會把這個consumer踢出group, 那麼consumer就會丟擲CommitFailedException. 你的應用應該處理這個錯誤, 回滾自從上次成功提交offset以來的消費的message造成的改變.

一般你應該在訊息被成功處理完後才去提交offset. 如果consumer在commit傳送之前就奔潰了, 那麼message將會被重新處理. 如果commit策略保證最後提交的offset不會超過當前的消費position, 那麼你就會獲得”at least once(至少一次)”的訊息分發語義. 
consumer-atmostonce-1.png 

*Figure 3: 提交的offset超前於當前消費位置*

通過改變commit策略來保證當前的消費位置絕對不超過最後一次committed offset, 如上圖所示, 那麼你會獲得”at most once(至多一次)”的語義. 如果消費者在處理到最後一次committed offset之前就奔潰了, 那麼所有在這個間隔之間的資料都會丟失, 但是你可以確保沒有訊息會被處理超過一遍. 為了實現這種策略, 我們只需要改變commit和訊息處理的順序.
try {
  while (running) {
  ConsumerRecords<String, String> records = consumer.poll(1000);

  try {
    consumer.commitSync();
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

使用自動提交為你提供”at least once”的語義, 因為consumer保證只commit返回給應用的message的offset. 你可能會重複處理的message就被限定在, 兩次commit時間間隔(由auto.commit.interval.ms設定)之間你的應用處理的message.

通過使用commit API, 你對可以接受多少重複處理有更好的控制. 在最極端的情況下, 你可以在每次處理完訊息都提交offset. 如下面所示:

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);

    try {
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
        consumer.commitSync(Collections.singletonMap(record.partition(), new OffsetAndMetadata(record.offset() + 1)));
      }
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

在這個例子中, 我們在commitSync方法中顯式的傳遞了我們想要提交的offset. 提交的offset應該是你的應用將要讀取的下一條message的offset. 當呼叫無參的commitSync方法時, 就會提交返回給應用的最後一條message的offset(再加上1), 但是我們不能這樣做, 因為這樣會使得提交的位置超過實際處理的進度.

明顯每消費一條訊息就commit offset並不合適大多數場景, 因為處理執行緒必須阻塞, 直到commit請求從伺服器返回. 這樣會大大減少吞吐量. 一個更加合理的方式就是每N條記錄就提交一次, N可以進行調整以獲得更好的效能.

commitSync方法的引數是一個鍵是topic partition, 值是offsetAndMetadata例項的map. commit API允許在每次commit時包含一些Metadata, 這些資料可以是記錄commit的時間, 傳送commit的host地址, 或者任何你的應用需要的資訊. 在這個例子中, 我們讓它為空.

相對每次接收到訊息都提交offset, 更加合理的方法就是處理完一個分割槽的訊息後再commit offset. ConsumerRecords Collection提供訪問其中包含的partitions集合的方法, 以及每個partition的messages的方法. 下面的例子就展示了用法.

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
    for (TopicPartition partition : records.partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
      for (ConsumerRecord<String, String> record : partitionRecords)
        System.out.println(record.offset() + ": " + record.value());

      long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
      consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastoffset + 1)));
    }
  }
} finally {
  consumer.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

到目前為止, 所有的例子都是關注於同步commit API, 但是consumer同樣也有一個非同步的API, commitAsync, 使用非同步的commit可以使得你的應用擁有更高的吞吐量, 因為你可以不用等待commit返回, 開始處理下一批message. 需要權衡的就是你可能隨後發現, commit失敗了. 以下的例子展示了基本用法:

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
                             Exception exception) {
        if (exception != null) {
          // application specific failure handling
        }
      }
    });
  }
} finally {
  consumer.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

注意到對於commitAsync我們提供了一個callback, 這個方法在consumer commit結束時被呼叫(無論commit成功與否), 如果你不需要callback, 那麼你可以呼叫無參的commitAsync.

檢視Consumer Group資訊

當一個consumer group處於活躍時, 你可以在命令列中使用位於Kafka發行包bin目錄下的consumer-groups.sh指令碼獲取partition assignment和消費進度.

# bin/kafka-consumer-groups.sh –new-consumer –describe –group consumer-tutorial-group –bootstrap-server localhost:9092 
  • 1
  • 1

例子輸出結果如下所示:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumer-tutorial-group, consumer-tutorial, 0, 6667, 6667, 0, consumer-1_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 1, 6667, 6667, 0, consumer-2_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 
            
           

相關推薦

kafka0.90新版消費者API介紹使用

kafka最初時開發時, 所帶的producer和consumer client都是Scala所寫. 我們逐漸發現這些API具有一些限制. high-level的api支援consumer groups和故障轉移, 但是不支援許多複雜的使用場景, 同時還有一個simp

Parallels Desktop 12最新版軟件介紹版本信息

img 虛擬系統 too roi mar top cto 功能介紹 ffffff 軟件介紹:Parallels Desktop 13 Mac版是Mac平臺上一款非常經典的Mac虛擬機軟件,Parallels Desktop可以在你的Mac電腦上安裝一個windows系統,讓

Kafka新版消費者API示例(一)

Kafka的高階消費者(high-level consumer)和低階消費者(low-level consumer,底層用SimpleConsumer實現)是舊版本的consumer中的。 新版本的consumer中沒有這兩個概念。新版本把高階消費者和低階消費者整合到一起了,對應KafkaCon

Kafka新版消費者API示例(二)

kafka手動提交策略提供了更加靈活的管理方式,在某些場景我們需要對消費偏移量有更精準的管理。以保證訊息不被重複消費以及訊息不丟失。 Kafka提供兩種手動提交方式: 1.非同步提交(commitAsync):    非同步模式下,提交失敗也不會嘗試提交。消費者執行

藍芽API介紹基本功能實現

本文已授權微信公眾號fanfan程式媛獨家釋出 ONE,傳統藍芽 BluetoothAdapter:本地藍芽裝置介面卡,用於管理藍芽的開啟/關閉、重新命名、掃描、配對、連線BluetoothClass:藍芽裝置類,用於描述藍芽裝置型別BluetoothDevice:遠端

基於SVG的web頁面圖形繪製API介紹程式設計演示

<div id="blur-image-demo"> <div id="left" style="width:20%;"><img src="woniu.png" alt="Original image" width="325" height="471"></di

Parallels Desktop 12最新版軟體介紹版本資訊

版權歸作者所有,任何形式轉載請聯絡作者。 作者:你們都(來自豆瓣) 來源:https://www.douban.com/note/690309442/ 軟體介紹: Parallels Desktop 13 Mac版是Mac平臺上一款非常經典的Mac虛擬機器軟體,Parallels Des

videoJS播放器嵌入頁面api介紹

1、建立videoJS播放器例項 (1)呼叫swf檔案 <script type="text/javascript">videojs.options.flash.swf = "player/video-js.swf";</script> (2)

Zabbix API介紹使用

zabbix擁有完善的API,基於JSON RPC提供資產,主機,主機組,監控項,告警等方面的介面。在做運維自動化時,需要用API功能對zabbix二次開發。本文我將介紹如何用python信使用zabbix的API。 API介紹 API地址, ht

Unix 網路程式設計(四)- 典型TCP客服伺服器程式開發例項基本套接字API介紹

寫在開頭: 在上一節中我們學習了一些基礎的用來支援網路程式設計的API,包括“套接字的地址結構”、“位元組排序函式”等。這些API幾乎是所有的網路程式設計中都會使用的一些,對於我們正確的編寫網路程式有很大的作用。在本節中我們會介紹編寫一個基於TCP的套接字程式需要的一些AP

D3js-API介紹【英】

power rtg fine ict ndb rri zip vertex href Everything in D3 is scoped under the d3 namespace. D3 uses semantic versioning. You can fi

JavaWeb網上圖書商城完整項目--day03-1.圖書模塊功能介紹相關類創建

class default package ren 書籍 logs main java getc 1 前兩天我們學習了user用戶模塊和圖書的分類模塊,接下來我們學習圖書模塊 圖書模塊的功能主要是下面的功能: 2 接下來我們創建對應的包 我們來看看對應的數據庫表t_bo

C#數據緩存介紹Caching通用幫助類整理

能夠 eric article for generic arr stat ati cti C#緩存主要是為了提高數據的讀取速度。由於server和應用client之間存在著流量的瓶頸,所以讀取大容量數據時,使用緩存來直接為client服務,能夠降低client與serv

kafka中生產者和消費者API

actor 成功 edm icc per class 持久化 spout payment 使用idea實現相關API操作,先要再pom.xml重添加Kafka依賴: <dependency> <groupId>

ssh介紹scp,sftp應用

network 服務器 數據安全 linux 數據包 一、ssh介紹 SSH是 secure Shell Protocol的簡寫,由IETF網絡工作小組(Network Working Group)制定;在進行數據傳輸之前 ,SSH先對聯機數據包通過加密技術進行加密處理,加密後再進行數據

自動化運維之saltstack(二)states介紹使用

配置文件 如何 states master 根目錄 一、什麽是Salt States?Salt States是Salt模塊的擴展,主系統使用的狀態系統叫SLS系統,SLS代表Saltstack State,Salt是一些狀態文件,其中包含有關如何配置Salt子節點的信息,這些狀態被存放在一

mongoDB簡單介紹安裝

疑問 每次 data- .org 存儲 cmd 針對 安裝包 目錄 近期一段時間對mongoDB進行了簡單的學習,從它是什麽?幹什麽?怎麽用?優缺點?這一系列的疑問到如今可以簡單運用。我想須要對其進行簡單的總結和概述。那麽這一篇就從最基礎的開始,對其

關於RestFul API 介紹與實踐

clas 分享 ice div 之前 api 設計 article alt 之前演示的PPT,直接看圖。。。 ?參考鏈接: ?RESTful API 設計最佳實踐 ?RESTful API 設計指南 ?SOAP webserivce和 RESTfulwebse

展示C代碼覆蓋率的gcovr工具簡單介紹相關命令使用演示樣例

文件夾 mes repo 例如 oid else if dir total down (本人正在參加2015博客之星評選,誠邀你來投票,謝謝:username=zhouzxi">http://vote.blog.csdn.net/blogstar2015

架構師之路--搜索業務和技術介紹容錯機制

朋友 單節點 adb 一致性 公司 一個 memcache 消息通知 包括  今天和搜索部門一起做了一下MQ的遷移,順便交流一下業務和技術。發現現在90後小夥都挺不錯。我是指能力和探究心。我家男孩,不招女婿。   在前面的文章中也提到,我們有媒資庫(樂視視頻音頻本身內容)