Kafka學習(三)簡單例項(可以簡單做測試)
阿新 • • 發佈:2018-11-08
java客戶端連線kafka簡單測試
本案例kafka版本是kafka_2.11-0.9.0.1,用java來實現kafka生產者、消費者的示例
在測試的過程中遇到的特別的問題以及解決辦法,其他小問題就不一一列舉了。
1 . 使用kafka-clients進行測試,maven依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
需要注意:kafka-clients的版本必須和kafka安裝的版本一致
2 . Consumer測試類收不到訊息的原因:
①. 先開啟consumer測試類,等待poll資料。
②. 開啟producer測試類,傳送資料,並確保傳送成功。
③. 正常情況下這樣就可以收到資料了。生產者示例
public class ProducerTest { private void execMsgSend() throws Exception{ Properties props = new Properties(); props.put("bootstrap.servers", "阿里雲外網Ip:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> procuder = new KafkaProducer<String, String>(props); String topic = "test"; for (int i = 1; i <= 10; i++) { String value = " this is another message_" + i; ProducerRecord<String,String> record = new ProducerRecord<String, String>(topic,i+"",value); procuder.send(record,new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset()); } }); System.out.println(i+" ---- success"); Thread.sleep(1000); } System.out.println("send message over."); procuder.close(); } public static void main(String[] args) throws Exception{ ProducerTest test1 = new ProducerTest(); test1.execMsgSend(); } }
消費者示例
public class Consumer { public static void main(String[] s){ Properties props = new Properties(); props.put("bootstrap.servers", "阿里雲外網Ip:9092"); props.put("group.id", "1"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { System.out.println("poll start..."); ConsumerRecords<String, String> records = consumer.poll(100); int count = records.count(); System.out.println("the numbers of topic:" + count); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } } }