前面分享了kafka的基本知識,下面就要對kafka進行實操,先看如何安裝。

kafka需要zookepper的支援,所以要安裝kafka需要有zookeeper的環境,zookeeper安裝請參見《zookeeper之二:zookeeper3.7.0安裝完全指南(絕對實用)》。這裡可以是一個單獨的zookepper的環境也可以使用kafka整合的zookeeper環境,建議使用單獨的zookeeper環境。本文以單獨環境為例,整合環境請留言。

1、下載

kafka的官網地址是:http://kafka.apache.org/

從官網下載最新的kafka的安裝包,如下圖

這裡選擇二進位制包進行安裝,

我這裡已經下載好Scala2.12版本的kafka安裝包,

2、安裝

2.1、單機版

2.1.1、環境

os:在虛擬機器環境中的centos7-64進行安裝。

zookeeper:zookeeper已經安裝好,是3.7的版本。環境在本機:127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184

JDK:JDK環境已安裝,是1.8的版本

2.1.2、修改配置檔案

kafka的安裝包解壓後的目錄結構如下,

bin目錄下存放的是kafka的指令碼檔案,包括啟動、停止、客戶端、消費端等;

config目錄下存放的是和配置相關的檔案,包括客戶端、消費端、伺服器、zookeeper的配置等;

libs目錄存放的是kafka啟動用到的檔案及依賴;

logs目錄存放的是kafka的執行時日誌檔案;

瞭解完了kafka的檔案目錄,下面進行單機版kafka配置檔案的修改。要修改的檔案是config/server.properties檔案,該檔案比較長,這裡就不貼了,看下幾個必要的配置。這裡有必要提下kafka的配置檔案做的很好,有詳細的解釋還有例子。

broker.id  broker的id或者編號,在叢集中該編號必須唯一

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

listeners  kafka伺服器監聽的埠,該埠也是對外提供服務的埠

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.117.128:9092

num.partitions  topic下分割槽的數量

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

log.dirs  訊息的存放目錄,這裡看配置是日誌的意思,因為kafka把訊息使用日誌的形式儲存,所以這裡不要和kafka的執行日誌相混淆。

# A comma separated list of directories under which to store log files
log.dirs=/home/dev/datas/kafka_2.12-2.8.0/single

log.retentio.hours  訊息儲存的小時數

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

default.replication.factor  訊息的副本數量,這是kafka高可用、資料不丟失的關鍵

default.replication.factor=3

zookeeper.connect  zookeeper的地址

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2182/kafka,localhost:2183/kafka,localhost:2184/kafka

配置完上面的配置項後就可以啟動單機版的kafka了

2.1.3、啟動

有了上面的配置,現在啟動kafka,在kafka的解壓目錄下執行,

bin/kafka-server-start.sh config/server.properties

執行後看到下面的日誌列印,

看到是在前臺啟動的,有沒有辦法後臺啟動那,先停掉kafka,加上-daemon則使用守護執行緒在後臺啟動。

bin/kafka-server-start.sh -daemon config/server.properties 

2.2、叢集版

2.2.1、環境

叢集版的環境和單機版的環境是一樣的,需要zookeeper和JDK。由於叢集版要求必須是2N+1臺機器,我部署最少節點的叢集,在虛擬機器上起3個程序代表3臺機器。

IP id 日誌路徑
本機 1 9093
/home/dev/datas/kafka_2.12-2.8.0/cluster/node1
本機 2 9094
/home/dev/datas/kafka_2.12-2.8.0/cluster/node2
本機 3 9095
/home/dev/datas/kafka_2.12-2.8.0/cluster/node3

2.2.2、修改配置檔案

配置項和單機版的基本相同,選擇在config下cluster資料夾,其下放置3個配置檔案,

每個檔案的內容可參考單機的,這裡不再貼出。

關於日誌路徑也是要事先建好相應的目錄。

2.2.3、啟動

啟動命令和單機的也是相同的,選擇在後臺啟動,每次啟動指定不同的配置檔案,

[dev@localhost kafka_2.12-2.8.0]$ bin/kafka-server-start.sh -daemon config/cluster/server_node3.properties

3、測試

3.1、單機版測試

參見3.2叢集版測試方法

3.2、叢集版測試

3.2.1、通過檢視程序的方式

使用jps命令檢視程序

[dev@localhost kafka_2.12-2.8.0]$ jps

可以看到下面的列印

說明啟動了三個kafka程序

3.2.2、使用客戶端方式

使用kafka自帶的客戶端來建立一個topic

[dev@localhost kafka_2.12-2.8.0]$ bin/kafka-topics.sh --bootstrap-server 192.168.117.128:9093 --create --topic testKafka

看下圖

說明建立成功,現在在另外一個節點上檢視

[dev@localhost kafka_2.12-2.8.0]$ bin/kafka-topics.sh --bootstrap-server 192.168.117.128:9095   --list

看下圖

可以看到剛才建立的testKafka這個topic。

有不正之處,歡迎指正。參考:

https://www.cnblogs.com/zhaoshizi/p/12154518.html

https://www.cnblogs.com/sandea/p/12078442.html