1. 程式人生 > >基於kafka_2.11-2.1.0實現的生產者和消費者代碼樣例

基於kafka_2.11-2.1.0實現的生產者和消費者代碼樣例

如果 -s 分享圖片 定時 ots i++ broker lose 錯誤

1、搭建部署好zookeeper集群和kafka集群,這裏省略。

1 啟動zk:
2 bin/zkServer.sh start conf/zoo.cfg。
3 驗證zk是否啟動成功:
4 bin/zkServer.sh status conf/zoo.cfg。
5 啟動kafka:
6 bin/kafka-server-start.sh -daemon config/server.properties。

2、生產者和消費者代碼如下所示:

 1 package com.bie.kafka.producer;
 2 
 3 import java.util.Properties;
4 5 import org.apache.kafka.clients.producer.KafkaProducer; 6 import org.apache.kafka.clients.producer.Producer; 7 //import org.apache.kafka.clients.producer.ProducerConfig; 8 import org.apache.kafka.clients.producer.ProducerRecord; 9 10 /** 11 * 12 * @Description TODO 13 * @author biehl
14 * @Date 2019年4月6日 上午11:27:34 15 * 16 */ 17 public class ProducerTest { 18 19 public static void main(String[] args) { 20 // 構造一個java.util.Properties對象 21 Properties props = new Properties(); 22 // 指定bootstrap.servers屬性。必填,無默認值。用於創建向kafka broker服務器的連接。 23 props.put("
bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092"); 24 // 指定key.serializer屬性。必填,無默認值。被發送到broker端的任何消息的格式都必須是字節數組。 25 // 因此消息的各個組件都必須首先做序列化,然後才能發送到broker。該參數就是為消息的key做序列化只用的。 26 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 27 // 指定value.serializer屬性。必填,無默認值。和key.serializer類似。此被用來對消息體即消息value部分做序列化。 28 // 將消息value部分轉換成字節數組。 29 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 30 //acks參數用於控制producer生產消息的持久性(durability)。參數可選值,0、1、-1(all)。 31 props.put("acks", "-1"); 32 //props.put(ProducerConfig.ACKS_CONFIG, "1"); 33 //在producer內部自動實現了消息重新發送。默認值0代表不進行重試。 34 props.put("retries", 3); 35 //props.put(ProducerConfig.RETRIES_CONFIG, 3); 36 //調優producer吞吐量和延時性能指標都有非常重要作用。默認值16384即16KB。 37 props.put("batch.size", 323840); 38 //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840); 39 //控制消息發送延時行為的,該參數默認值是0。表示消息需要被立即發送,無須關系batch是否被填滿。 40 props.put("linger.ms", 10); 41 //props.put(ProducerConfig.LINGER_MS_CONFIG, 10); 42 //指定了producer端用於緩存消息的緩沖區的大小,單位是字節,默認值是33554432即32M。 43 props.put("buffer.memory", 33554432); 44 //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 45 props.put("max.block.ms", 3000); 46 //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); 47 //設置producer段是否壓縮消息,默認值是none。即不壓縮消息。GZIP、Snappy、LZ4 48 //props.put("compression.type", "none"); 49 //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); 50 //該參數用於控制producer發送請求的大小。producer端能夠發送的最大消息大小。 51 //props.put("max.request.size", 10485760); 52 //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); 53 //producer發送請求給broker後,broker需要在規定時間範圍內將處理結果返還給producer。默認30s 54 //props.put("request.timeout.ms", 60000); 55 //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); 56 57 58 // 使用上面創建的Properties對象構造KafkaProducer對象 59 //如果采用這種方式創建producer,那麽就不需要顯示的在Properties中指定key和value序列化類了呢。 60 // Serializer<String> keySerializer = new StringSerializer(); 61 // Serializer<String> valueSerializer = new StringSerializer(); 62 // Producer<String, String> producer = new KafkaProducer<String, String>(props, 63 // keySerializer, valueSerializer); 64 Producer<String, String> producer = new KafkaProducer<>(props); 65 for (int i = 0; i < 100; i++) { 66 //構造好kafkaProducer實例以後,下一步就是構造消息實例。 67 producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i))); 68 // 構造待發送的消息對象ProduceRecord的對象,指定消息要發送到的topic主題,分區以及對應的key和value鍵值對。 69 // 註意,分區和key信息可以不用指定,由kafka自行確定目標分區。 70 //ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("my-topic", 71 // Integer.toString(i), Integer.toString(i)); 72 // 調用kafkaProduce的send方法發送消息 73 //producer.send(producerRecord); 74 } 75 System.out.println("消息生產結束......"); 76 // 關閉kafkaProduce對象 77 producer.close(); 78 System.out.println("關閉生產者......"); 79 } 80 81 }

消費者代碼如下所示:

 1 package com.bie.kafka.consumer;
 2 
 3 import java.util.Arrays;
 4 import java.util.Properties;
 5 
 6 import org.apache.kafka.clients.consumer.ConsumerRecord;
 7 import org.apache.kafka.clients.consumer.ConsumerRecords;
 8 import org.apache.kafka.clients.consumer.KafkaConsumer;
 9 
10 /**
11  * 
12  * @Description TODO
13  * @author biehl
14  * @Date 2019年4月6日 下午8:12:28
15  *
16  */
17 public class ConsumerTest {
18 
19     public static void main(String[] args) {
20         String topicName = "topic1";
21         String groupId = "group1";
22         //構造java.util.Properties對象
23         Properties props = new Properties();
24         // 必須指定屬性。
25         props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092");
26         // 必須指定屬性。
27         props.put("group.id", groupId);
28         props.put("enable.auto.commit", "true");
29         props.put("auto.commit.interval.ms", "1000");
30         // 從最早的消息開始讀取
31         props.put("auto.offset.reset", "earliest");
32         // 必須指定
33         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
34         // 必須指定
35         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
36 
37         // 使用創建的Properties實例構造consumer實例
38         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
39         // 訂閱topic。調用kafkaConsumer.subscribe方法訂閱consumer group所需的topic列表
40         consumer.subscribe(Arrays.asList(topicName));
41         try {
42             while (true) {
43                 //循環調用kafkaConsumer.poll方法獲取封裝在ConsumerRecord的topic消息。
44                 ConsumerRecords<String, String> records = consumer.poll(1000);
45                 //獲取到封裝在ConsumerRecords消息以後,處理獲取到ConsumerRecord對象。
46                 for (ConsumerRecord<String, String> record : records) {
47                     //簡單的打印輸出
48                     System.out.println(
49                             "offset = " + record.offset() 
50                             + ",key = " + record.key() 
51                             + ",value =" + record.value());
52                 }
53             }
54         } catch (Exception e) {
55             //關閉kafkaConsumer
56             System.out.println("消息消費結束......");
57             consumer.close();
58         }
59         System.out.println("關閉消費者......");
60     }
61 }

遇到的坑,一開始報的錯誤莫名其妙,一開始以為使用的jar包版本問題,又是報slf4j的錯誤,又是報log4j的錯誤,又是報空指針的異常。最後百度意外遇到了可能是本地沒有將ip地址放到hosts文件裏面,果然是這個問題。

技術分享圖片

添加如下所示即可:

技術分享圖片

然後就可以開心的生產消息和消費消息了啊。開心。

項目結構如下所示:

技術分享圖片

待續.....

基於kafka_2.11-2.1.0實現的生產者和消費者代碼樣例