
Creating Kafka producers and consumers in Java

    Create a Kafka topic, connecting to the ZooKeeper cluster, with replication factor 3, 3 partitions, and the topic name test111
        bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3
    Show the details of a Kafka topic
        bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111
    List all Kafka topics
        bin/kafka-topics.sh --list --zookeeper h5:2181
    Delete a specified Kafka topic
        bin/kafka-topics.sh --delete --zookeeper h5:2181,h6:2181,h7:2181 --topic test111
        If the delete command only prints
        Topic guowang1 is marked for deletion.
        Note: This will have no impact if delete.topic.enable is not set to true.
        then edit Kafka/config/server.properties and add delete.topic.enable=true (the brokers must be restarted for the setting to take effect).
    The jars needed on the classpath for the Java client code below (from the kafka_2.10-0.8.2.0 distribution):
        kafka_2.10-0.8.2.0.jar
        kafka-clients-0.8.2.0.jar
        metrics-core-2.2.0.jar
        scala-library-2.10.4.jar
        zkclient-0.3.jar
        zookeeper-3.4.6.jar
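
    Besides the shell scripts above, topics can also be created and deleted from Java. This sketch is not from the original post: it assumes the 0.8.2 jars listed above are on the classpath, and the ZkClient timeouts are illustrative.

package storm.test.kafka;

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;

public class TopicAdmin {

    public static void main(String[] args) {
        // The ZkClient must use Kafka's string serializer, otherwise the topic
        // metadata written to ZooKeeper is not readable by the brokers.
        ZkClient zkClient = new ZkClient("h5:2181,h6:2181,h7:2181", 10000, 10000,
                ZKStringSerializer$.MODULE$);
        try {
            // Same settings as the shell command above: 3 partitions, replication factor 3.
            AdminUtils.createTopic(zkClient, "test111", 3, 3, new Properties());
            // Marks the topic for deletion; only takes effect when the brokers
            // run with delete.topic.enable=true.
            // AdminUtils.deleteTopic(zkClient, "test111");
        } finally {
            zkClient.close();
        }
    }

}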
    
    1. Producer

package storm.test.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class TestProducer {

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
        // Brokers the producer sends to.
        prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
        // Encode message values as strings.
        prop.put("serializer.class", StringEncoder.class.getName());
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));
        int i = 0;
        // Send one message per second to the test111 topic.
        while (true) {
            producer.send(new KeyedMessage<String, String>("test111", "msg:" + i++));
            Thread.sleep(1000);
        }
    }

}
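
    As a variation on the producer above (not in the original post), the sketch below attaches a key to each message; with the default partitioner, messages with the same key always land in the same partition of test111. It assumes the same brokers and the same 0.8.2 producer API.

package storm.test.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class TestKeyedProducer {

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
        // key.serializer.class falls back to serializer.class when it is not
        // set, so String keys are encoded the same way as the message body.
        prop.put("serializer.class", StringEncoder.class.getName());
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));
        try {
            for (int i = 0; i < 10; i++) {
                String key = "user-" + (i % 3); // illustrative key
                // KeyedMessage(topic, key, message): the key is hashed to pick
                // one of the 3 partitions of test111.
                producer.send(new KeyedMessage<String, String>("test111", key, "msg:" + i));
            }
        } finally {
            producer.close();
        }
    }

}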


    2. Consumer

package storm.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringEncoder;

public class TestConsumer {

    static final String topic = "test111";

    public static void main(String[] args) {
        Properties prop = new Properties();
        // The high-level consumer coordinates through ZooKeeper.
        prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
        prop.put("serializer.class", StringEncoder.class.getName());
        prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
        // Consumers sharing the same group.id split the topic's partitions.
        prop.put("group.id", "group1");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
        // Ask for one stream (one consuming thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        // Block on the stream and print every message as it arrives.
        while (iterator.hasNext()) {
            String msg = new String(iterator.next().message());
            System.out.println("received message: " + msg);
        }
    }

}
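
    The consumer above loops forever. As an extra sketch not in the original post, the variant below registers a JVM shutdown hook that calls ConsumerConnector.shutdown(), so on Ctrl+C the consumer releases its partitions and the iteration loop ends instead of being killed mid-message.

package storm.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TestConsumerWithShutdown {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
        prop.put("group.id", "group1");
        final ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));

        // On JVM exit, close the connector; with the default auto-commit this
        // also commits the last consumed offsets before releasing partitions.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumer.shutdown();
            }
        });

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test111", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
        ConsumerIterator<byte[], byte[]> iterator = streams.get("test111").get(0).iterator();
        // hasNext() returns false once shutdown() has been called.
        while (iterator.hasNext()) {
            System.out.println("received message: " + new String(iterator.next().message()));
        }
    }

}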