kafka 自定義分割槽例項
阿新 • • 發佈:2019-01-23
第一步:使用 bin/kafka-topics.sh 命令建立 topic,並指定 partitions(分割槽數)
bin/kafka-topics.sh --create --zookeeper 192.168.31.130:2181 --replication-factor 2 --partitions 3 --topic Topic-test
第二步:實現 org.apache.kafka.clients.producer.Partitioner 分割槽介面,以實現自定義的訊息分割槽
package com.east.spark.kafka;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom partitioner that routes a record by the numeric value of its key.
 *
 * <p>A {@code String} key that parses as a decimal {@code int} is used directly; any other
 * non-null key contributes its {@code hashCode()}. The resulting number is reduced modulo the
 * topic's partition count, so with three partitions the keys "0", "1", "2", "3" land on
 * partitions 0, 1, 2, 0.
 *
 * <p>Records without a key are routed by a hash of the serialized value instead of failing
 * with a {@link NullPointerException}.
 */
public class MyPartition implements Partitioner {

    private static final Logger LOG = LoggerFactory.getLogger(MyPartition.class);

    public MyPartition() {
        // Nothing to initialise.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // This partitioner has no configurable options.
    }

    /**
     * Chooses the partition for a record.
     *
     * @param topic      topic the record is being sent to
     * @param key        record key; may be {@code null}
     * @param keyBytes   serialized key (unused)
     * @param value      record value (unused)
     * @param valueBytes serialized value; only consulted when {@code key} is {@code null}
     * @param cluster    cluster metadata used to look up the topic's partitions
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
            Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        int partitionNum = rawPartitionNumber(key, valueBytes);

        // Logs the number BEFORE the modulo is applied, i.e. not the final partition index.
        LOG.info("the message sendTo topic:{} and the partitionNum:{}", topic, partitionNum);

        // The remainder lies in (-numPartitions, numPartitions), so Math.abs cannot overflow here.
        return Math.abs(partitionNum % numPartitions);
    }

    @Override
    public void close() {
        // No resources to release.
    }

    /** Derives the un-reduced partition number from the key, or from the value bytes if there is no key. */
    private static int rawPartitionNumber(Object key, byte[] valueBytes) {
        if (key == null) {
            // Arrays.hashCode(null) is 0, so a record with neither key nor value goes to partition 0.
            return Arrays.hashCode(valueBytes);
        }
        if (key instanceof String) {
            try {
                return Integer.parseInt((String) key);
            } catch (NumberFormatException ignored) {
                // Key is a string but not a decimal int: fall back to its hash code below.
            }
        }
        return key.hashCode();
    }
}
第三步:編寫 producer
package com.east.spark.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Demo producer that sends the numbers 0..9999 (as both key and value) to {@code Topic-test},
 * one record every three seconds, using {@code com.east.spark.kafka.MyPartition} to pick the
 * partition.
 */
public class Producer {

    private static final String TOPIC = "Topic-test";
    private static final int MESSAGE_COUNT = 10000;
    private static final long SEND_INTERVAL_MS = 3000L;

    public static void main(String[] args) {
        Properties props = new Properties();
        // A subset of the cluster's brokers, used to discover the rest of the cluster.
        props.put("bootstrap.servers", "192.168.31.130:9092,192.168.31.131:9092,192.168.31.132:9092");
        // Wait for the record to be fully committed: slowest, but the most durable setting.
        props.put("acks", "all");
        // Number of retries when a request fails.
        props.put("retries", 3);
        // Batch size.
        props.put("batch.size", 16384);
        // By default a request is sent immediately even if the buffer still has room; waiting
        // (here 1 ms) lets batches fill up and reduces the number of requests sent to the brokers.
        props.put("linger.ms", 1);
        // Total memory available to the producer for buffering.
        props.put("buffer.memory", 33554432);
        // Serializers for key and value.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner class.
        props.put("partitioner.class", "com.east.spark.kafka.MyPartition");

        boolean interrupted = false;
        // try-with-resources closes the producer even if send() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String payload = Integer.toString(i);
                // Arguments are topic, key, value. send() is asynchronous: it returns as soon as
                // the record has been added to the buffer.
                producer.send(new ProducerRecord<String, String>(TOPIC, payload, payload));
                try {
                    Thread.sleep(SEND_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Stop sending; the flag is restored only after close() so that the
                    // producer can still shut down normally.
                    interrupted = true;
                    break;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
第四步:編寫 Consumer
package com.east.spark.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Demo consumer that subscribes to {@code Topic-test} and prints the partition, offset, key and
 * value of every record it receives. Runs until the process is terminated or polling fails.
 */
public class Consumer {

    private static final String TOPIC = "Topic-test";
    private static final long POLL_TIMEOUT_MS = 100L;

    public static void main(String[] args) {
        Properties props = new Properties();
        // A subset of the cluster's brokers, used to discover the rest of the cluster.
        props.put("bootstrap.servers", "192.168.31.130:9092,192.168.31.131:9092,192.168.31.132:9092");
        // Consumer group id.
        props.put("group.id", "test");
        // Commit offsets automatically...
        props.put("enable.auto.commit", "true");
        // ...once every second.
        props.put("auto.commit.interval.ms", "1000");
        // The consumer sends heartbeats to the cluster; if none arrives within this timeout the
        // consumer is considered dead and its partitions are reassigned to other processes.
        props.put("session.timeout.ms", "30000");
        // Deserializers for key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources makes sure the consumer is closed if the loop exits via an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Topics to subscribe to; more than one may be listed.
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                // NOTE(review): poll(long) is deprecated in newer kafka-clients releases in
                // favour of poll(Duration); kept for compatibility with the client version
                // this example was written against - confirm before upgrading.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition = %d , offset = %d, key = %s, value = %s",
                            record.partition(), record.offset(), record.key(), record.value());
                    System.out.println();
                }
            }
        }
    }
}
消費端的輸出日誌
partition = 0 , offset = 49, key = 21, value = 21
partition = 0 , offset = 50, key = 0, value = 0
partition = 2 , offset = 40, key = 20, value = 20
partition = 1 , offset = 48, key = 1, value = 1
partition = 2 , offset = 41, key = 2, value = 2
partition = 0 , offset = 51, key = 3, value = 3
partition = 1 , offset = 49, key = 4, value = 4
partition = 2 , offset = 42, key = 5, value = 5
partition = 0 , offset = 52, key = 6, value = 6
partition = 1 , offset = 50, key = 7, value = 7
partition = 2 , offset = 43, key = 8, value = 8
partition = 0 , offset = 53, key = 9, value = 9
partition = 1 , offset = 51, key = 10, value = 10
partition = 2 , offset = 44, key = 11, value = 11
partition = 0 , offset = 54, key = 12, value = 12
partition = 1 , offset = 52, key = 13, value = 13
partition = 2 , offset = 45, key = 14, value = 14
partition = 0 , offset = 55, key = 15, value = 15
partition = 1 , offset = 53, key = 16, value = 16
partition = 2 , offset = 46, key = 17, value = 17
partition = 0 , offset = 56, key = 18, value = 18
partition = 1 , offset = 54, key = 19, value = 19
partition = 2 , offset = 47, key = 20, value = 20
partition = 0 , offset = 57, key = 21, value = 21
partition = 1 , offset = 55, key = 22, value = 22
partition = 2 , offset = 48, key = 23, value = 23
partition = 0 , offset = 58, key = 24, value = 24
partition = 1 , offset = 56, key = 25, value = 25
生產者端的輸出日誌(注意:日誌中的 partitionNum 是取模之前由 key 得到的數值,實際寫入的分割槽為 partitionNum % 分割槽數)
INFO | the message sendTo topic:Topic-test and the partitionNum:42
INFO | the message sendTo topic:Topic-test and the partitionNum:43
INFO | the message sendTo topic:Topic-test and the partitionNum:44
INFO | the message sendTo topic:Topic-test and the partitionNum:45
INFO | the message sendTo topic:Topic-test and the partitionNum:46
INFO | the message sendTo topic:Topic-test and the partitionNum:47
INFO | the message sendTo topic:Topic-test and the partitionNum:48
INFO | the message sendTo topic:Topic-test and the partitionNum:49
INFO | the message sendTo topic:Topic-test and the partitionNum:50
INFO | the message sendTo topic:Topic-test and the partitionNum:51
INFO | the message sendTo topic:Topic-test and the partitionNum:52
INFO | the message sendTo topic:Topic-test and the partitionNum:53
INFO | the message sendTo topic:Topic-test and the partitionNum:54
INFO | the message sendTo topic:Topic-test and the partitionNum:55
INFO | the message sendTo topic:Topic-test and the partitionNum:56
INFO | the message sendTo topic:Topic-test and the partitionNum:57
INFO | the message sendTo topic:Topic-test and the partitionNum:58
INFO | the message sendTo topic:Topic-test and the partitionNum:59
參考連結:https://my.oschina.net/u/1024107/blog/750146