1. 程式人生 > >kafka叢集java測試程式碼

kafka叢集java測試程式碼

參照了其他的帖子之後,最近我自己也在學kafka,自己寫了一個demo,但是發現有好多地方都卡住了,寫了這篇帖子就是希望各位看官在學習的時候能少走彎路,今天搞了我一天,我把我所遇到的問題列出來,供大家參考。

我是自己在虛擬機器裡面搭建好了kafka的叢集,測試通過了,然後準備用java程式碼寫一個demo程式來測試看能不能通過。廢話不多說,開始吧。

很多帖子都是採用maven的形式,我這裡是直接建立了一個java專案,src下面建立一個lib資料夾,把jar包匯入進去。在這裡jar包就是一個坑,我剛開始是以為只要一個kafka的jar就可以了的,但是後面程式碼都寫完了,也沒有報錯,但是啟動的時候死活都是各種報錯,鬱悶了半天,於是懷疑是不是少了什麼東西,於是就開始各種百度,谷歌。結果也沒有看出個所以然。後來還是在網上看到了一哥們寫的,但是他所給的jar包不全,沒有引用json的jar包。消費者或接受不到,這裡補充一下。還有就是這裡面的很多方法都過期了,對於過期的方法要慎用,因為以後萬一升級,這些方法在以後的版本如果不支援了的話,現在寫的程式碼就需要重新寫,很麻煩。


java 程式碼:

生產者程式碼

package kafka;


import java.util.Properties;
import java.util.concurrent.TimeUnit;


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;


public class KafkaProducer extends Thread {
private String topic;

public KafkaProducer(String topic){
super();
this.topic = topic;
}

@Override
public void run() {
Producer producer = createProducer();
int i=0;
while(true){
KeyedMessage t=new KeyedMessage(topic, "message: " + i);
producer.send(t);
System.out.println("傳送了: " + i);
try {
TimeUnit.SECONDS.sleep(1);
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


private Producer createProducer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "172.16.12.252:2181,172.16.12.253:2181,172.16.12.254:2181");//宣告zk
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("metadata.broker.list", "172.16.12.252:9092,172.16.12.253:9092,172.16.12.254:9092");// 宣告kafka broker
return new Producer(new ProducerConfig(properties));
}
public static void main(String[] args) {
new KafkaProducer("test").start();// 使用kafka叢集中建立好的主題 test 
new KafkaConsumer("test").start();// 使用kafka叢集中建立好的主題 test   
}
}

消費者程式碼

package kafka;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


public class KafkaConsumer extends Thread {
private String topic;  
    
    public KafkaConsumer(String topic){  
        super();  
        this.topic = topic;  
    }  
      
      
    @Override  
    public void run() {  
        ConsumerConnector consumer = createConsumer();  
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
        topicCountMap.put(topic, 1); // 一次從主題中獲取一個數據  
         Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
         KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 獲取每次接收到的這個資料  
         ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
         while(iterator.hasNext()){  
             String message = new String(iterator.next().message());  
             System.out.println("接收到: " + message);  
         }  
    }  
  
    private ConsumerConnector createConsumer() {  
        Properties properties = new Properties();  
        properties.put("zookeeper.connect", "172.16.12.252:2181,172.16.12.253:2181,172.16.12.254:2181");//宣告zk  
        properties.put("group.id", "group1");// 必須要使用別的組名稱, 如果生產者和消費者都在同一組,則不能訪問同一組內的topic資料  
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
     }  
}