1. 程式人生 > >Apache kafka客戶端開發demo

Apache kafka客戶端開發demo

該部落格轉載自:http://www.aboutyun.com/thread-9906-1-1.html

1.依賴包

  1.         <dependency>
  2.             <groupId>org.apache.kafka</groupId>
  3.             <artifactId>kafka_2.10</artifactId>
  4.             <version>0.8.1</version>
  5.         </dependency>
複製程式碼

2.producer程式開發例子
2.1 producer引數說明

  1. #指定kafka節點列表,用於獲取metadata,不必全部指定
  2. metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092
  3. # 指定分割槽處理類。預設kafka.producer.DefaultPartitioner,表通過key雜湊到對應分割槽
  4. #partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner
  5. # 是否壓縮,預設0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後訊息中會有頭來指明訊息壓縮型別,故在消費者端訊息解壓是透明的無需指定。
  6. compression.codec=none
  7. # 指定序列化處理類(mafka client API呼叫說明-->3.序列化約定wiki),預設為kafka.serializer.DefaultEncoder,即byte[]
  8. serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder
  9. # serializer.class=kafka.serializer.DefaultEncoder
  10. # serializer.class=kafka.serializer.StringEncoder
  11. # 如果要壓縮訊息,這裡指定哪些topic要壓縮訊息,預設empty,表示不壓縮。
  12. #compressed.topics=
  13. ########### request ack ###############
  14. # producer接收訊息ack的時機.預設為0. 
  15. # 0: producer不會等待broker傳送ack 
  16. # 1: 當leader接收到訊息之後傳送ack 
  17. # 2: 當所有的follower都同步訊息成功後傳送ack. 
  18. request.required.acks=0 
  19. # 在向producer傳送ack之前,broker允許等待的最大時間 
  20. # 如果超時,broker將會向producer傳送一個error ACK.意味著上一次訊息因為某種 
  21. # 原因未能成功(比如follower未能同步成功) 
  22. request.timeout.ms=10000
  23. ########## end #####################
  24. # 同步還是非同步傳送訊息,預設“sync”表同步,"async"表非同步。非同步可以提高發送吞吐量,
  25. # 也意味著訊息將會在本地buffer中,並適時批量傳送,但是也可能導致丟失未傳送過去的訊息
  26. producer.type=sync
  27. ############## 非同步傳送 (以下四個非同步引數可選) ####################
  28. # 在async模式下,當message被快取的時間超過此值後,將會批量傳送給broker,預設為5000ms
  29. # 此值和batch.num.messages協同工作.
  30. queue.buffering.max.ms = 5000
  31. # 在async模式下,producer端允許buffer的最大訊息量
  32. # 無論如何,producer都無法儘快的將訊息傳送給broker,從而導致訊息在producer端大量沉積
  33. # 此時,如果訊息的條數達到閥值,將會導致producer端阻塞或者訊息被拋棄,預設為10000
  34. queue.buffering.max.messages=20000
  35. # 如果是非同步,指定每次批量傳送資料量,預設為200
  36. batch.num.messages=500
  37. # 當訊息在producer端沉積的條數達到"queue.buffering.max.meesages"後 
  38. # 阻塞一定時間後,佇列仍然沒有enqueue(producer仍然沒有傳送出任何訊息) 
  39. # 此時producer可以繼續阻塞或者將訊息拋棄,此timeout值用於控制"阻塞"的時間 
  40. # -1: 無阻塞超時限制,訊息不會被拋棄 
  41. # 0:立即清空佇列,訊息被拋棄 
  42. queue.enqueue.timeout.ms=-1
  43. ################ end ###############
  44. # 當producer接收到error ACK,或者沒有接收到ACK時,允許訊息重發的次數 
  45. # 因為broker並沒有完整的機制來避免訊息重複,所以當網路異常時(比如ACK丟失) 
  46. # 有可能導致broker接收到重複的訊息,預設值為3.
  47. message.send.max.retries=3
  48. # producer重新整理topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況 
  49. # 因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即重新整理 
  50. # (比如topic失效,partition丟失,leader失效等),此外也可以通過此引數來配置額外的重新整理機制,預設值600000 
  51. topic.metadata.refresh.interval.ms=60000
複製程式碼


  1. import java.util.*;  
  2. import kafka.javaapi.producer.Producer;  
  3. import kafka.producer.KeyedMessage;  
  4. import kafka.producer.ProducerConfig;  
  5. public class TestProducer {  
  6.     public static void main(String[] args) {  
  7.         long events = Long.parseLong(args[0]);  
  8.         Random rnd = new Random();  
  9.         Properties props = new Properties();  
  10.         props.put("metadata.broker.list", "192.168.2.105:9092");  
  11.         props.put("serializer.class", "kafka.serializer.StringEncoder"); //預設字串編碼訊息  
  12.         props.put("partitioner.class", "example.producer.SimplePartitioner");  
  13.         props.put("request.required.acks", "1");  
  14.         ProducerConfig config = new ProducerConfig(props);  
  15.         Producer<String, String> producer = new Producer<String, String>(config);  
  16.         for (long nEvents = 0; nEvents < events; nEvents++) {   
  17.                long runtime = new Date().getTime();    
  18.                String ip = “192.168.2.” + rnd.nextInt(255);   
  19.                String msg = runtime + “,www.example.com,” + ip;   
  20.                KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);  
  21.                producer.send(data);  
  22.         }  
  23.         producer.close();  
  24.     }  
  25. }  
複製程式碼


2.2 指定關鍵字key,傳送訊息到指定partitions
說明:如果需要實現自定義partitions訊息傳送,需要實現Partitioner介面
  1. public class CustomizePartitioner implements Partitioner {  
  2.     public CustomizePartitioner(VerifiableProperties props) {  
  3.     }  
  4.     /** 
  5.      * 返回分割槽索引編號 
  6.      * @param key sendMessage時,輸出的partKey 
  7.      * @param numPartitions topic中的分割槽總數 
  8.      * @return 
  9.      */  
  10.     @Override  
  11.     public int partition(Object key, int numPartitions) {  
  12.         System.out.println("key:" + key + "  numPartitions:" + numPartitions);  
  13.         String partKey = (String)key;  
  14.         if ("part2".equals(partKey))  
  15.             return 2;  
  16. //        System.out.println("partKey:" + key);  
  17.         ........  
  18.         ........  
  19.         return 0;  
  20.     }  
  21. }  
複製程式碼


3.consumer程式開發例子
3.1 consumer引數說明
  1. # zookeeper連線伺服器地址,此處為線下測試環境配置(kafka訊息服務-->kafka broker叢集線上部署環境wiki)
  2. # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
  3. zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka
  4. # zookeeper的session過期時間,預設5000ms,用於檢測消費者是否掛掉,當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡
  5. zookeeper.session.timeout.ms=5000
  6. zookeeper.connection.timeout.ms=10000
  7. # 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次獲得的訊息。一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的訊息
  8. zookeeper.sync.time.ms=2000
  9. #指定消費組
  10. group.id=xxx
  11. # 當consumer消費一定量的訊息之後,將會自動向zookeeper提交offset資訊 
  12. # 注意offset資訊並不是每消費一次訊息就向zk提交一次,而是現在本地儲存(記憶體),並定期提交,預設為true
  13. auto.commit.enable=true
  14. # 自動更新時間。預設60 * 1000
  15. auto.commit.interval.ms=1000
  16. # 當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤訊息消費情況,便於觀察
  17. conusmer.id=xxx 
  18. # 消費者客戶端編號,用於區分不同客戶端,預設客戶端程式自動產生
  19. client.id=xxxx
  20. # 最大取多少塊快取到消費者(預設10)
  21. queued.max.message.chunks=50
  22. # 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新 
  23. # 的consumer上,如果一個consumer獲得了某個partition的消費許可權,那麼它將會向zk註冊 
  24. # "Partition Owner registry"節點資訊,但是有可能此時舊的consumer尚沒有釋放此節點, 
  25. # 此值用於控制,註冊節點的重試次數. 
  26. rebalance.max.retries=5
  27. # 獲取訊息的最大尺寸,broker不會像consumer輸出大於此值的訊息chunk
  28. # 每次feth將得到多條訊息,此值為總大小,提升此值,將會消耗更多的consumer端記憶體
  29. fetch.min.bytes=6553600
  30. # 當訊息的尺寸不足時,server阻塞的時間,如果超時,訊息將立即傳送給consumer
  31. fetch.wait.max.ms=5000
  32. socket.receive.buffer.bytes=655360
  33. # 如果zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、
  34. # anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。預設largest
  35. auto.offset.reset=smallest
  36. # 指定序列化處理類(mafka client API呼叫說明-->3.序列化約定wiki),預設為kafka.serializer.DefaultDecoder,即byte[]
  37. derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder
複製程式碼


3.2 多執行緒並行消費topic
ConsumerTest類
  1. import kafka.consumer.ConsumerIterator;  
  2. import kafka.consumer.KafkaStream;  
  3. public class ConsumerTest implements Runnable {  
  4.     private KafkaStream m_stream;  
  5.     private int m_threadNumber;  
  6.     public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {  
  7.         m_threadNumber = a_threadNumber;  
  8.         m_stream = a_stream;  
  9.     }  
  10.     public void run() {  
  11.         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();  
  12.         while (it.hasNext())  
  13.             System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));  
  14.         System.out.println("Shutting down Thread: " + m_threadNumber);  
  15.     }  
  16. }  
複製程式碼


ConsumerGroupExample類
  1. import kafka.consumer.ConsumerConfig;  
  2. import kafka.consumer.KafkaStream;  
  3. import kafka.javaapi.consumer.ConsumerConnector;  
  4. import java.util.HashMap;  
  5. import java.util.List;  
  6. import java.util.Map;  
  7. import java.util.Properties;  
  8. import java.util.concurrent.ExecutorService;  
  9. import java.util.concurrent.Executors;  
  10. public class ConsumerGroupExample {  
  11.     private final ConsumerConnector consumer;  
  12.     private final String topic;  
  13.     private  ExecutorService executor;  
  14.     public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {  
  15.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(  
  16.                 createConsumerConfig(a_zookeeper, a_groupId));  
  17.         this.topic = a_topic;  
  18.     }  
  19.     public void shutdown() {  
  20.         if (consumer != null) consumer.shutdown();  
  21.         if (executor != null) executor.shutdown();  
  22.     }  
  23.     public void run(int a_numThreads) {  
  24.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  25.         topicCountMap.put(topic, new Integer(a_numThreads));  
  26.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);  
  27.         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);  
  28.         // 啟動所有執行緒  
  29.         executor = Executors.newFixedThreadPool(a_numThreads);  
  30.         // 開始消費訊息  
  31.         int threadNumber = 0;  
  32.         for (final KafkaStream stream : streams) {  
  33.             executor.submit(new ConsumerTest(stream, threadNumber));  
  34.             threadNumber++;  
  35.         }  
  36.     }  
  37.     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {  
  38.         Properties props = new Properties();  
  39.         props.put("zookeeper.connect", "192.168.2.225:2183/config/mobile/mq/mafka");  
  40.         props.put("group.id", "push-token");  
  41.         props.put("zookeeper.session.timeout.ms", "60000");  
  42.         props.put("zookeeper.sync.time.ms", "2000");  
  43.         props.put("auto.commit.interval.ms", "1000");  
  44.         return new ConsumerConfig(props);  
  45.     }  
  46.     public static void main(String[] args) {  
  47.         String zooKeeper = args[0];  
  48.         String groupId = args[1];  
  49.         String topic = args[2];  
  50.         int threads = Integer.parseInt(args[3]);  
  51.         ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);  
  52.         example.run(threads);  
  53.         try {  
  54.             Thread.sleep(10000);  
  55.         } catch (InterruptedException ie) {  
  56.         }  
  57.         example.shutdown();  
  58.     }  
  59. }  
複製程式碼



總結:
kafka消費者api分為high api和low api,目前上述demo是都是使用kafka high api,高階api不用關心維護消費狀態資訊和負載均衡,系統會根據配置引數,
定期flush offset到zk上,如果有多個consumer且每個consumer建立了多個執行緒,高階api會根據zk上註冊consumer資訊,進行自動負載均衡操作。

注意事項:
1.高階api將會內部實現持久化每個分割槽最後讀到的訊息的offset,資料儲存在zookeeper中的消費組名中(如/consumers/push-token-group/offsets/push-token/2。
其中push-token-group是消費組,push-token是topic,最後一個2表示第3個分割槽),每間隔一個(預設1000ms)時間更新一次offset,
那麼可能在重啟消費者時拿到重複的訊息。此外,當分割槽leader發生變更時也可能拿到重複的訊息。因此在關閉消費者時最好等待一定時間(10s)然後再shutdown()

2.消費組名是一個全域性的資訊,要注意在新的消費者啟動之前舊的消費者要關閉。如果新的程序啟動並且消費組名相同,kafka會新增這個程序到可用消費執行緒組中用來消費
topic和觸發重新分配負載均衡,那麼同一個分割槽的訊息就有可能傳送到不同的程序中。

3.如果消費者組中所有consumer的匯流排程數量大於分割槽數,一部分執行緒或某些consumer可能無法讀取訊息或處於空閒狀態。

4.如果分割槽數多於執行緒數(如果消費組中執行者多個消費者,則執行緒數為消費者組內所有消費者執行緒總和),一部分執行緒會讀取到多個分割槽的訊息

5.如果一個執行緒消費多個分割槽訊息,那麼接收到的訊息是不能保證順序的。
備註:可用zookeeper web ui工具管理檢視zk目錄樹資料: xxx/consumers/push-token-group/owners/push-token/2其中
push-token-group為消費組,push-token為topic,2為分割槽3.檢視裡面的內容如:
push-token-group-mobile-platform03-1405157976163-7ab14bd1-0表示該分割槽被該標示的執行緒所執行。


總結:
producer效能優化:非同步化,訊息批量傳送,具體瀏覽上述引數說明。consumer效能優化:如果是高吞吐量資料,設定每次拿取訊息(fetch.min.bytes)大些,
拿取訊息頻繁(fetch.wait.max.ms)些(或時間間隔短些),如果是低延時要求,則設定時間時間間隔小,每次從kafka broker拿取訊息儘量小些。