1. 程式人生 > >Kafka客戶端使用

Kafka客戶端使用

Consumer客戶端

1 消費者模型

在開始編碼之前, 我們先回顧一下一些基本概念。 在Kafka中, 每個topic被分成一組稱為PartitionslogsProducer向這些logs的末尾寫入訊息, Consumer則自己按自己的節奏讀取log。 Kafka通過在一個Consumer Group中分配Partitions來伸縮topic的消費, Consumer Group是一組有同樣Group id的consumers。 下圖表示的是一個擁有3個分割槽的topic, 以及一個有兩個成員的Consumer Group。 每一個分割槽都只會分配給組內的一個成員。
這裡寫圖片描述

舊的Consumer依賴於Zookeeper來進行Group管理, 新的消費者使用kafka內部的Group coordination協議。

對於每個Group, brokers中的一個被選為Group coordinator。 這個coordinator負責管理Group的狀態, 它的主要工作是當新的成員加入, 或者原本的成員離開, 或者topic的元資料發生了改變時, 協調Partition分配。 重新分配Partition這個過程被稱為rebalancing the Group。

當Group第一次初始化時, Consumer通常從分割槽的最開始或者最末尾開始讀取。 每個Partition log中的訊息都是按順序讀取。 隨著Consumer的讀取, 它將會commit它所成功處理的message的offset。 例如下圖中, Consumer的位置位於offset 6, 上一次commit的offset為1。
這裡寫圖片描述

當一個Partition被重分配給組內的另一個Consumer, 其最初位置會被設定成上一次commit的offset。 如果上例中的Consumer突然奔潰了, 那麼接管這個Partition的另外一個組成員會從offset 1繼續消費。 在這種情況下,它會重新處理上一次提交位置到消費者奔潰的位置6的訊息。

這張圖還有兩個log中比較重要的位置。 log end offset是最後一條寫入log的message的offset, 而high watermark是最後一條message成功複製到所有的log replica的message offset。 從Consumer的角度來看, 所知道的最主要的事情就是其最多能夠讀到high watermark的message。 這能夠阻止Consumer讀取到未被複制到其他broker的, 可能會丟失的message。

2 配置和初始化

開始使用Consumer之前, 需要將kafka-clients依賴加入到你的專案中。

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.9.0.0-cp1</version>
</dependency>12345

Consumer使用一個Properties file來進行構建。 以下提供了使用Consumer Group所需要的最小配置。

Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);123456

和之前的Consumer, Producer一樣, 我們需要為Consumer提供一個broker的初始化列表, 以用於發現叢集中其他的broker。 這個配置並不需要提供所有的broker, client會從給定的配置中發現所有的當前存活的broker。 這裡我們假設broker執行在本地, 同時Consumer還需要知道如何反序列化message key和value。 最後, 為了加入Consumer Group, 我們需要配置一個Group id。 隨著教程的繼續, 我們會介紹更多的配置。

3 Topic訂閱

為了開始消費, 你必須指定你的應用需要讀取的topics, 在下面的例子中, 我們訂閱了topic foo和bar。

Consumer.subscribe(Arrays.asList(“foo”, “bar”));

訂閱之後, Consumer會與組內其他Consumer協調來獲取其分割槽分配。 這都是在你開始消費資料時自動處理的。 後面我們會說明如何使用assign api來手動分配分割槽, 但是要注意的是, 不能同時混合使用自動和手動的分配。

subscribe方法並不是遞增的: 你必須包含所有你想要消費的topics。 你可以在任何時刻改變你想消費的topic集, 當你呼叫subscribe時, 之前訂閱的topics列表會被新的列表取代。

3 基本的Poll事件迴圈

3.1 Consumer使用了NIO技術

Consumer需要能夠並行獲取資料, 在眾多brokers中獲取多個topic的多個分割槽訊息。 為了實現這個目的, Consumer API的設計成風格類似於unix中的poll或者select呼叫: 一旦topic註冊了, 所有的coordination, rebalancing和data fetch都是由一個處於迴圈中的poll呼叫來驅動。 這樣就提供了一個能在一個執行緒裡處理所有的IO的簡單有效的實現。

3.2 啟動Poll Loop

在你訂閱一個topic之後, 你需要啟動這個event loop以獲得Partition分配和開始獲取資料。 聽起來很複雜, 但是所有你需要做的就只有呼叫poll, 然後Consumer客戶端本身負責處理其他的工作。 每一次poll呼叫都會返回從所分配的Partition獲取的一組訊息(也許是空的)。 下面的例子展示了一個基本的poll迴圈, 列印獲取的records的offset和value。

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(1000); // 超時時間1000毫秒
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
  }
} finally {
  Consumer.close();
}

poll API根據當前的位置返回records,當Group第一次建立時, 消費開始的位置會被根據reset policy(一般設定成從每個分割槽的最早的offset或者最新的offset開始)來設定。 只要Consumer開始提交offset, 那麼之後的rebalance都會重置消費開始位置到最新的被提交的offset。 傳遞給poll的引數是Consumer在當前位置等待有record返回時需要被阻塞的時間。 一旦有record時, Consumer會立即返回, 如果沒有record, 它將會等待直到超時。

Consumer被設計成只在自己的執行緒中執行, 在沒有外部同步措施的情況下, 在多執行緒中使用時不安全的, 同時也不建議這樣做。 在這個例子中, 我們使用了一個flag來使得當應用停止時能夠從迴圈中跳出。 當這個flag被另一個執行緒設定成false時, pool返回時迴圈會跳出, 無論返回什麼record, 處理過程都會結束。這個的例子使用了一個相對較少的超時時間, 以使得關閉Consumer並不會有太大的延時。

你還可以設定一個較長的timeout, 並且使用wakeup API來使得其從迴圈中跳出。

try {
  while (true) {
    ConsumerRecords<String, String> records = Consumer.poll(Long.MAX_VALUE);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + “: ” + record.value());
  }
} catch (WakeupException e) {
  // ignore for shutdown
} finally {
  Consumer.close();
}

我們將timeout改為了Long.MAX_VALUE, 意味著Consumer會一直阻塞直到有record返回。 和前面設定flag不同, 用於觸發shutdown的執行緒可以呼叫Consumer.wakeup()來中斷一次poll, 使其丟擲WakeupExection。 這個API是執行緒安全的。 注意如果當前沒有活躍的poll, 那麼異常會在下一次poll呼叫時丟擲。 在這個例子中, 我們捕捉這個異常, 阻止其繼續傳播。

4 完整程式碼

在接下來的例子中, 我們將所有的程式碼塊放到一起來構建一個task, 初始化Consumer, 訂閱一個topic列表, 並且執行poll呼叫直到外部關閉它。

public class ConsumerLoop implements Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final List<String> topics;
  private final int id;

  public ConsumerLoop(int id,
                      String groupId, 
                      List<String> topics) {
    this.id = id;
    this.topics = topics;
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put(“group.id”, groupId);
    props.put(“key.deserializer”, StringDeserializer.class.getName());
    props.put(“value.deserializer”, StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
  }

  @Override
  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("Partition", record.Partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out.println(this.id + ": " + data);
        }
      }
    } catch (WakeupException e) {
      // ignore for shutdown 
    } finally {
      consumer.close();
    }
  }

  public void shutdown() {
    consumer.wakeup();
  }
}

為了測試這個例子, 你需要一個執行0.9.0.0的kafka發行版的broker, 以及一個提供一些string資料進行消費的topic。 寫入一些string資料的最簡單的方法就是通過kafka-verifiable-producer.sh指令碼。 你需要確保topic有超過一個Partition。 例如對於只有一個執行在localhost的kafka broker和Zookeeper, 你可以在kafka發行包根目錄下執行以下的命令:

# bin/kafka-topics.sh --create --topic Consumer-tutorial --replication-factor 1 --Partitions 3 --zookeeper localhost:2181
# bin/kafka-verifiable-producer.sh --topic Consumer-tutorial --max-messages 200000 --broker-list localhost:9092

然後我們可以用以下程式碼來設定一個有3個成員的Consumer Group, 所有的成員都訂閱我們剛剛建立的topic。

public static void main(String[] args) {
  int numConsumers = 3;
  String groupId = "consumer-tutorial-group"
  List<String> topics = Arrays.asList("consumer-tutorial");
  ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

  final List<ConsumerLoop> consumers = new ArrayList<>();
  for (int i = 0; i < numConsumers; i++) {
    ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
    consumers.add(consumer);
    executor.submit(consumer);
  }

  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      for (ConsumerLoop consumer : consumers) {
        consumer.shutdown();
      } 
      executor.shutdown();
      try {
        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        e.printStackTrace;
      }
    }
  });
}

這個例子將三個Runnable Consumer提交到一個executor。 每一個執行緒都有一個單獨的id, 所以你可以看出來哪個執行緒正在接收資料。 當你準備結束程序時, shutdown hook會被呼叫, 它會呼叫wakeup來終止三個執行緒, 並且等待它們關閉。 以下是程式執行的資料:

2: {Partition=0, offset=928, value=2786}
2: {Partition=0, offset=929, value=2789}
1: {Partition=2, offset=297, value=891}
2: {Partition=0, offset=930, value=2792}
1: {Partition=2, offset=298, value=894}
2: {Partition=0, offset=931, value=2795}
0: {Partition=1, offset=278, value=835}
2: {Partition=0, offset=932, value=2798}
0: {Partition=1, offset=279, value=838}
1: {Partition=2, offset=299, value=897}
1: {Partition=2, offset=300, value=900}
1: {Partition=2, offset=301, value=903}
1: {Partition=2, offset=302, value=906}
1: {Partition=2, offset=303, value=909}
1: {Partition=2, offset=304, value=912}
0: {Partition=1, offset=280, value=841}
2: {Partition=0, offset=933, value=2801}

5 消費者存活

每個Consumer都是被分配它所訂閱的topics中所有Partitions的一部分Partitions。 就像是對這些Partitions的Group lock。 只要這個鎖被持有, 組內其他的成員就無法從這些分割槽中讀取資料。 當Consumer還存活著, 這就是你想要的, 這是能夠避免重複消費的方法(加鎖)。 但是如果消費者因為機器或者應用的故障掛掉了, 那麼你希望鎖能夠釋放, 並且分割槽能夠分配給其他存活的Consumer。

Kafka group coordination協議通過心跳機制來解決這個問題。 在每次重分配之後, 當前所有的成員會週期性傳送心跳給Group coordinator。 只要coordinator接收心跳, 它就會假設成員是存活的。 每次接收到心跳, coordinator都會開始(或重置)一個timer。 當timer過期時, 如果還沒有心跳收到, coordinator會將這個成員標記為掛掉, 然後通知其他組內其他成員需要重新分配分割槽。 timer的時間被稱為session timeout, 由客戶端的session.timeout.ms進行設定。

props.put(“session.timeout.ms”, “60000”);

session timeout能夠保證, 當機器或者應用奔潰, 或者網路分割槽導致Consumer與coordinator分隔開來時, 鎖會被釋放。 然而應用故障判斷卻有點討巧, 因為有可能Consumer一直在傳送心跳給coordinator, 但並意味著應用的狀態是正常的。

Consumer的poll api就是設計用於解決這個問題的(傳送心跳), 當你呼叫poll或者其他阻塞API時所有的網路IO都是在前臺處理的, Consumer並不使用任何後臺執行緒。 意味著傳送給coordinator的心跳只會在poll呼叫時被髮送( 譯者注: 這個在0.10.1.0版本改成後臺傳送心跳), 如果應用停止poll( 無論是處理record的程式碼丟擲了異常或者系統奔潰了),那麼心跳將會停止傳送, 那麼session timeout就會過期, 組內的分割槽就會重新分配。

唯一一個你需要注意的問題就是: 如果你處理訊息的時間比session timeout要長的話, 那麼會使得coordinator認為Consumer已經掛了, 導致Partition重新分配。 你應該將session timeout設定的足夠大, 以使得這種情況不會發生。 預設session timeout是30s, 但是將它設定成幾分鐘是不可行的, 這樣就會導致coordinator需要更長的時間來檢測真正的Consumer奔潰。

6. offset提交

kafka為每個分割槽的每條訊息保持偏移量的值,這個偏移量是該分割槽中一條訊息的唯一標示符。也表示消費者在分割槽的位置。也就是說,一個位置是5的消費者,說明已經消費了0到4的訊息(記錄)並下一個接收訊息的偏移量設定為5。關於的消費者,實際上“位置”有2個概念。

  • 消費者將給出下一個訊息的偏移量的位置,這個是消費者在分割槽中能看到的最後的偏移量,消費者收到的資料稱為poll(long)[長輪詢],每次接收訊息,偏移量會自動增長,
  • “已提交”的位置是已安全儲存的最後偏移量,如果處理失敗,這個偏移量會恢復並重新開始。消費者可以自動定期提交偏移量,也可以選擇通過呼叫commitSync來控制,這是阻塞的,直到偏移量提交成功或在提交過程中發生致命的錯誤,commitAsync是非阻塞式的,當成功或失敗時,會引發OffsetCommitCallback。

當一個Consumer Group被建立時, 最開始消費的offset位置是由auto.offset.reset配置來控制的。 一旦Consumer開始消費, 它會根據應用的需要自動提交offset。 每次rebalance,消費開始的position都會被設定成分割槽最後提交的offset。如果Consumer在為已經成功處理的message提交offset之前奔潰了, 那麼重新分配到這個分割槽的Consumer會重複消費這些訊息。 所以, 你提交訊息的頻率越頻繁, 那麼因為奔潰而造成的重複消費就會越少。

到現在,我們都假定開啟了自動提交。 當我們將enable.auto.commit設定成true(預設是true), Consumer會根據auto.commit.interval.ms的設定, 來週期性自動觸發offset提交。 通過減少commit間隔, 你可以減少在奔潰之後消費者需要重新處理的message數量。

為了使用Consumer的commit API,你應該首先通過在Consumer的配置中設定enable.auto.commit為false來禁止自動commit。

props.put(“enable.auto.commit”, “false”);

commit API本身用起來不難, 但是重要的是你如何把它放入到程式碼中。 下面使用同步commit API是手動commit最簡單的方法。

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    try {
      Consumer.commitSync();
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  Consumer.close();
}

使用無參的commitSync方法會提交上次poll返回的最新的offset。 這個呼叫是阻塞式的, 直到commit成功或者因為不可恢復的錯誤而失敗。 你需要擔心的最大的問題就是訊息處理時間可能比session timeout長。 當這種情況發生時, coordinator會把這個Consumer踢出Group, 那麼Consumer就會丟擲CommitFailedException。 你的應用應該處理這個錯誤, 回滾自從上次成功提交offset以來的消費的message造成的改變。

一般你應該在訊息被成功處理完後才去提交offset。 如果Consumer在commit傳送之前就奔潰了, 那麼message將會被重新處理。 如果commit策略保證最後提交的offset不會超過當前的消費position, 那麼你就會獲得”at least once(至少一次)”的訊息分發語義。

這裡寫圖片描述

提交的offset超前於當前消費位置

通過改變commit策略來保證當前的消費位置絕對不超過最後一次committed offset 如上圖所示, 那麼你會獲得”at most once(至多一次)”的語義。 如果消費者在處理到最後一次committed offset之前就奔潰了, 那麼所有在這個間隔之間的資料都會丟失, 但是你可以確保沒有訊息會被處理超過一遍。 為了實現這種策略, 我們只需要改變commit和訊息處理的順序。

try {
  while (running) {
  ConsumerRecords<String, String> records = Consumer.poll(1000);

  try {
    Consumer.commitSync();
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  Consumer.close();
}

使用自動提交為你提供”at least once”的語義, 因為Consumer保證只commit返回給應用的message的offset。你可能會重複處理的message就被限定在, 兩次commit時間間隔(由auto.commit.interval.ms設定)之間你的應用處理的message.

通過使用commit API, 你對可以接受多少重複處理有更好的控制。 在最極端的情況下, 你可以在每次處理完訊息都提交offset。如下面所示:

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(1000);

    try {
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
        Consumer.commitSync(Collections.singletonMap(record.Partition(), new OffsetAndMetadata(record.offset() + 1)));
      }
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  Consumer.close();
}12345678910111213141516

在這個例子中, 我們在commitSync方法中顯式的傳遞了我們想要提交的offset. 提交的offset應該是你的應用將要讀取的下一條message的offset. 當呼叫無參的commitSync方法時, 就會提交返回給應用的最後一條message的offset(再加上1), 但是我們不能這樣做, 因為這樣會使得提交的位置超過實際處理的進度。

明顯每消費一條訊息就commit offset並不合適大多數場景, 因為處理執行緒必須阻塞, 直到commit請求從伺服器返回。 這樣會大大減少吞吐量。 一個更加合理的方式就是每N條記錄就提交一次, N可以進行調整以獲得更好的效能。

commitSync方法的引數是一個鍵是topic Partition, 值是offsetAndMetadata例項的map. commit API允許在每次commit時包含一些Metadata, 這些資料可以是記錄commit的時間, 傳送commit的host地址, 或者任何你的應用需要的資訊。 在這個例子中, 我們讓它為空。

相對每次接收到訊息都提交offset, 更加合理的方法就是處理完一個分割槽的訊息後再commit offset. **ConsumerRecords**Collection提供訪問其中包含的Partitions集合的方法, 以及每個Partition的messages的方法。 下面的例子就展示了用法。

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(Long.MAX_VALUE);
    for (TopicPartition Partition : records.Partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(Partition);
      for (ConsumerRecord<String, String> record : partitionRecords)
        System.out.println(record.offset() + ": " + record.value());

      long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
      Consumer.commitSync(Collections.singletonMap(Partition, new OffsetAndMetadata(lastoffset + 1)));
    }
  }
} finally {
  Consumer.close();
}123456789101112131415

到目前為止, 所有的例子都是關注於同步commit API, 但是Consumer同樣也有一個非同步的API, commitAsync, 使用非同步的commit可以使得你的應用擁有更高的吞吐量, 因為你可以不用等待commit返回, 開始處理下一批message. 需要權衡的就是你可能隨後發現, commit失敗了。 以下的例子展示了基本用法:

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    Consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
                             Exception exception) {
        if (exception != null) {
          // application specific failure handling
        }
      }
    });
  }
} finally {
  Consumer.close();
}

注意到對於commitAsync我們提供了一個callback, 這個方法在Consumer commit結束時被呼叫(無論commit成功與否), 如果你不需要callback, 那麼你可以呼叫無參的commitAsync

7. 檢視Consumer Group資訊

當一個Consumer Group處於活躍時, 你可以在命令列中使用位於Kafka發行包bin目錄下的Consumer-Groups.sh指令碼獲取Partition assignment和消費進度。

# bin/kafka-consumer-groups.sh –new-consumer –describe –group consumer-tutorial-group –bootstrap-server localhost:9092 1

例子輸出結果如下所示:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumer-tutorial-group, consumer-tutorial, 0, 6667, 6667, 0, consumer-1_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 1, 6667, 6667, 0, consumer-2_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 2, 6666, 6666, 0, consumer-3_/127.0.0.11234

上面顯示了Consumer Group中所有的Partition assignment, 哪個消費者例項擁有哪個分割槽, 以及最新的committed offset(這裡的current offset)。 lag表示log end offset與最新的committed offset的差值。 管理員可以使用這個命令來監控以保證Consumer Group跟上了Producer的進度。

8. 使用手動分割槽分配

在教程前面提到的, 新的Consumer為一些並不需要Consumer Group的應用場景實現了更底層的訪問。 推薦使用這個API的理由就是它的易用性。 舊的”simple” Consumer同樣也提供了這個功能, 但是需要你自己進行一大堆的異常處理。 使用新的Consumer API, 你只需要分配你想要讀取資料的Partition, 然後開始poll data就可以了。

下面的例子展示瞭如何好使用partitionFor API來將一個topic的所有分割槽分配給一個Consumer.

List<TopicPartition> Partitions = new ArrayList<>();
for (PartitionInfo Partition : Consumer.partitionsFor(topic))
  Partitions.add(new TopicPartition(topic, Partition.Partition()));
Consumer.assign(Partitions);

subscribe方法類似, assign方法需要傳遞你想要讀取的Partition的列表。 一旦Partition被分配了, poll的工作方式就和之前講述的一致。

需要注意的是, 無論是使用一個simple Consumer或者是Consumer Group, 所有的offset commit都會經過Group coordinator。 所以如果你想要commit offsets,你仍然需要設定Group.id, 以防止與其他Consumer發生衝突。 如果一個simple Consumer試著使用和一個活躍的Consumer Group相同的Group id來commit offset, 那麼coordinator會拒絕這個commit(提交的結果就是丟擲CommitFailedException)。 有相同的Group id的simple Consumer卻不會有錯誤。

9. Conclusion

新的Consumer為Kafka社群帶來很多的好處, 包括一個清晰的API, 更好的安全性以及更少的依賴。 這篇教程介紹了基本的應用, 集中於poll的語義,以及使用commit API來控制訊息傳遞語義。 雖然還有很多細節沒有涉及到, 但是這對於開始使用kafka Consumer來說已經足夠了。

Producer客戶端

KafkaProducer(org.apache.kafka.clients.producer.KafkaProducer)是一個用於向kafka叢集傳送資料的Java客戶端。該Java客戶端是執行緒安全的,多個執行緒可以共享同一個producer例項,而且這通常比在多個執行緒中每個執行緒建立一個例項速度要快些。

1 程式碼示例

public class TestProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.137.200:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //生產者傳送訊息 
        String topic = "mytopic";
        Producer<String, String> procuder = new KafkaProducer<String,String>(props);
        for (int i = 1; i <= 10; i++) {
            String value = "value_" + i;
            ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, value);
            procuder.send(msg);
        }
        //列出topic的相關資訊
        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>() ;
        partitions = procuder.partitionsFor(topic);
        for(PartitionInfo p:partitions)
        {
            System.out.println(p);
        }

        System.out.println("send message over.");
        procuder.close(100,TimeUnit.MILLISECONDS);
    }
}

2 配置說明

  • bootstrap.servers: 用於初始化時建立連結到kafka叢集,以host:port形式,多個以逗號分隔host1:port1,host2:port2。
  • acks: 生產者需要server端在接收到訊息後,進行反饋確認的尺度,主要用於訊息的可靠性傳輸;acks=0表示生產者不需要來自server的確認;acks=1表示server端將訊息儲存後即可傳送ack,而不必等到其他follower角色的都收到了該訊息;acks=all(or acks=-1)意味著server端將等待所有的副本都被接收後才傳送確認。
  • retries: 生產者傳送失敗後,重試的次數。
  • batch.size: 當多條訊息傳送到同一個partition時,該值控制生產者批量傳送訊息的大小,批量傳送可以減少生產者到服務端的請求數,有助於提高客戶端和服務端的效能。
  • linger.ms: 預設情況下緩衝區的訊息會被立即傳送到服務端,即使緩衝區的空間並沒有被用完。可以將該值設定為大於0的值,這樣傳送者將等待一段時間後,再向服務端傳送請求,以實現每次請求可以儘可能多的傳送批量訊息。 batch.size和linger.ms是兩種實現讓客戶端每次請求儘可能多的傳送訊息的機制,它們可以並存使用,並不衝突。
  • buffer.memory:生產者緩衝區的大小,儲存的是還未來得及傳送到server端的訊息,如果生產者的傳送速度大於訊息被提交到server端的速度,該緩衝區將被耗盡。
  • key.serializer,value.serializer:說明了使用何種序列化方式將使用者提供的key和vaule值序列化成位元組。

producer包含一個用於儲存待發送訊息的緩衝池,緩衝池中訊息是還沒來得及傳輸到kafka叢集的訊息。位於底層的kafka I/O執行緒負責將緩衝池中的訊息轉換成請求傳送到叢集。如果在結束produce時,沒有呼叫close()方法,那麼這些資源會發生洩露。

3 訊息傳送方法send

/*
 *record:key-value形式的待發送資料
 *callback:到傳送的訊息被borker端確認後的回撥函式
 */
public Future<RecordMetadata> send(ProducerRecord<K,V> record); // Equivalent to send(record, null)
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback);

send方法負責將緩衝池中的訊息非同步的傳送到broker的指定topic中。非同步傳送是指,方法將訊息儲存到底層待發送的I/O快取後,將立即返回,這可以實現並行無阻塞的傳送更多訊息。send方法的返回值是RecordMetadata型別,它含有訊息將被投遞的partition資訊,該條訊息的offset,以及時間戳。因為send返回的是Future物件,因此在該物件上呼叫get()方法將阻塞,直到相關的傳送請求完成並返回元資料資訊;或者在傳送時丟擲異常而退出。

阻塞傳送的方法如下:

String key = "Key";
String value = "Value";
ProducerRecord<String, String> record = new ProducerRecord<String, String>(key, value);
producer.send(record).get();
ProducerRecord<String,String> record = new ProducerRecord<String,String>("the-topic", key, value);
producer.send(myRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if(e != null) {
                            e.printStackTrace();
                        } else {
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                        }
                    }
                });