
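Testing a single-node Kafka setup from Java

This author's article is very well written and worth reading while learning Kafka: http://my.oschina.net/ielts0909/blog/93190

My system is CentOS 7 (64-bit).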

Java environment:


Kafka installation directory:


Two settings in server.properties under the config directory need to be changed:

host.name=192.168.3.224 (the machine's own IP)

log.dirs=/opt/local/kafka-0.8.1.1-src/logs (log directory, any path you choose)
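Since server.properties is an ordinary Java properties file, the two overrides can be sanity-checked with java.util.Properties. The following is only a small sketch: ServerPropertiesCheck is a hypothetical helper name of mine, not something shipped with Kafka, and the text is inlined here instead of being read from config/server.properties. It also shows why the notes in parentheses above must not be typed into the file: anything after the "=" becomes part of the value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper (not part of Kafka): parses broker settings the way
// any Java .properties text is parsed.
final class ServerPropertiesCheck {

    // Parses properties text and returns the resulting key/value pairs.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Inline copy of the two overrides from this article.
        Properties ok = parse("host.name=192.168.3.224\n"
                + "log.dirs=/opt/local/kafka-0.8.1.1-src/logs\n");
        System.out.println("host.name = " + ok.getProperty("host.name"));
        System.out.println("log.dirs  = " + ok.getProperty("log.dirs"));

        // A trailing note on the same line ends up inside the value.
        Properties bad = parse("host.name=192.168.3.224 (my ip)\n");
        System.out.println("with note = " + bad.getProperty("host.name"));
    }
}
```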

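Then start ZooKeeper first, followed by the Kafka broker:

bin/zookeeper-server-start.sh config/zookeeper.properties &

bin/kafka-server-start.sh config/server.properties &

To check that both started, see whether port 9092 (Kafka) and port 2181 (ZooKeeper) are listening.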
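The port check can also be done from Java, which is handy when testing from another machine. This is a minimal sketch under my own assumptions: PortCheck and isPortOpen are hypothetical names, the host defaults to localhost, and the 2-second timeout is arbitrary. A successful connect only proves that something is listening on the port, not that it is actually Kafka or ZooKeeper.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: tries a plain TCP connect to see if a port is listening.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, unreachable host, or timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: pass the broker host as the first argument, else localhost.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("ZooKeeper (2181): " + isPortOpen(host, 2181, 2000));
        System.out.println("Kafka     (9092): " + isPortOpen(host, 9092, 2000));
    }
}
```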


Create the test topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Start a console producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Start a console consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Anything typed into the producer shows up in the consumer almost immediately.

Below is the Java code for sending and consuming messages.

Java producer:


import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {

        // Configuration properties
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.3.224:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to the value of serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Optional; if not set, the default partitioner is used
//        props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
        // Turn on acknowledgements; otherwise sends are fire-and-forget
        // and data may be lost. Valid values are 0, 1 and -1; see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // Create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // Build and send one message
        long start = System.currentTimeMillis();
        long runtime = new Date().getTime();
        String ip = "192.168.3.224";
        String msg = runtime + "小張666777" + ip;
        // If the topic does not exist, the broker creates it automatically using
        // its default.replication.factor and num.partitions settings
        // (both 1 unless overridden in server.properties)
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                "test456", ip, msg);
        producer.send(data);
        System.out.println("Elapsed (ms): " + (System.currentTimeMillis() - start));
        // Close the producer
        producer.close();
    }
}
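The commented-out partitioner.class line in the producer points at a custom partitioner class that is not shown here. As a rough illustration of what a key-based partitioner decides, here is a plain-Java sketch of the usual "hash the key, take it modulo the partition count" rule. PartitionSketch and partitionFor are hypothetical names of mine, and the sample keys are made up. A real partitioner for this Kafka version would, as far as I know, implement kafka.producer.Partitioner, which this sketch deliberately does not do so that it runs without the Kafka jar.

```java
// Hypothetical sketch of key-based partition selection (no Kafka dependency).
final class PartitionSketch {

    // Maps a message key to a partition index in [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so a negative hashCode cannot yield a negative index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Made-up keys in the same style as the ip key used by the producer.
        String[] keys = {"192.168.3.224", "192.168.3.225", "192.168.3.226"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```

Because the producer above uses the ip string as the message key, all messages with the same ip would land in the same partition under a rule like this; with a single partition every key maps to partition 0.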

Java consumer:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    private final String name;

    public Consumer(String name, String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
        this.name = name;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.3.224:2181");
        props.put("group.id", "jd-group");
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        // Minimum number of bytes the server returns per fetch; default is 1
        // props.put("fetch.min.bytes", "1024");
        // Maximum time the server waits for fetch.min.bytes; default is 100 ms
        // props.put("fetch.wait.max.ms", "600000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Ask for a single stream for this topic
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // hasNext() blocks until the next message arrives
        while (it.hasNext()) {
            // Decode explicitly as UTF-8, the default encoding of StringEncoder
            System.out.println("************" + name + "\tgets\t"
                    + new String(it.next().message(), StandardCharsets.UTF_8));
        }
    }
}

// KafkaConsumerDemo.java (separate file)
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        // Same topic that TestProducer writes to
        Consumer consumerThread1 = new Consumer("Consumer1", "test456");

        consumerThread1.start();
    }
}