1. 程式人生 > >kafka叢集搭建和使用Java寫kafka生產者消費者

kafka叢集搭建和使用Java寫kafka生產者消費者

http://czj4451.iteye.com/blog/2041096

server.properties

需要配置

broker.id=110

host.name=192.168.1.108

zookeeper.connect=192.168.1.108:2181

log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs

1 kafka叢集搭建

Java程式碼  收藏程式碼
  1. 1.zookeeper叢集  搭建在110, 111,112  
  2. 2.kafka使用3個節點110, 111,112  
  3. 修改配置檔案config/server.properties  
  4. broker.id=110
      
  5. host.name=192.168.1.110  
  6. log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs  
  7. 複製到其他兩個節點,然後修改對應節點上的config/server.pro   
  8. 3.啟動,在三個節點分別執行  
  9. bin/kafka-server-start.sh  config/server.properties >/dev/null 2>&1 &  
  10. 4 建立主題  
  11. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3
     --topic test  
  12. 5 檢視主題詳細  
  13. bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test  
  14.  --topic test  
  15. Topic:test      PartitionCount:3        ReplicationFactor:3     Configs:  
  16.         Topic: test     Partition: 0    Leader: 110     Replicas: 110,111,112  Isr: 110,111,112  
  17.         Topic: test     Partition: 1
        Leader: 111     Replicas: 111,112,110  Isr: 111,112,110  
  18.         Topic: test     Partition: 2    Leader: 112     Replicas: 112,110,111  Isr: 112,110,111  
  19. 6 去zk上看kafka叢集  
  20. [zk: localhost:2181(CONNECTED) 5] ls /  
  21. [admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch]  
  22. [zk: localhost:2181(CONNECTED) 6] ls /brokers   ----> 檢視註冊在zk內的kafka  
  23. [topics, ids]  
  24. [zk: localhost:2181(CONNECTED) 7] ls /brokers/ids  
  25. [112110111]  
  26. [zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112  
  27. []  
  28. [zk: localhost:2181(CONNECTED) 9] ls /brokers/topics   
  29. [test]  
  30. [zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test   
  31. [partitions]  
  32. [zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions  
  33. [210]  
  34. [zk: localhost:2181(CONNECTED) 12]   


2  kafka java呼叫:

2.1 java端生產資料, kafka叢集消費資料:

Java程式碼  收藏程式碼
  1. 1 建立maven工程,pom.xml中增加如下:  
  2.  <dependency>  
  3.         <groupId>org.apache.kafka</groupId>  
  4.         <artifactId>kafka_2.10</artifactId>  
  5.         <version>0.8.2.0</version>  
  6.     </dependency>  
  7. 2 java程式碼:  向主題test內寫入資料  
  8. import java.util.Properties;  
  9. import java.util.concurrent.TimeUnit;  
  10. import kafka.javaapi.producer.Producer;  
  11. import kafka.producer.KeyedMessage;  
  12. import kafka.producer.ProducerConfig;  
  13. import kafka.serializer.StringEncoder;  
  14. public class kafkaProducer extends Thread{  
  15.     private String topic;  
  16.     public kafkaProducer(String topic){  
  17.         super();  
  18.         this.topic = topic;  
  19.     }  
  20.     @Override  
  21.     public void run() {  
  22.         Producer producer = createProducer();  
  23.         int i=0;  
  24.         while(true){  
  25.             producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));  
  26.             try {  
  27.                 TimeUnit.SECONDS.sleep(1);  
  28.             } catch (InterruptedException e) {  
  29.                 e.printStackTrace();  
  30.             }  
  31.         }  
  32.     }  
  33.     private Producer createProducer() {  
  34.         Properties properties = new Properties();  
  35.         properties.put("zookeeper.connect""192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//宣告zk  
  36.         properties.put("serializer.class", StringEncoder.class.getName());  
  37.         properties.put("metadata.broker.list""192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094");// 宣告kafka broker  
  38.         return new Producer<Integer, String>(new ProducerConfig(properties));  
  39.      }  
  40.     public static void main(String[] args) {  
  41.         new kafkaProducer("test").start();// 使用kafka叢集中建立好的主題 test   
  42.     }  
  43. }  
  44. 3  kafka叢集中消費主題test的資料:  
  45. [root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginnin  
  46. 4   啟動java程式碼,然後在看叢集消費的資料如下:  
  47. message: 0  
  48. message: 1  
  49. message: 2  
  50. message: 3  
  51. message: 4  
  52. message: 5  
  53. message: 6  
  54. message: 7  
  55. message: 8  
  56. message: 9  
  57. message: 10  
  58. message: 11  
  59. message: 12  
  60. message: 13  
  61. message: 14  
  62. message: 15  
  63. message: 16  
  64. message: 17  
  65. message: 18  
  66. message: 19  
  67. message: 20  
  68. message: 21  

 3 kafka 使用Java寫消費者,這樣 先執行kafkaProducer ,在執行kafkaConsumer,即可得到生產者的資料:

Java程式碼  收藏程式碼
  1. import java.util.HashMap;  
  2. import java.util.List;  
  3. import java.util.Map;  
  4. import java.util.Properties;  
  5. import kafka.consumer.Consumer;  
  6. import kafka.consumer.ConsumerConfig;  
  7. import kafka.consumer.ConsumerIterator;  
  8. import kafka.consumer.KafkaStream;  
  9. import kafka.javaapi.consumer.ConsumerConnector;  
  10. /** 
  11.  * 接收資料 
  12.  * 接收到: message: 10 
  13. 接收到: message: 11 
  14. 接收到: message: 12 
  15. 接收到: message: 13 
  16. 接收到: message: 14 
  17.  * @author zm 
  18.  * 
  19.  */  
  20. public class kafkaConsumer extends Thread{  
  21.     private String topic;  
  22.     public kafkaConsumer(String topic){  
  23.         super();  
  24.         this.topic = topic;  
  25.     }  
  26.     @Override  
  27.     public void run() {  
  28.         ConsumerConnector consumer = createConsumer();  
  29.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  30.         topicCountMap.put(topic, 1); // 一次從主題中獲取一個數據  
  31.          Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
  32.          KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 獲取每次接收到的這個資料  
  33.          ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
  34.          while(iterator.hasNext()){  
  35.              String message = new String(iterator.next().message());  
  36.              System.out.println("接收到: " + message);  
  37.          }  
  38.     }  
  39.     private ConsumerConnector createConsumer() {  
  40.         Properties properties = new Properties();  
  41.         properties.put("zookeeper.connect""192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//宣告zk  
  42.         properties.put("zookeeper.session.timeout.ms", "10000");
            properties.put("group.id", "0");// 必須要使用別的組名稱, 如果生產者和消費者都在同一組,則不能訪問同一組內的topic資料  同一組,則不能訪問同一組內的topic資料  
  43.         return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
  44.      }  
  45.     public static void main(String[] args) {  
  46.         new kafkaConsumer("test").start();// 使用kafka叢集中建立好的主題 test   
  47.     }  
  48. }