1. 程式人生 > >Kafka分割槽機制介紹與示例

Kafka分割槽機制介紹與示例

Kafka中可以將Topic從物理上劃分成一個或多個分割槽(Partition),每個分割槽在物理上對應一個資料夾,以”topicName_partitionIndex”的命名方式命名,該資料夾下儲存這個分割槽的所有訊息(.log)和索引檔案(.index),這使得Kafka的吞吐率可以水平擴充套件。

生產者在生產資料的時候,可以為每條訊息指定Key,這樣訊息被髮送到broker時,會根據分割槽規則選擇被儲存到哪一個分割槽中,如果分割槽規則設定的合理,那麼所有的訊息將會被均勻的分佈到不同的分割槽中,這樣就實現了負載均衡和水平擴充套件。另外,在消費者端,同一個消費組可以多執行緒併發的從多個分割槽中同時消費資料(後續將介紹這塊)。

上面所說的分割槽規則,是實現了kafka.producer.Partitioner介面的類,可以自定義。比如,下面的程式碼SimplePartitioner中,將訊息的key做了hashcode,然後和分割槽數(numPartitions)做模運算,使得每一個key都可以分佈到一個分割槽中:

  1. package com.lxw1234.kafka;
  2. import kafka.producer.Partitioner;
  3. import kafka.utils.VerifiableProperties;
  4. public class SimplePartitioner implements Partitioner {
  5. public SimplePartitioner (VerifiableProperties props) {
  6. }
  7. @Override
  8. public int partition(Object key, int numPartitions) {
  9. int partition = 0;
  10. String k = (String)key;
  11. partition = Math.abs(k.hashCode()) % numPartitions;
  12. return partition;
  13. }
  14. }

在建立Topic時候可以使用–partitions <numPartitions>指定分割槽數。也可以在server.properties配置檔案中配置引數num.partitions來指定預設的分割槽數。

但有一點需要注意,為Topic建立分割槽時,分割槽數最好是broker數量的整數倍,這樣才能是一個Topic的分割槽均勻的分佈在整個Kafka叢集中,假設我的Kafka叢集由4個broker組成,以下圖為例:

kafka partition

建立帶分割槽的Topic

現在建立一個topic “lxw1234”,為該topic指定4個分割槽,那麼這4個分割槽將會在每個broker上各分佈一個:

  1. ./kafka-topics.sh
  2. --create
  3. --zookeeper zk1:2181,zk2:2181,zk3:2181
  4. --replication-factor 1
  5. --partitions 4
  6. --topic lxw1234

kafka partition

這樣所有的分割槽就均勻分佈在叢集中,如果建立topic時候指定了3個分割槽,那麼就有一個broker上沒有該topic的分割槽。

帶分割槽規則的生產者

現在用一個生產者示例(PartitionerProducer),向Topic lxw1234中傳送訊息。該生產者使用的分割槽規則,就是上面的SimplePartitioner。從0-10一共11條訊息,每條訊息的key為”key”+index,訊息內容為”key”+index+”–value”+index。比如:key0–value0、key1–value1、、、key10–value10。

  1. package com.lxw1234.kafka;
  2. import java.util.Properties;
  3. import kafka.javaapi.producer.Producer;
  4. import kafka.producer.KeyedMessage;
  5. import kafka.producer.ProducerConfig;
  6. public class PartitionerProducer {
  7. public static void main(String[] args) {
  8. Properties props = new Properties();
  9. props.put("serializer.class", "kafka.serializer.StringEncoder");
  10. props.put("metadata.broker.list", "127.0.0.17:9091,127.0.0.17:9092,127.0.0.102:9091,127.0.0.102:9092");
  11. props.put("partitioner.class", "com.lxw1234.kafka.SimplePartitioner");
  12. Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
  13. String topic = "lxw1234";
  14. for(int i=0; i<=10; i++) {
  15. String k = "key" + i;
  16. String v = k + "--value" + i;
  17. producer.send(new KeyedMessage<String, String>(topic,k,v));
  18. }
  19. producer.close();
  20. }
  21. }

理論上來說,生產者在傳送訊息的時候,會按照SimplePartitioner的規則,將key0做hashcode,然後和分割槽數(4)做模運算,得到分割槽索引:

hashcode(”key0”) % 4 = 1

hashcode(”key1”) % 4 = 2

hashcode(”key2”) % 4 = 3

hashcode(”key3”) % 4 = 0

         ……

對應的訊息將會被髮送至相應的分割槽中。

統計各分割槽訊息的消費者

下面的消費者程式碼用來驗證,在消費資料時,打印出訊息所在的分割槽及訊息內容:

  1. package com.lxw1234.kafka;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Properties;
  6. import kafka.consumer.Consumer;
  7. import kafka.consumer.ConsumerConfig;
  8. import kafka.consumer.ConsumerIterator;
  9. import kafka.consumer.KafkaStream;
  10. import kafka.javaapi.consumer.ConsumerConnector;
  11. import kafka.message.MessageAndMetadata;
  12. public class MyConsumer {
  13. public static void main(String[] args) {
  14. String topic = "lxw1234";
  15. ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
  16. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  17. topicCountMap.put(topic, new Integer(1));
  18. Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  19. KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  20. ConsumerIterator<byte[], byte[]> it = stream.iterator();
  21. while(it.hasNext()) {
  22. MessageAndMetadata<byte[], byte[]> mam = it.next();
  23. System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message()) + "] ..");
  24. }
  25. }
  26. private static ConsumerConfig createConsumerConfig() {
  27. Properties props = new Properties();
  28. props.put("group.id","group1");
  29. props.put("zookeeper.connect","127.0.0.132:2181,127.0.0.133:2182,127.0.0.134:2183");
  30. props.put("zookeeper.session.timeout.ms", "400");
  31. props.put("zookeeper.sync.time.ms", "200");
  32. props.put("auto.commit.interval.ms", "1000");
  33. props.put("auto.offset.reset", "smallest");
  34. return new ConsumerConfig(props);
  35. }
  36. }

執行程式驗證結果

先啟動消費者,再執行生產者。

之後在消費者的控制檯可以看到如下輸出:

kafka partition

結果和正常預期一致。