前面分享了kafka的基本知識,下面就要對kafka進行實操,先看如何安裝。
kafka需要zookepper的支援,所以要安裝kafka需要有zookeeper的環境,zookeeper安裝請參見《zookeeper之二:zookeeper3.7.0安裝完全指南(絕對實用)》。這裡可以是一個單獨的zookepper的環境也可以使用kafka整合的zookeeper環境,建議使用單獨的zookeeper環境。本文以單獨環境為例,整合環境請留言。
1、下載
kafka的官網地址是:http://kafka.apache.org/
從官網下載最新的kafka的安裝包,如下圖
這裡選擇二進位制包進行安裝,
我這裡已經下載好Scala2.12版本的kafka安裝包,
2、安裝
2.1、單機版
2.1.1、環境
os:在虛擬機器環境中的centos7-64進行安裝。
zookeeper:zookeeper已經安裝好,是3.7的版本。環境在本機:127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184
JDK:JDK環境已安裝,是1.8的版本
2.1.2、修改配置檔案
kafka的安裝包解壓後的目錄結構如下,
bin目錄下存放的是kafka的指令碼檔案,包括啟動、停止、客戶端、消費端等;
config目錄下存放的是和配置相關的檔案,包括客戶端、消費端、伺服器、zookeeper的配置等;
libs目錄存放的是kafka啟動用到的檔案及依賴;
logs目錄存放的是kafka的執行時日誌檔案;
瞭解完了kafka的檔案目錄,下面進行單機版kafka配置檔案的修改。要修改的檔案是config/server.properties檔案,該檔案比較長,這裡就不貼了,看下幾個必要的配置。這裡有必要提下kafka的配置檔案做的很好,有詳細的解釋還有例子。
broker.id broker的id或者編號,在叢集中該編號必須唯一
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
listeners kafka伺服器監聽的埠,該埠也是對外提供服務的埠
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.117.128:9092
num.partitions topic下分割槽的數量
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
log.dirs 訊息的存放目錄,這裡看配置是日誌的意思,因為kafka把訊息使用日誌的形式儲存,所以這裡不要和kafka的執行日誌相混淆。
# A comma separated list of directories under which to store log files
log.dirs=/home/dev/datas/kafka_2.12-2.8.0/single
log.retentio.hours 訊息儲存的小時數
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
default.replication.factor 訊息的副本數量,這是kafka高可用、資料不丟失的關鍵
default.replication.factor=3
zookeeper.connect zookeeper的地址
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2182/kafka,localhost:2183/kafka,localhost:2184/kafka
配置完上面的配置項後就可以啟動單機版的kafka了
2.1.3、啟動
有了上面的配置,現在啟動kafka,在kafka的解壓目錄下執行,
bin/kafka-server-start.sh config/server.properties
執行後看到下面的日誌列印,
看到是在前臺啟動的,有沒有辦法後臺啟動那,先停掉kafka,加上-daemon則使用守護執行緒在後臺啟動。
bin/kafka-server-start.sh -daemon config/server.properties
2.2、叢集版
2.2.1、環境
叢集版的環境和單機版的環境是一樣的,需要zookeeper和JDK。由於叢集版要求必須是2N+1臺機器,我部署最少節點的叢集,在虛擬機器上起3個程序代表3臺機器。
IP | id | 埠 | 日誌路徑 |
本機 | 1 | 9093 |
/home/dev/datas/kafka_2.12-2.8.0/cluster/node1 |
本機 | 2 | 9094 |
/home/dev/datas/kafka_2.12-2.8.0/cluster/node2 |
本機 | 3 | 9095 |
/home/dev/datas/kafka_2.12-2.8.0/cluster/node3 |
2.2.2、修改配置檔案
配置項和單機版的基本相同,選擇在config下cluster資料夾,其下放置3個配置檔案,
每個檔案的內容可參考單機的,這裡不再貼出。
關於日誌路徑也是要事先建好相應的目錄。
2.2.3、啟動
啟動命令和單機的也是相同的,選擇在後臺啟動,每次啟動指定不同的配置檔案,
[dev@localhost kafka_2.12-2.8.0]$ bin/kafka-server-start.sh -daemon config/cluster/server_node3.properties
3、測試
3.1、單機版測試
參見3.2叢集版測試方法
3.2、叢集版測試
3.2.1、通過檢視程序的方式
使用jps命令檢視程序
[dev@localhost kafka_2.12-2.8.0]$ jps
可以看到下面的列印
說明啟動了三個kafka程序
3.2.2、使用客戶端方式
使用kafka自帶的客戶端來建立一個topic
[dev@localhost kafka_2.12-2.8.0]$ bin/kafka-topics.sh --bootstrap-server 192.168.117.128:9093 --create --topic testKafka
看下圖
說明建立成功,現在在另外一個節點上檢視
[dev@localhost kafka_2.12-2.8.0]$ bin/kafka-topics.sh --bootstrap-server 192.168.117.128:9095 --list
看下圖
可以看到剛才建立的testKafka這個topic。
有不正之處,歡迎指正。參考:
https://www.cnblogs.com/zhaoshizi/p/12154518.html
https://www.cnblogs.com/sandea/p/12078442.html