使用 kafka 的java客戶端進行訊息的傳送與接收通訊操作
阿新 • • 發佈:2019-01-08
kafka的傳送端:
package com.zwz.test; import kafka.Kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerDemo extends Thread{ private KafkaProducer<Integer,String> producer; private String topic; public KafkaProducerDemo(String topic){ Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.138:9092" ); //設定kafka的連線和埠 properties.put( ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo" ); // properties.put( ProducerConfig.ACKS_CONFIG,"-1" ); properties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer" ); properties.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer" ); producer = new KafkaProducer<Integer,String>( properties ); this.topic = topic; } @Override public void run(){ int num = 0; while(num<50){ String message = "message_"+num; System.out.println( "begin send message" + message ); producer.send( new ProducerRecord<Integer,String>(topic,message) ); num++; try{ Thread.sleep(1000); }catch(Exception e){ e.printStackTrace(); } } } public static void main(String[] args) { new KafkaProducerDemo("test").start(); } }
kafka的接收端:
package com.zwz.test; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class KafkaConsumerDemo extends Thread { private KafkaConsumer kafkaConsumer; public KafkaConsumerDemo( String topic ){ Properties prop = new Properties(); prop.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.138:9092" ); // prop.put( ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo" ); //分組設定 prop.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true" ); // prop.put( ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000" ); //設定間隔時間 prop.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer" ); //反序列化的類 prop.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer" ); // prop.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest" ); //設定接收訊息從最前面開始 kafkaConsumer = new KafkaConsumer( prop ); kafkaConsumer.subscribe( Collections.singletonList( topic ) ); } @Override public void run() { while(true) { // super.run(); ConsumerRecords<Integer,String> consumerRecord = kafkaConsumer.poll(1000); for( ConsumerRecord record :consumerRecord ){ System.out.println( "message receive:"+record.value() ); } } } public static void main(String[] args) { new KafkaConsumerDemo("test").start(); } }
在kafka 的訊息傳送端的 ProducerConfig.ACKS_CONFIG(acks),"-1" 這個設定的引數有下面的幾個作用
當引數是0 的時候 表示訊息傳送給broker 以後,不需要進行確認(效能較高,但是會出現資料丟失的情況)
當引數是1 的時候 表示只需要獲得 kafka 叢集中的 leader 節點的確認即可返回 (leader、follower)
all(-1) 需要ISR 中的所有 Replica 去進行確認(需要叢集當中所有的節點進行確認),最安全的,但是也可能會出現資料丟失的情況
AUTO_OFFSET_RESET_CONFIG
對於新的groupid來說,如果設定為 earliest,那麼他會從最早的訊息開始消費
latest 對於新的groupid來說,直接取已經消費並且已經提交的最大 offset
earliest 對於新的 groupid來說,如果設定為 earliest,那麼他會從最早的訊息開始消費,重置 offset
none