
Installation and Deployment (6): Kafka Cluster Installation and Deployment, with Java Producer and Consumer Test Code



kafka 2.11-0.10.0.0 (Kafka 0.10.0.0 built for Scala 2.11)
ubuntu 14.04.4 x64
hadoop 2.7.2

spark 2.0.0

scala 2.11.8

jdk 1.8.0_101

For the ZooKeeper 3.x cluster installation, see the corresponding part of the Spark deployment post.


1. Download
Binary package:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz


2. Extract
root@10.1.1.6:/server# tar xvzf kafka_2.11-0.10.0.0.tgz
root@10.1.1.6:/server# mv kafka_2.11-0.10.0.0/ kafka/


3. Environment variables
vi ~/.bashrc
export KAFKA_HOME=/server/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source ~/.bashrc


4. Configuration file
config/server.properties
root@10.1.1.6:/server/kafka/config# vi server.properties
Reference:
http://blog.csdn.net/z769184640/article/details/51585419
# Uniquely identifies a broker.
broker.id=0
# Address and port to listen on. Use the address reported by `hostname -i`; otherwise the broker may bind to 127.0.0.1 and producers may be unable to send messages.
listeners=PLAINTEXT://10.1.1.6:9092


#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://10.1.1.6:9092
[If this is not set, clients fail with connection errors after 3 retries.]






# Directory for log segments and messages; may be a comma-separated list. /tmp is not recommended. [I left this unchanged; mine is still under /tmp.]
log.dirs=/usr/local/services/kafka/kafka-logs
# Default number of partitions per topic; a larger value gives consumers more parallelism.
num.partitions=2
# ZooKeeper connection string, comma-separated. A form like 10.1.1.6:2181/kafka puts Kafka's data under that chroot in ZK.
zookeeper.connect=10.1.1.6:2181,10.1.1.11:2181,10.1.1.12:2181,10.1.1.13:2181,10.1.1.14:2181


5. Distribute
Copy to the other machines:
root@10.1.1.6:/server# scp -r kafka/ root@10.1.1.11:/server/
root@10.1.1.6:/server# scp -r kafka/ root@10.1.1.12:/server/
root@10.1.1.6:/server# scp -r kafka/ root@10.1.1.13:/server/
root@10.1.1.6:/server# scp -r kafka/ root@10.1.1.14:/server/


On machines 11-14, change the environment variables and the config file; the changed lines are:
broker.id=1~4 [one value per machine on the other four]
listeners=PLAINTEXT://10.1.1.11~14:9092 [each machine's own IP]


6. Start


First, start the ZooKeeper cluster; see the Spark deployment post for the procedure.


Then start Kafka on every node:


root@10.1.1.6:/server/kafka# ./bin/kafka-server-start.sh -daemon config/server.properties
root@10.1.1.11:/server/kafka# ./bin/kafka-server-start.sh -daemon config/server.properties
root@10.1.1.12:/server/kafka# ./bin/kafka-server-start.sh -daemon config/server.properties
root@10.1.1.13:/server/kafka# ./bin/kafka-server-start.sh -daemon config/server.properties
root@10.1.1.14:/server/kafka# ./bin/kafka-server-start.sh -daemon config/server.properties
The -daemon flag runs the broker in the background.
[Note: do not invoke kafka-server-start.sh -daemon config/server.properties from the bare PATH; run it as ./bin/... from $KAFKA_HOME so the relative config path resolves.]
You should now see:
root@10.1.1.11:/server/kafka# jps
1203 Kafka
551 DataNode
2124 Jps
701 NodeManager
911 Worker
28063 SparkSubmit
383 QuorumPeerMain
root@10.1.1.11:/server/kafka#
The other machines look similar, e.g. on the master:
root@10.1.1.6:/server/kafka# jps
18592 NodeManager
7456 Main
18867 Worker
9780 Kafka
17894 DataNode
18073 SecondaryNameNode
18650 Master
17499 QuorumPeerMain
9837 Jps
18269 ResourceManager
17725 NameNode


7. Tests


7.1 Topic test from a single machine


7.1.1 Create a topic named my-test
root@10.1.1.6:/server/kafka# bin/kafka-topics.sh --create --zookeeper 10.1.1.6:2181 --replication-factor 3 --partitions 1 --topic my-test
Created topic "my-test".
#replication-factor is the number of replicas
#partitions is the number of partitions


7.1.2 Produce messages (Ctrl+C to stop)
root@10.1.1.6:/server/kafka# bin/kafka-console-producer.sh --broker-list 10.1.1.6:9092 --topic my-test
今天是個好日子
hello
^Croot@10.1.1.6:/server/kafka#


7.1.3 Consume the messages on another machine
root@10.1.1.11:/server/kafka# bin/kafka-console-consumer.sh --zookeeper 10.1.1.11:2181 --from-beginning --topic my-test
今天是個好日子
hello
^CProcessed a total of 2 messages
root@10.1.1.12:/server/kafka# bin/kafka-console-consumer.sh --zookeeper 10.1.1.12:2181 --from-beginning --topic my-test
今天是個好日子
hello
^CProcessed a total of 2 messages
root@10.1.1.12:/server/kafka#
root@10.1.1.6:/server/kafka# bin/kafka-console-consumer.sh --zookeeper 10.1.1.6:2181 --from-beginning --topic my-test
今天是個好日子
hello
^CProcessed a total of 2 messages
root@10.1.1.6:/server/kafka#
The same works on the other machines.


If you keep producing, the newly produced messages keep appearing in the consumer terminals.


7.2 Describe the topic
root@10.1.1.6:/server/kafka/bin# ./kafka-topics.sh --describe --zookeeper 10.1.1.6:2181 --topic my-test
Topic:my-test    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-test    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
root@10.1.1.6:/server/kafka# bin/kafka-topics.sh --describe --zookeeper 10.1.1.6:2181 --topic my-test
Topic:my-test    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-test    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
(Leader is the broker id currently serving the partition, Replicas lists all brokers holding a copy, and Isr is the set of in-sync replicas.)


7.3 Query via ZooKeeper
root@10.1.1.6:/server/zookeeper/bin# $ZOOKEEPER_HOME/bin/zkCli.sh
WATCHER::


WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] 
Connected successfully.
[zk: localhost:2181(CONNECTED) 1] ls /
[controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, config]
[zk: localhost:2181(CONNECTED) 2] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/ids
[0, 1, 2, 3, 4]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids/0
[]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[my-test]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test/partitions
Node does not exist: /brokers/topics/test/partitions
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/my-test/partitions
[0]
[zk: localhost:2181(CONNECTED) 8] 


8. Stop Kafka
pkill -9 -f server.properties
(Alternatively, bin/kafka-server-stop.sh stops the broker gracefully.)


9. Application code
References:
0.10.0.0
The 0.10.0.0 API differs from 0.8; 0.8.x examples cannot be used.
http://blog.csdn.net/louisliaoxh/article/details/51577117 (primary)
http://www.cnblogs.com/fxjwind/p/5646631.html (primary)
http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
0.8.x references:
http://chengjianxiaoxue.iteye.com/blog/2190488
http://www.open-open.com/lib/view/open1407942131801.html
http://blog.csdn.net/z769184640/article/details/51585419
http://wenku.baidu.com/view/7478cab24431b90d6d85c703.html?from=search
Advanced:
http://blog.csdn.net/hxpjava1/article/details/19160665
http://orchome.com/11
[Note: most code found online is missing pieces and will not run. The code in section 11 below was tested successfully in my environment, but I cannot guarantee it runs in yours.]


9.1 Producer code [see appendix 11.1]


9.1.2 Building the jar
[Maven reference: http://www.cnblogs.com/xing901022/p/4170248.html]
project -> run as -> build [goal: compile], or simply add <defaultGoal>compile</defaultGoal> after the <build> tag in pom.xml
eclipse -> file -> import -> maven -> existing maven projects -> choose your project folder
Select src/main/java/kafkaProducer.java in the Kafka project on the left.
eclipse -> file -> export -> java -> jar file: check the three items on the right plus "export java source file and resources", choose the output location and name, click Next, Next, then under "select the class of the application entry point" pick the main class kafkaProducer.java, click Finish, and wait for Maven to download the packages and build the jar.
9.1.3 Jar including external dependencies
http://lvjun106.iteye.com/blog/1849803




9.2 Consumer code [see appendix 11.2]




9.2.2 Building the jar
Same steps as 9.1, with kafkaProducer.java replaced by kafkaConsumer.java.


10. Testing
10.1 Packaging the Producer (with external dependencies)
Tested against 0.10.0.0.
[MANIFEST.MF reference: http://www.cnblogs.com/lanxuezaipiao/p/3291641.html]
A plain eclipse maven run-as -> build does not bundle external dependencies, and a hand-written manifest.mf would need many dependency entries, which is tedious; the following approach is simpler.
Add the following inside <project></project> in pom.xml:
 <build>
  <defaultGoal>compile</defaultGoal>
  <plugins>  
            <plugin>  
                <artifactId>maven-assembly-plugin</artifactId>  
                <configuration>  
                    <archive>  
                        <manifest>  
                            <mainClass>ktest.kafka.Producer</mainClass>  
                        </manifest>  
                    </archive>  
                    <descriptorRefs>  
                        <descriptorRef>jar-with-dependencies</descriptorRef>  
                    </descriptorRefs>  
                </configuration>  
            </plugin>  
        </plugins>  
  </build>
[Note: mainClass is the fully qualified name of your main class, i.e. the package path plus the class name.]



10.1.2 Command-line build
In Run Configurations -> Arguments, pass test-producer plus a message: the code reads args[0] and args[1] and errors without them.
In the project root directory, build the jar:
E:\fm-workspace\workspace_2\kafka>mvn assembly:assembly


10.1.3 Create the topic
root@10.1.1.6:/server/kafka/bin# ./kafka-topics.sh --create --zookeeper "10.1.1.6:2181" --topic "test-producer" --partitions 10 --replication-factor 3
Created topic "test-producer".


10.1.4 Run the Producer jar
root@10.1.1.6:/projects/test/javatest# ll
total 11432
drwxr-xr-x 2 root root     4096 Aug 11 14:27 ./
drwxr-xr-x 6 root root     4096 Aug  8 14:58 ../
-rw-r--r-- 1 root root        0 Aug 11 14:17 logContentFile.log
-rw-r--r-- 1 root root 11696578 Aug 11 14:26 Producer-ok.jar
Format: java -jar <jar name> <topic name> <message>
root@10.1.1.6:/projects/test/javatest# java -jar Producer-ok.jar test-producer I_am_a_test_message
kafka Logger--> INFO{AbstractConfig.java:178}-ProducerConfig values: 
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
[Note: a message string containing spaces fails, because the shell splits it into separate arguments and only args[1] is sent.]
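If a message with spaces is really needed, either quote the whole string in the shell (java -jar Producer-ok.jar test-producer "hello world") or join the trailing arguments inside the Producer. A minimal sketch of the latter (my own variant, not the tested code in appendix 11.1; it assumes args[0] is the topic):

// everything after args[0] becomes one space-separated message
// (String.join is available since Java 8, which this setup uses)
String messageStr = String.join(" ", java.util.Arrays.copyOfRange(args, 1, args.length));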


10.2 Consumer jar
10.2.1 pom.xml


<build>
  <defaultGoal>compile</defaultGoal>
  <plugins>  
            <plugin>  
                <artifactId>maven-assembly-plugin</artifactId>  
                <configuration>  
                    <archive>  
                        <manifest>  
                            <mainClass>ktest.kafka.Consumer</mainClass>  
                        </manifest>  
                    </archive>  
                    <descriptorRefs>  
                        <descriptorRef>jar-with-dependencies</descriptorRef>  
                    </descriptorRefs>  
                </configuration>  
            </plugin>  
        </plugins>  
  </build>


10.2.2 Code
See appendix 11.2.
group.id comes from $KAFKA_HOME/config/consumer.properties:
#consumer group id
group.id=test-consumer-group




10.2.3 Build
In Run Configurations -> Arguments, enter test-producer (any topic name will do); the code reads args[0] and errors without it.
E:\fm-workspace\workspace_2\kafka>mvn assembly:assembly
[run as -> build reports a <> diamond error, but mvn compiles fine; the generics are correct (the diamond operator just requires source level 1.7+).]


10.2.4 Run the jars
Open one terminal and run the consumer:
root@10.1.1.6:/projects/test/javatest# java -jar Consumer-ok.jar test-producer
kafka Logger--> INFO{AbstractConfig.java:178}-ConsumerConfig values: 
kafka Logger--> INFO{AppInfoParser.java:83}-Kafka version : 0.10.0.0
kafka Logger--> INFO{AppInfoParser.java:84}-Kafka commitId : b8642491e78c5a13
kafka Logger--> INFO{AbstractCoordinator.java:505}-Discovered coordinator 10.1.1.6:9092 (id: 2147483647 rack: null) for group test-consumer-group.
kafka Logger--> INFO{ConsumerCoordinator.java:280}-Revoking previously assigned partitions [] for group test-consumer-group
kafka Logger--> INFO{AbstractCoordinator.java:326}-(Re-)joining group test-consumer-group
kafka Logger--> INFO{AbstractCoordinator.java:434}-Successfully joined group test-consumer-group with generation 3
kafka Logger--> INFO{ConsumerCoordinator.java:219}-Setting newly assigned partitions [test-producer-2, test-producer-1, test-producer-0, test-producer-3] for group test-consumer-group
...
It then sits waiting for messages.


Open another terminal and run the producer:
root@10.1.1.6:/projects/test/javatest# ll
total 22856
drwxr-xr-x 2 root root     4096 Aug 11 15:19 ./
drwxr-xr-x 6 root root     4096 Aug  8 14:58 ../
-rw-r--r-- 1 root root 11697980 Aug 11 15:15 Consumer-ok.jar
-rw-r--r-- 1 root root        0 Aug 11 14:17 logContentFile.log
-rw-r--r-- 1 root root 11696578 Aug 11 14:26 Producer-ok.jar
root@10.1.1.6:/projects/test/javatest# java -jar Producer-ok.jar test-producer I_am_a_test_messagejlkjlkjflkjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjj


Then check the consumer terminal:


kafka Logger--> INFO{AppInfoParser.java:83}-Kafka version : 0.10.0.0
kafka Logger--> INFO{AppInfoParser.java:84}-Kafka commitId : b8642491e78c5a13
kafka Logger--> INFO{AbstractCoordinator.java:505}-Discovered coordinator 10.1.1.6:9092 (id: 2147483647 rack: null) for group test-consumer-group.
kafka Logger--> INFO{ConsumerCoordinator.java:280}-Revoking previously assigned partitions [] for group test-consumer-group
kafka Logger--> INFO{AbstractCoordinator.java:326}-(Re-)joining group test-consumer-group
kafka Logger--> INFO{AbstractCoordinator.java:434}-Successfully joined group test-consumer-group with generation 3
kafka Logger--> INFO{ConsumerCoordinator.java:219}-Setting newly assigned partitions [test-producer-2, test-producer-1, test-producer-0, test-producer-3] for group test-consumer-group


offset = 0, key = null, value = I_am_a_test_messagejlkjlkjflkjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjjj
[The message produced by the Producer shows up.] Test passed.




####################################
Problems encountered:
slf4j has a version dependency on log4j:
slf4j-log4j12 1.7.5 depends on slf4j-api 1.7.5 and log4j 1.2.17.
http://blog.csdn.net/anialy/article/details/8529188
http://my.oschina.net/zimingforever/blog/98048
An appender error means log4j.properties must be placed in the project root; the default configuration is enough.
"main function not found" even though it clearly exists: rebuild the project.
If <> raises an error, fill in the types explicitly, e.g. a proper <String,String>.
Under ubuntu, eclipse kept reporting slf4j errors even though all the jars were present; windows was fine.
When exporting the jar, the third option saves .class files into the jar; do not select it.
If running reports "Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory", the project is missing the slf4j-api.jar and slf4j-log4j12.jar packages.
http://www.cnblogs.com/xwdreamer/archive/2012/02/20/2359595.html




#################################
pom.xml [not exhaustive; use for reference only]
The pom.xml files found online raise ZooKeeper errors because they contain no zookeeper dependency. Also, my Kafka is 2.11_0.10, so the pom.xml was changed accordingly and a build/compile section was added.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>


  <groupId>bj.zm</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>


  <name>kafka</name>
  <url>http://maven.apache.org</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>


  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
     <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.0</version>
    </dependency>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
  </dependency>
  </dependencies>
  <build><defaultGoal>compile</defaultGoal></build>
</project>






#############################
Reference (annotated server.properties):
############################# Server Basics #############################
# Uniquely identifies a broker.
broker.id=1
############################# Socket Server Settings #############################
#Address and port to listen on. Use the address reported by `hostname -i`; otherwise the broker may bind to 127.0.0.1 and producers may be unable to send messages.
listeners=PLAINTEXT://172.23.8.144:9092
#Address and port the broker advertises to producers and consumers; if unset, the listeners value is used. Not configured in this post.
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer of the socket server (SO_SNDBUF)
socket.send.buffer.bytes=102400
# Receive buffer of the socket server (SO_RCVBUF)
socket.receive.buffer.bytes=102400
#Maximum size of a single request, to protect against OOM
socket.request.max.bytes=104857600
############################# Log Basics #############################
#Directory for log segments and messages; may be a comma-separated list; /tmp is likewise not recommended
log.dirs=/usr/local/services/kafka/kafka-logs
#Default number of partitions per topic; a larger value gives consumers more parallelism.
num.partitions=2
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
#Log retention time in hours; older segments are deleted
log.retention.hours=168
#Maximum size of a single log segment file; a new file is created once it is exceeded
log.segment.bytes=1073741824
#Interval in milliseconds at which files are checked against the retention policy
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#ZooKeeper connection string, comma-separated. A form like 172.23.8.59:2181/kafka puts Kafka's data under that chroot in ZK
zookeeper.connect=172.23.8.144:2181,172.23.8.179:2181,172.23.8.59:2181
# ZK connection timeout
zookeeper.connection.timeout.ms=6000


Automatically committing offsets


Properties props = new Properties();
//broker (kafka) addresses; you need not list every broker in the cluster, one or a few is enough
props.put("bootstrap.servers", "172.16.49.173:9092");
//consumer group name; must be set
props.put("group.id", a_groupId);
//enable automatic offset commits; the frequency is controlled by auto.commit.interval.ms
props.put("enable.auto.commit", "true");
//offset commit frequency
props.put("auto.commit.interval.ms", "1000");
//start from this group.id's earliest offset. If unset, the default is latest, i.e. the offset of the newest message in the topic.
//With latest, the consumer only sees messages produced after it started.
props.put("auto.offset.reset", "earliest");
//session/heartbeat timeout
props.put("session.timeout.ms", "30000");
//key and value deserializer classes
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//subscribe to the topic
consumer.subscribe(Arrays.asList("topic_test"));
while (true) {
    //poll with a 100 ms timeout (the argument is a timeout in milliseconds, not a message count)
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
 }


Things to note:


group.id: must be set.
auto.offset.reset: set to earliest if you want messages the producer sent before the consumer started; if you only need messages produced after startup, leave it unset.
enable.auto.commit (default true): for manual offset commits set it to false and call consumer.commitSync() at the appropriate point; otherwise (with auto.offset.reset=earliest) each consumer restart re-consumes everything from the beginning. A sketch of the manual-commit variant follows.
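A minimal manual-commit sketch (my own adaptation of the loop above, not from the referenced posts; it reuses the same props except enable.auto.commit):

props.put("enable.auto.commit", "false"); //disable auto commit; we commit explicitly below
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic_test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100); //100 ms timeout
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    if (records.count() > 0)
        consumer.commitSync(); //commit offsets only after the records have been processed
}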






11 Code
##################################
11.1 Producer
Producer.java source:


package ktest.kafka;




import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;


import java.util.Properties;
import java.util.concurrent.ExecutionException;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class Producer{


    public static void main(String[] args){
//    Producer pdc=new Producer();
//    KafkaProducer<String,String> producer;
      Properties props = new Properties();
//       public Producer(){
           props.put("bootstrap.servers", "10.1.1.6:9092,10.1.1.11:9092");
           //props.put("acks", "all"); //ack方式,all,會等所有的commit最慢的方式
           props.put("retries", 3); //失敗是否重試,設定會有可能產生重複資料
           //props.put("batch.size", 16384); //對於每個partition的batch buffer大小
           //props.put("linger.ms", 1);  //等多久,如果buffer沒滿,比如設為1,即訊息傳送會多1ms的延遲,如果buffer沒滿
           //props.put("buffer.memory", 33554432); //整個producer可以用於buffer的記憶體大小
           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
           String topic=(String) args[0];
           //String partitionStr=(String) args[1];
           String messageStr=(String) args[1];
           //ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, partitionStr,messageStr);
           ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,messageStr);
//       }
    producer.send(record);
//        for(int i = 0; i < 100; i++)
//            producer.send(new ProducerRecord<String, String>("my-test", Integer.toString(i), Integer.toString(i)));
        producer.close();
    }
}
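The imports above already pull in Callback and RecordMetadata, though the tested code never uses them. A sketch of a send that reports the result (my own variant; it would replace the plain producer.send(record) line above):

producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace(); //the send failed after all retries
        } else {
            System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());
        }
    }
});
//or block until the broker acknowledges:
//RecordMetadata md = producer.send(record).get(); //throws ExecutionException on failure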




##############################
log4j.properties:
[Note: change the "kafka" after layout.ConversionPattern= to your own project name.]


#config root logger  
log4j.rootLogger = INFO,system.out  
log4j.appender.system.out=org.apache.log4j.ConsoleAppender  
log4j.appender.system.out.layout=org.apache.log4j.PatternLayout  
log4j.appender.system.out.layout.ConversionPattern=kafka Logger-->%5p{%F:%L}-%m%n  
  
#config this Project.file logger  
log4j.logger.thisProject.file=INFO,thisProject.file.out  
log4j.appender.thisProject.file.out=org.apache.log4j.DailyRollingFileAppender  
log4j.appender.thisProject.file.out.File=logContentFile.log  
log4j.appender.thisProject.file.out.layout=org.apache.log4j.PatternLayout  




###################################
pom.xml for Producer.class; for the consumer, change mainClass to ktest.kafka.Consumer.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>


  <groupId>ktest</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>


  <name>kafka</name>
  <url>http://maven.apache.org</url>


<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.2</version>
</dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.2</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-ext</artifactId>
    <version>1.7.2</version>
</dependency>
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
    </dependency>
  </dependencies>
  <build>
  <defaultGoal>compile</defaultGoal>
  <plugins>  
            <plugin>  
                <artifactId>maven-assembly-plugin</artifactId>  
                <configuration>  
                    <archive>  
                        <manifest>  
                            <mainClass>ktest.kafka.Producer</mainClass>  
                        </manifest>  
                    </archive>  
                    <descriptorRefs>  
                        <descriptorRef>jar-with-dependencies</descriptorRef>  
                    </descriptorRefs>  
                </configuration>  
            </plugin>  
        </plugins>  
  </build>
</project>


##############################################
11.2 Consumer.java 


package ktest.kafka;




import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;


import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class Consumer{


    public static void main(String[] args){
//    Producer pdc=new Producer();
//    KafkaProducer<String,String> producer;
   Properties props = new Properties();
        props.put("bootstrap.servers", "10.1.1.6:9092,10.1.1.11:9092");
        props.put("group.id", "test-consumer-group");//預設的,可在$KAFKA_HOME/config/consumer.properties看
        //props.put("acks", "all"); //ack方式,all,會等所有的commit最慢的方式
        props.put("enable.auto.commit", "true");  //自動commit
        props.put("auto.commit.interval.ms", "1000"); //定時commit的週期
        props.put("session.timeout.ms", "30000"); //consumer活性超時時間
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic1=(String) args[0];
        consumer.subscribe(Arrays.asList(topic1));
//    String topic2=(String) args[1];
//    consumer.subscribe(Arrays.asList(topic1, topic2)); //subscribe,foo,bar,兩個topic
        while (true) {
            //每次取100條資訊
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
         }
    }
}
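The while (true) loop above never exits, so in the tests the consumer is simply killed rather than closed. A common shutdown pattern for the 0.10 consumer (a sketch, my own addition, reusing the consumer variable above) calls consumer.wakeup() from a shutdown hook, which makes poll() throw a WakeupException:

final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        consumer.wakeup(); //poll() in the main thread will throw WakeupException
        try { mainThread.join(); } catch (InterruptedException ignored) { }
    }
});
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    //expected during shutdown
} finally {
    consumer.close(); //leave the group cleanly
}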


###################################
The <build> section of Consumer.java's pom.xml:


  <build>
  <defaultGoal>compile</defaultGoal>
  <plugins>  
            <plugin>  
                <artifactId>maven-assembly-plugin</artifactId>  
                <configuration>  
                    <archive>  
                        <manifest>  
                            <mainClass>ktest.kafka.Consumer</mainClass>  
                        </manifest>  
                    </archive>  
                    <descriptorRefs>  
                        <descriptorRef>jar-with-dependencies</descriptorRef>  
                    </descriptorRefs>  
                </configuration>  
            </plugin>  
        </plugins>  
  </build>
  
#######################################


Full pom.xml for Consumer.java:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>


  <groupId>ktest</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>


  <name>kafka</name>
  <url>http://maven.apache.org</url>


<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.2</version>
</dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.2</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-ext</artifactId>
    <version>1.7.2</version>
</dependency>
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
    </dependency>
  </dependencies>
  <build>
  <defaultGoal>compile</defaultGoal>
  <plugins>  
            <plugin>  
                <artifactId>maven-assembly-plugin</artifactId>  
                <configuration>  
                    <archive>  
                        <manifest>  
                            <mainClass>ktest.kafka.Consumer</mainClass>  
                        </manifest>  
                    </archive>  
                    <descriptorRefs>  
                        <descriptorRef>jar-with-dependencies</descriptorRef>  
                    </descriptorRefs>  
                </configuration>  
            </plugin>  
        </plugins>  
  </build>
</project>