1. 程式人生 > >kafka6 編寫使用自定義分割槽的生產者

kafka6 編寫使用自定義分割槽的生產者

一 客戶端

在上一篇部落格建立的簡單生產者的基礎上,進行兩個修改操作:

1.新建SimplePartitioner.java,修改返回分割槽為1。

SimplePartitioner.java程式碼如下

package cn.test.mykafka;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * 簡單分割槽函式
 *
 */

public class
SimplePartitioner implements Partitioner { @Override public void configure(Map<String, ?> configs) { // TODO Auto-generated method stub } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// TODO Auto-generated method stub return 1; //設定返回分割槽1 } @Override public void close() { // TODO Auto-generated method stub } }

2.複製SimpleProducer.java為PartitionerProducer.java,修改3處:

增加一個partitioner.class配置;主題改為test-topic2;訊息改為hello world to partition 1 from win7 client。

 1 package cn.test.mykafka;
 2 
 3 import java.util.Properties;
 4 
 5 import org.apache.kafka.clients.producer.KafkaProducer;
 6 import org.apache.kafka.clients.producer.Producer;
 7 import org.apache.kafka.clients.producer.ProducerRecord;
 8 
 9 /**
10  * 使用自定義分割槽的生產者
11  *
12  */
13 
14 public class PartitionerProducer {
15 
16     public static void main(String[] args) {
17         
18          //建立配置資訊
19          Properties props = new Properties();
20          props.put("bootstrap.servers", "192.168.42.133:9092"); //指定broker的節點和埠
21          props.put("acks", "all");
22          props.put("retries", 0);
23          props.put("batch.size", 16384);
24          props.put("linger.ms", 1);
25          props.put("buffer.memory", 33554432);
26          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
27          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
28          props.put("partitioner.class","cn.test.mykafka.SimplePartitioner"); //自定義分割槽
29          
30          //建立一個生產者
31          Producer<String, String> producer = new KafkaProducer<>(props);
32          
33          //傳送訊息
34          ProducerRecord<String, String> msg = new ProducerRecord<String, String>("test-topic2","hello world to partition 1 from win7 client");
35          producer.send(msg);
36          //for (int i = 0; i < 10; i++)
37          //   producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i))); //topic,key,value 
38             
39          System.out.println("over");
40          producer.close();
41     }
42 }
PartitionerProducer.java

 

二 伺服器端

1.搭建單節點單broker的kafka。具體步驟看這裡

2.啟動伺服器

啟動zookeeper

[[email protected] kafka]# zookeeper-server-start.sh config/zookeeper.properties
[[email protected] kafka]# jps #開啟另一個終端檢視是否啟動成功
3892 Jps
3566 QuorumPeerMain

啟動kafka

[[email protected] kafka]# kafka-server-start.sh config/server.properties 

3.建立topic

#建立2個分割槽,1個副本的主題
[[email protected] kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic2
Created topic "test-topic2".  

4.啟動消費者

[[email protected] kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic2 --from-beginning 

 

三 測試傳送訊息

1.在eclipse執行程式碼,傳送訊息。

2.檢視消費者是否接收到訊息。

如上消費者接收到訊息,說明訊息傳送成功。

3.檢視訊息是否傳送到分割槽1。

[[email protected] ~]# cd /tmp/kafka-logs/
[[email protected] kafka-logs]# ls #顯示主題test-topic2的2個分割槽
... test-topic2-0  test-topic2-1 ...
[[email protected] kafka-logs]# cat test-topic2-0/00000000000000000000.log #檢視分割槽0,顯示為空
[[email protected] kafka-logs]# cat test-topic2-1/00000000000000000000.log #檢視分割槽1,顯示訊息
c)g??????????????bVhello world to partition 1 from win7 client[[email protected] kafka-logs]# 

如上顯示訊息成功寫入分割槽1,說明自定義分割槽函式生效。