
Writing a Java Program to Produce Data to and Consume Data from a Kafka Cluster

I. Producing Data to Kafka

1. Prerequisites:

  • 1. The development environment:
    01. A Java program written in IntelliJ on Windows connects to a Kafka cluster running in a local virtual machine to produce and consume data.
    02. Pay close attention to the configuration; mistakes here will make it impossible to connect to the ZooKeeper and Kafka clusters.
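Before running the producer, it helps to confirm that the broker port on the VM is actually reachable from Windows. The helper below is a hypothetical sketch (not part of the article's code) that probes a TCP port with a plain socket; the host `192.168.211.3` and port `9092` are the values used later in the article. The demo in `main` opens a local `ServerSocket` so it can run without a live broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical connectivity check: returns true if a TCP connection to
// host:port succeeds within timeoutMs. Useful for verifying that the
// broker (9092) or ZooKeeper (2181) on the VM is reachable.
public class PortCheck {
    public static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo against a locally opened port instead of a live broker;
        // against the cluster you would call isOpen("192.168.211.3", 9092, 500).
        try (ServerSocket server = new ServerSocket(0)) {
            System.out.println(isOpen("127.0.0.1", server.getLocalPort(), 500)); // prints "true"
        }
    }
}
```

If this returns false for the VM's address, check the VM network mode and the broker's `listeners`/`advertised.listeners` settings before debugging the Java client itself.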

2. Producing Data to Kafka Directly

1. The code:

/*
 * Kafka 1.0.0 on its own works without problems, but using Kafka 1.0.0
 * together with Spark 2.2.0 and Scala 2.11.8 caused dependency issues.
 */
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();// producer configuration
        // Use the broker's concrete IP address rather than localhost.
        props.put("bootstrap.servers", "192.168.211.3:9092");
        // StringSerializer, IntegerSerializer, or a self-defined Serializer.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The producer sends to this topic.
        String topic = "dblab";
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i <= 10; i++) {
            String value = "value: " + i;
            // Each message is wrapped in a ProducerRecord<String, String>.
            ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, value);
            System.out.println(msg);
            producer.send(msg);
        }
        System.out.println("send message over.");
        producer.close(100, TimeUnit.MILLISECONDS);// wait up to 100 ms for outstanding sends
    }
}
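The `StringSerializer` configured above essentially encodes the record's key and value as UTF-8 bytes before they go over the wire. The class below is a plain-Java illustration of that idea, not the Kafka class itself:

```java
import java.nio.charset.StandardCharsets;

// Sketch of what StringSerializer (and its StringDeserializer counterpart)
// does conceptually: String <-> UTF-8 bytes. Illustrative only.
public class StringSerializerSketch {
    public static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("value: 1");
        System.out.println(bytes.length);           // one byte per ASCII character
        System.out.println(deserialize(bytes));     // prints "value: 1"
    }
}
```

This is also why the consumer must configure the matching deserializers: the bytes on the topic only become Strings again if both sides agree on the encoding.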

3. Producing Simulated Data by Reading a CSV File

1. The code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

/*
 * ReadCsvAndSendToKafka reads a CSV file and sends its contents to Kafka.
 */
public class ReadCsvAndSendToKafka {
    public static void main(String[] args) {
        Properties properties = new Properties();// producer configuration
        properties.put("bootstrap.servers","192.168.211.3:9092");// must point at the Kafka broker (9092), not ZooKeeper (2181)
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        try {
            // The file path is passed as a command-line argument.
            if (args.length == 0) {
                System.out.println("you should enter a parameter");
                return;
            }
            // BufferedReader is more efficient than an unbuffered Reader.
            BufferedReader reader = new BufferedReader(new FileReader(args[0]));
            String firstLine = reader.readLine();//read the first line
            System.out.println("The first line is :"+firstLine);
            String value = null;
            String topic = "dblab";//the topic name in kafka is dblab
            while((value=reader.readLine())!=null){
                /*
                 * Each message sent to Kafka is wrapped in a ProducerRecord;
                 * here it pairs the topic with one CSV line as the value.
                 */
                ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic,value);
                Thread.sleep(1000);// send one message per second
                producer.send(message);
                //System.out.println(value);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();// flush buffered messages before exiting
        }
    }
}
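The read loop above can be sketched in isolation: discard the CSV header, then treat each remaining line as one message payload. `CsvLineReader` is a hypothetical helper used only to illustrate this; it takes any `BufferedReader`, so it can be tested against an in-memory string instead of a file.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the CSV read loop: skip the header row,
// then collect every remaining line as one would-be Kafka message.
public class CsvLineReader {
    public static List<String> readDataLines(BufferedReader reader) throws IOException {
        reader.readLine(); // discard the header row, as the article's code does
        List<String> lines = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        String csv = "id,name\n1,alice\n2,bob\n";
        List<String> rows = readDataLines(new BufferedReader(new StringReader(csv)));
        System.out.println(rows); // prints [1,alice, 2,bob]
    }
}
```

In the real program each element of the list corresponds to one `ProducerRecord` sent to the `dblab` topic.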

II. Consuming Data from Kafka with a Java Program

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class TestConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers","192.168.211.3:9092");// must point at the Kafka broker's port (9092); the new consumer API does not connect through ZooKeeper
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//StringDeserializer
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "MyGroup");// the consumer needs a group id to track its offsets

        Consumer<String,String> consumer  = new KafkaConsumer<String,String>(properties);
        consumer.subscribe(Arrays.asList("dblab","MyTest"));

        while(true){
            ConsumerRecords<String,String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> re : records) {
                System.out.println(re.value());
            }
        }
    }
}