kafka消費者、生產者,Java實現
阿新 • • 發佈:2019-01-07
參考:https://www.cnblogs.com/zlslch/p/5966004.html
1、KafkaProducerOps.java
//執行成功 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.Random; public class KafkaProducerOps { public static void main(String[] args) throws IOException { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.0.129:9092,192.168.0.129:9093,192.168.0.129:9094"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1");//16M props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /** * 兩個泛型引數 * 第一個泛型引數:指的就是kafka中一條記錄key的型別 * 第二個泛型引數:指的就是kafka中一條記錄value的型別 */ String[] girls = new String[]{"姚慧瑩", "劉向前", "周 新", "楊柳"}; Producer<String, String> producer = new KafkaProducer<String, String>(props); String topic = "mytest";//props.getProperty(Constants.KAFKA_PRODUCER_TOPIC); String key = "1"; String value = "今天的姑娘們很美111"; ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value); producer.send(producerRecord); producer.close(); } }
2、KafkaConsumerOps.java
import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaConsumerOps { public static void main(String[] args)throws IOException{ Properties props = new Properties(); props.put("bootstrap.servers", "192.168.0.129:9092"); props.put("group.id", "logGroup"); // 自動確認設定 // props.put("enable.auto.commit", "true"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String,String> consu = new KafkaConsumer<String,String>(props); Collection<String> topics = Arrays.asList("mytest"); //消費者訂閱topic consu.subscribe(topics); ConsumerRecords<String,String>consumerRecords = null; while(true){ //接下來就要從topic中拉去資料 consumerRecords = consu.poll(1000); //遍歷每一條記錄 for(ConsumerRecord<String, String> consumerRecord : consumerRecords){ long offset = consumerRecord.offset(); int partition = consumerRecord.partition(); Object key = consumerRecord.key(); Object value = consumerRecord.value(); System.out.println(offset+" "+partition+" "+key+" "+value); } } } }
先執行消費者程式,再執行生產者程式,在消費者終端可以看到如下的輸出:
6 0 1 今天的姑娘們很美111