1. 程式人生 > >zookeeper-kafka(叢集版)安裝部署以及java呼叫工具類

zookeeper-kafka(叢集版)安裝部署以及java呼叫工具類

 

 

 

 

Kafka安裝部署文件

■ 文件版本

V1.0

■ 作業系統

CentOS Linux release 7.3.1611

■ 編寫人員

閆立雄

■ 文件日期

2019-01-06

 

 

 

 

 

 

 

一.  概述

該文件詳細描述了在Linux環境下安裝Kafka和ZooKeeper的全過程,文件中以kafka_2.11-1.1.0.tgz和zookeeper-3.4.12.tar.gz為例。

 

二.  安裝ZooKeeper

2.1  下載ZooKeeper

       方式1   http://mirrors.shu.edu.cn/apache/zookeeper/網站上下載最新版ZooKeeper

       方式2  去我的百度網盤去下載

                  zookeeper-3.4.12.tar.gz

                  連結:https://pan.baidu.com/s/1SBK-fN8-yP2qYnzQzfBreA

                  提取碼:2dux

        然後把zookeeper-3.4.12.tar.gz檔案上傳到伺服器/usr/local/目錄下(叢集的話各上傳到伺服器目錄下)。

2.2  安裝ZooKeeper

# tar –zxf zookeeper-3.4.11.tar.gz

# mv zookeeper-3.4.12 /usr/local/zookeeper

# mkdir –p /var/lib/zookeeper

# mv /usr/local/zookeeper/conf/zoo_sample /usr/local/zookeeper/conf/zoo.cfg

# vi /usr/local/zookeeper/conf/zoo.cfg(server.1表示叢集的服務資訊,有幾臺叢集就配置幾個server。建議配置奇數的叢集,如:3、5、7個)。

tickTime=2000

dataDir=/var/lib/zookeeper

clientPort=2181

initLimit=20

syncLimit=5

server.1=192.168.1.10:2888:3888

server.2=192.168.1.11:2888:3888

server.3=192.168.1.12:2888:3888

# export JAVA_HOME=/usr/java/jdk1.8.0_121

# vi /var/lib/zookeeper/myid(配置所在伺服器叢集的id,如:在192.168.1.10伺服器就配置為1)

   1

# vi /usr/local/kafka/config/zookeeper.properties(註釋掉Kafka預設ZooKeeper相關引數dataDir、clientPort、maxClientCnxns)

2.3  使用Kafka預設安裝ZooKeeper

# mkdir –p /var/lib/zookeeper

# vi /usr/local/kafka/config/zookeeper.properties(server.1表示叢集的服務資訊,有幾臺叢集就配置幾個server。建議配置奇數的叢集,如:3、5、7個) 

tickTime=2000

dataDir=/var/lib/zookeeper

clientPort=2181

initLimit=20

syncLimit=5

server.1=192.168.1.10:2888:3888

server.2=192.168.1.11:2888:3888

server.3=192.168.1.12:2888:3888

# vi /var/lib/zookeeper/myid(配置所在伺服器叢集的id,如:在192.168.1.10伺服器就配置為1)

   1

 

三.  安裝Kafka

3.1  下載Kafka

       方式1   去apache網站下載(http://kafka.apache.org/downloads)最新版的Kafka,JDK推薦使用JDK1.8版本。

       方式2   去我的百度網盤去下載

                    kafka_2.11-1.1.0.tgz

                    連結:https://pan.baidu.com/s/15aEYWPPaBPr1fTJ-Z6qJbg

                   提取碼:3x98

      然後把kafka_2.11-1.1.0.tgz檔案上傳到伺服器/usr/local/目錄下(叢集的話各上傳到伺服器目錄下)。

3.2  安裝Kafka

# tar -zxf kafka_2.11-1.1.0.tgz

# mv kafka_2.11-1.1.0 /usr/local/kafka

# mkdir /home/kafka-logs

# export JAVA_HOME=/usr/java/jdk1.8.0_121

# vi /usr/local/kafka/config/server.properties

     broker.id=1

listeners=PLAINTEXT://192.168.1.10:9092

advertised.listeners=PLAINTEXT:// 192.168.1.10:9092

log.dirs=/home/kafka-logs

zookeeper.connect=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181

zookeeper.connection.timeout.ms=6000

num.recovery.threads.per.data.dir=10

delete.topic.enable=true

注意:如果伺服器分內外網IP,advertised.listeners要配置外網IP,也就是程式連線的IP地址。

# vi /usr/local/kafka/bin/kafka-server-start.sh (根據機器記憶體來設定記憶體大小,可以設定總記憶體大小的50%)

Export KAFKA_HEAP_OPTS=”-Xmx8G –Xms8G”

# hostname(檢視本機host)

# vi /etc/hosts (把本機host配置到hosts檔案中,其它叢集機器hosts也配置)

新增一行。

192.168.1.10  hostname

 

四.  Zookeeper、kafka常用命令

1.啟動zookeeper

sh /usr/local/zookeeper/bin/zkServer.sh start      單獨的zookeeper啟動

 

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties     kafka自帶的zookeeper啟動

 

2.啟動Kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

 

3.建立一個主題

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 2 --topic test (replication-factor 副本因子    partitions分割槽)

 

4.傳送訊息

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.10:9092 --topic test

 

5.消費訊息

/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.1.10:2181 --topic test --from-beginning (from-beginning 從最開始消費)

 

6.檢視已建立人topic列表

/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.10:2181

 

7.檢視topic屬性

/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.1.10:2181 --topic test

 

8.刪除topic

/usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.10:2181 --delete --topic test

 (1)登入zookeeper客戶端:命令:./zookeeper-shell.sh 192.168.1.10:2181

 (2)找到topic所在的目錄:ls /brokers/topics

 (3)找到要刪除的topic,執行命令:rmr /brokers/topics/【topic name】即可,此時topic被徹底刪除。

 

9.檢視分組消費情況

/usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.1.10:9092 --describe --group test-consumer-group

 

10.檢視所有分組列表

/usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.1.10:9092 --list

 

五.  常見問題

1、用命令傳送訊息時報Connection to node -1 could not be established. Broker may not be available.

解決:如果請求的地址是localhost的話,改成實際的IP地址。

2、JAVA程式碼消費不了Kafka的訊息,在ConsumerRecords<String, String> records = consumer.poll(100);方法中一直迴圈。

解決:是因為部署Kafka的機器之前有安裝過Kafka,產生了髒資料。

1)進入zookeeper 執行zkCli.sh 。(Kafka預設ZooKeeper執行./zookeeper-shell.sh 192.168.1.10:2181)
2)執行ls /brokers/topics 檢視主題
3)然後執行 rmr /brokers/topics/__consumer_offsets 刪除__consumer_offsets_主題

4)然後重啟kafka叢集。

3、傳送訊息時不斷的提示 Error while fetching metadata with correlation id 1。

解決:每臺伺服器的/etc/hosts需要配置127.0.0.1 本機hosts,注意不要寫訪問的IP地址,要寫127.0.0.1

 

六.  java呼叫工具類

      本人在springboot微服務中用到的kafka工具類。

       

        ylx_java_utility class.zip
        連結:https://pan.baidu.com/s/1gwmnSL-N2KPAtzh_kzM1og
        提取碼:pg9o

 

以上是本人自己總結,並且在專案總實際運用操作的。新手一枚,不喜勿噴!