Kafka JavaApi中消費者與生產者的配置
阿新 • • 發佈:2018-12-24
檔案目錄如下:
1.ConsumerDemo配置
package com.course.test; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class ConsumerDemo { public static void main(String[] args) { Properties pros = new Properties(); pros.put("bootstrap.servers","localhost:9092"); pros.put("group.id","test"); // 用來表示consumer程序所在組的一個字串,如果設定同樣的group_id,表示這些程序都是屬於同一個consumer——group pros.put("enable.auto.commit","true"); // 如果設定為true,consumer所接收到的訊息的offset將會自動同步到zookeeper pros.put("auto.commit.interval.ms","1000"); // consumer向zookeeper提交offset的頻率,單位是秒 pros.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); pros.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pros); consumer.subscribe(Arrays.asList("my_test")); while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String,String>record:records){ System.out.printf("offset = %d, key = %s , value = %s%n",record.offset(),record.key(),record.value()); } } } }
2.ProducerDemo配置
package com.course.test; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerDemo { public static void main(String[] args) { Properties prop = new Properties(); prop.put("bootstrap.servers","localhost:9092"); prop.put("acks","all"); // 生產者需要server接收到資料之後,要發出一個確認接收的訊號 // 0 producer不需要等待任何確認的訊息 // 1 意味著至少要等待leader已經成功將資料寫入本地log,並不意味著所有follower已經寫入 // all 意味著leader需要等待所有備份都成功寫入到日誌中 prop.put("retries",0); // 重試次數 // 比如有兩條訊息, 1 和 2 。1先來,但是如果1傳送失敗了,重試次數為1.2就會接著傳送資料,然後1再發一次,這樣會改變訊息傳送的順序 prop.put("buffer.memory",33554432); // 快取大小 prop.put("batch.size",1000); // producer試圖批量處理訊息記錄。目的是減少請求次數,改善客戶端和服務端之間的效能。 // 這個配置是控制批量處理訊息的位元組數。如果設定為0,則禁用批處理。如果設定過大,會佔用記憶體空間. prop.put("linger.ms",1); prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(prop); for (int i = 0 ; i < 100 ; i++){ producer.send(new ProducerRecord<String, String>("my_test", Integer.toString(i+1),Integer.toString(i))); } producer.close(); } }
然後我們可以先執行ProducerDemo,再執行ConsumerDemo,可以看到生產者傳送的資訊。