kafka6 編寫使用自定義分割槽的生產者
阿新 • • 發佈:2018-11-12
一 客戶端
在上一篇部落格建立的簡單生產者的基礎上,進行兩個修改操作:
1.新建SimplePartitioner.java,修改返回分割槽為1。
SimplePartitioner.java程式碼如下
package cn.test.mykafka; import java.util.Map; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; /** * 簡單分割槽函式 * */ public classSimplePartitioner implements Partitioner { @Override public void configure(Map<String, ?> configs) { // TODO Auto-generated method stub } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// TODO Auto-generated method stub return 1; //設定返回分割槽1 } @Override public void close() { // TODO Auto-generated method stub } }
2.複製SimpleProducer.java為PartitionerProducer.java,修改3處:
增加一個partitioner.class配置;主題改為test-topic2;訊息改為hello world to partition 1 from win7 client。
1 package cn.test.mykafka; 2 3 import java.util.Properties; 4 5 import org.apache.kafka.clients.producer.KafkaProducer; 6 import org.apache.kafka.clients.producer.Producer; 7 import org.apache.kafka.clients.producer.ProducerRecord; 8 9 /** 10 * 使用自定義分割槽的生產者 11 * 12 */ 13 14 public class PartitionerProducer { 15 16 public static void main(String[] args) { 17 18 //建立配置資訊 19 Properties props = new Properties(); 20 props.put("bootstrap.servers", "192.168.42.133:9092"); //指定broker的節點和埠 21 props.put("acks", "all"); 22 props.put("retries", 0); 23 props.put("batch.size", 16384); 24 props.put("linger.ms", 1); 25 props.put("buffer.memory", 33554432); 26 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 27 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 28 props.put("partitioner.class","cn.test.mykafka.SimplePartitioner"); //自定義分割槽 29 30 //建立一個生產者 31 Producer<String, String> producer = new KafkaProducer<>(props); 32 33 //傳送訊息 34 ProducerRecord<String, String> msg = new ProducerRecord<String, String>("test-topic2","hello world to partition 1 from win7 client"); 35 producer.send(msg); 36 //for (int i = 0; i < 10; i++) 37 // producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i))); //topic,key,value 38 39 System.out.println("over"); 40 producer.close(); 41 } 42 }PartitionerProducer.java
二 伺服器端
1.搭建單節點單broker的kafka。具體步驟看這裡。
2.啟動伺服器
啟動zookeeper
[[email protected] kafka]# zookeeper-server-start.sh config/zookeeper.properties [[email protected] kafka]# jps #開啟另一個終端檢視是否啟動成功 3892 Jps 3566 QuorumPeerMain
啟動kafka
[[email protected] kafka]# kafka-server-start.sh config/server.properties
3.建立topic
#建立2個分割槽,1個副本的主題 [[email protected] kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic2 Created topic "test-topic2".
4.啟動消費者
[[email protected] kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic2 --from-beginning
三 測試傳送訊息
1.在eclipse執行程式碼,傳送訊息。
2.檢視消費者是否接收到訊息。
如上消費者接收到訊息,說明訊息傳送成功。
3.檢視訊息是否傳送到分割槽1。
[[email protected] ~]# cd /tmp/kafka-logs/ [[email protected] kafka-logs]# ls #顯示主題test-topic2的2個分割槽 ... test-topic2-0 test-topic2-1 ... [[email protected] kafka-logs]# cat test-topic2-0/00000000000000000000.log #檢視分割槽0,顯示為空 [[email protected] kafka-logs]# cat test-topic2-1/00000000000000000000.log #檢視分割槽1,顯示訊息 c)g??????????????bVhello world to partition 1 from win7 client[[email protected] kafka-logs]#
如上顯示訊息成功寫入分割槽1,說明自定義分割槽函式生效。