Kafka Producer APIs

舊版的Procuder API有兩種:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它們都實現了同一個介面:

  1. class Producer {
  2. /* 將訊息傳送到指定分割槽 */
  3. public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);
  4. /* 批量傳送一批訊息 */
  5. public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
  6. /* 關閉producer */
  7. public void close();
  8. }

新版的Producer API提供了以下功能:

  1. 可以將多個訊息快取到本地佇列裡,然後非同步的批量傳送到broker,可以通過引數producer.type=async做到。快取的大小可以通過一些引數指定:queue.timebatch.size。一個後臺執行緒((kafka.producer.async.ProducerSendThread)從佇列中取出資料並讓kafka.producer.EventHandler將訊息傳送到broker,也可以通過引數event.handler定製handler,在producer端處理資料的不同的階段註冊處理器,比如可以對這一過程進行日誌追蹤,或進行一些監控。只需實現kafka.producer.async.CallbackHandler介面,並在callback.handler中配置。
  2. 自己編寫Encoder來序列化訊息,只需實現下面這個介面。預設的Encoder是kafka.serializer.DefaultEncoder
    1. interface Encoder<T> {
    2. public Message toMessage(T data);
    3. }
  3. 提供了基於Zookeeper的broker自動感知能力,可以通過引數zk.connect實現。如果不使用Zookeeper,也可以使用broker.list引數指定一個靜態的brokers列表,這樣訊息將被隨機的傳送到一個broker上,一旦選中的broker失敗了,訊息傳送也就失敗了。
  4. 通過分割槽函式kafka.producer.Partitioner類對訊息分割槽
    1. interface Partitioner<T> {
    2. int partition(T key, int numPartitions);
    3. }

    分割槽函式有兩個引數:key和可用的分割槽數量,從分割槽列表中選擇一個分割槽並返回id。預設的分割槽策略是hash(key)%numPartitions.如果key是null,就隨機的選擇一個。可以通過引數partitioner.class定製分割槽函式。

新的api完整例項如下:

  1. import java.util.*;
  2. import kafka.javaapi.producer.Producer;
  3. import kafka.producer.KeyedMessage;
  4. import kafka.producer.ProducerConfig;
  5. public class TestProducer {
  6. public static void main(String[] args) {
  7. long events = Long.parseLong(args[0]);
  8. Random rnd = new Random();
  9. Properties props = new Properties();
  10. props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
  11. props.put("serializer.class", "kafka.serializer.StringEncoder");
  12. props.put("partitioner.class", "example.producer.SimplePartitioner");
  13. props.put("request.required.acks", "1");
  14. ProducerConfig config = new ProducerConfig(props);
  15. Producer<String, String> producer = new Producer<String, String>(config);
  16. for (long nEvents = 0; nEvents < events; nEvents++) {
  17. long runtime = new Date().getTime();
  18. String ip = “192.168.2.” + rnd.nextInt(255);
  19. String msg = runtime + “,www.example.com,” + ip;
  20. KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
  21. producer.send(data);
  22. }
  23. producer.close();
  24. }
  25. }

下面這個是用到的分割槽函式:

  1. import kafka.producer.Partitioner;
  2. import kafka.utils.VerifiableProperties;
  3. public class SimplePartitioner implements Partitioner<String> {
  4. public SimplePartitioner (VerifiableProperties props) {
  5. }
  6. public int partition(String key, int a_numPartitions) {
  7. int partition = 0;
  8. int offset = key.lastIndexOf('.');
  9. if (offset > 0) {
  10. partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
  11. }
  12. return partition;
  13. }
  14. }

KafKa Consumer APIs

Consumer API有兩個級別。低級別的和一個指定的broker保持連線,並在接收完訊息後關閉連線,這個級別是無狀態的,每次讀取訊息都帶著offset。

高級別的API隱藏了和brokers連線的細節,在不必關心服務端架構的情況下和服務端通訊。還可以自己維護消費狀態,並可以通過一些條件指定訂閱特定的topic,比如白名單黑名單或者正則表示式。

低級別的API

  1. class SimpleConsumer {
  2. /*向一個broker傳送讀取請求並得到訊息集 */
  3. public ByteBufferMessageSet fetch(FetchRequest request);
  4. /*向一個broker傳送讀取請求並得到一個相應集 */
  5. public MultiFetchResponse multifetch(List<FetchRequest> fetches);
  6. /**
  7. * 得到指定時間之前的offsets
  8. * 返回值是offsets列表,以倒序排序
  9. * @param time: 時間,毫秒,
  10. *              如果指定為OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
  11. *              如果指定為OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
  12. */
  13. public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
  14. }

低級別的API是高級別API實現的基礎,也是為了一些對維持消費狀態有特殊需求的場景,比如Hadoop consumer這樣的離線consumer。

高級別的API

  1. /* 建立連線 */
  2. ConsumerConnector connector = Consumer.create(consumerConfig);
  3. interface ConsumerConnector {
  4. /**
  5. * 這個方法可以得到一個流的列表,每個流都是MessageAndMetadata的迭代,通過MessageAndMetadata可以拿到訊息和其他的元資料(目前之後topic)
  6. *  Input: a map of <topic, #streams>
  7. *  Output: a map of <topic, list of message streams>
  8. */
  9. public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
  10. /**
  11. * 你也可以得到一個流的列表,它包含了符合TopicFiler的訊息的迭代,
  12. * 一個TopicFilter是一個封裝了白名單或黑名單的正則表示式。
  13. */
  14. public List<KafkaStream> createMessageStreamsByFilter(
  15. TopicFilter topicFilter, int numStreams);
  16. /* 提交目前消費到的offset */
  17. public commitOffsets()
  18. /* 關閉連線 */
  19. public shutdown()
  20. }

這個API圍繞著由KafkaStream實現的迭代器展開,每個流代表一系列從一個或多個分割槽多和broker上匯聚來的訊息,每個流由一個執行緒處理,所以客戶端可以在建立的時候通過引數指定想要幾個流。一個流是多個分割槽多個broker的合併,但是每個分割槽的訊息只會流向一個流。

每呼叫一次createMessageStreams都會將consumer註冊到topic上,這樣consumer和brokers之間的負載均衡就會進行調整。API鼓勵每次呼叫建立更多的topic流以減少這種調整。createMessageStreamsByFilter方法註冊監聽可以感知新的符合filter的tipic。