
## Setting Up a Kafka 2.13-2.6.0 and Zookeeper 3.6.2 Cluster on K8S

### I. Service Versions

* **Kafka**: v2.13-2.6.0
* **Zookeeper**: v3.6.2
* **Kubernetes**: v1.18.4

### II. Building the Zookeeper Image

Zookeeper uses the official image from Docker Hub, which can be pulled directly:

```bash
docker pull zookeeper:3.6.2
```

The startup script in the official image does not suit our company's internal needs, so the docker-entrypoint.sh script and the Dockerfile were modified.

#### 1. Modifying the docker-entrypoint.sh Script

The modified docker-entrypoint.sh is shown below (the original script is available at: https://github.com/31z4/zookeeper-docker/tree/2373492c6f8e74d3c1167726b19babe8ac7055dd/3.6.2):

```bash
#!/bin/bash
set -e

HOST=$(hostname -s)
DOMAIN=$(hostname -d)
CLIENT_PORT=2181
SERVER_PORT=2888
ELECTION_PORT=3888

function createConfig(){
    if [[ ! -f "$ZOO_CONF_DIR/${HOST}/zoo.cfg" ]]; then
        # Create the directories based on the injected variables
        mkdir -p $ZOO_CONF_DIR/${HOST}
        mkdir -p $ZOO_DATA_DIR/${HOST}
        mkdir -p $ZOO_DATA_LOG_DIR/${HOST}

        # Write the required settings into zoo.cfg. These variables are defined
        # in the Dockerfile; to change them, set env entries in the yaml file.
        CONFIG="$ZOO_CONF_DIR/${HOST}/zoo.cfg"
        {
            echo "dataDir=$ZOO_DATA_DIR/${HOST}"
            echo "dataLogDir=$ZOO_DATA_LOG_DIR/${HOST}"
            echo "tickTime=$ZOO_TICK_TIME"
            echo "initLimit=$ZOO_INIT_LIMIT"
            echo "syncLimit=$ZOO_SYNC_LIMIT"
            echo "autopurge.snapRetainCount=$ZOO_AUTOPURGE_SNAPRETAINCOUNT"
            echo "autopurge.purgeInterval=$ZOO_AUTOPURGE_PURGEINTERVAL"
            echo "maxClientCnxns=$ZOO_MAX_CLIENT_CNXNS"
            echo "standaloneEnabled=$ZOO_STANDALONE_ENABLED"
            echo "admin.enableServer=$ZOO_ADMINSERVER_ENABLED"
        } >> ${CONFIG}

        if [[ -n $ZOO_4LW_COMMANDS_WHITELIST ]]; then
            echo "4lw.commands.whitelist=$ZOO_4LW_COMMANDS_WHITELIST" >> ${CONFIG}
        fi

        # Extra settings can be added through the ZOO_CFG_EXTRA env variable in
        # the yaml file, with all extra entries packed into that one variable.
        # Note that each entry must use an option name Zookeeper recognizes,
        # since no name conversion is done below.
        for cfg_extra_entry in $ZOO_CFG_EXTRA; do
            echo "$cfg_extra_entry" >> ${CONFIG}
        done
    fi
}

# A StatefulSet names its Pods "service name-ordinal"; the function below
# extracts the service name and the numeric ordinal from the hostname.
function getHostNum(){
    if [[ $HOST =~ (.*)-([0-9]+)$ ]]; then
        NAME=${BASH_REMATCH[1]}
        ORD=${BASH_REMATCH[2]}
    else
        echo "Failed to parse name and ordinal of Pod"
        exit 1
    fi
}

# Create the myid for the Zookeeper cluster; this guarantees the generated
# myid values are unique and monotonically increasing.
function createID(){
    ID_FILE="$ZOO_DATA_DIR/${HOST}/myid"
    MY_ID=$((ORD+1))
    echo $MY_ID > $ID_FILE
}

# Write every node's entry into the config file so the cluster can form. Note
# that the SERVERS variable must be passed into the container and its value
# must match the replica count; to scale the cluster later, only the replica
# count and the SERVERS value need to change.
function addServer(){
    for (( i=1; i<=$SERVERS; i++ ))
    do
        s="server.$i=$NAME-$((i-1)).$DOMAIN:$SERVER_PORT:$ELECTION_PORT;$CLIENT_PORT"
        [[ $(grep "$s" $ZOO_CONF_DIR/${HOST}/zoo.cfg) ]] || echo $s >> $ZOO_CONF_DIR/${HOST}/zoo.cfg
    done
}

# Grant ownership of the working and data directories so the process can be
# started with --user zookeeper.
function userPerm(){
    if [[ "$1" = 'zkServer.sh' && "$(id -u)" = '0' ]]; then
        chown -R zookeeper "$ZOO_DATA_DIR" "$ZOO_DATA_LOG_DIR" "$ZOO_LOG_DIR" "$ZOO_CONF_DIR"
        exec gosu zookeeper "$0" "$@"
    fi
}

# Start Zookeeper. Since the config file path was changed, the --config option
# is mandatory here. The default config directory is ZOO_CONF_DIR=/conf, already
# defined in the Dockerfile, so --config can be dropped if the default path is kept.
function startZK(){
    /apache-zookeeper-3.6.2-bin/bin/zkServer.sh --config "$ZOO_CONF_DIR/$(hostname -s)" start-foreground
}

createConfig
getHostNum
createID
addServer
userPerm
startZK
```
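To make the entrypoint's behaviour concrete, here is roughly the zoo.cfg it would render for Pod zk-0, assuming the directory overrides used later in the StatefulSet (ZOO_CONF_DIR=/opt/conf, ZOO_DATA_DIR=/opt/data, ZOO_DATA_LOG_DIR=/opt/data_log), SERVERS=3, and Pods zk-0 through zk-2 under the zk-inner-service headless Service:

```properties
dataDir=/opt/data/zk-0
dataLogDir=/opt/data_log/zk-0
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
server.1=zk-0.zk-inner-service.ns-kafka.svc.cluster.local:2888:3888;2181
server.2=zk-1.zk-inner-service.ns-kafka.svc.cluster.local:2888:3888;2181
server.3=zk-2.zk-inner-service.ns-kafka.svc.cluster.local:2888:3888;2181
```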
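Before deploying, the defaults baked into the image can be double-checked locally; this is just a quick sanity check, assuming the image is present on the local Docker daemon:

```bash
# List the ZOO_* defaults defined in the Dockerfile's ENV block
docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' \
  10.16.12.204/ops/zookeeper:custom-v3.6.2 | grep '^ZOO_'
```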
#### 2. Modifying the Dockerfile

My changes to the Dockerfile are minimal: the original ENTRYPOINT line is commented out, and the CMD line is changed to start the container through docker-entrypoint.sh:

```dockerfile
FROM openjdk:11-jre-slim

ENV ZOO_CONF_DIR=/conf \
    ZOO_DATA_DIR=/data \
    ZOO_DATA_LOG_DIR=/datalog \
    ZOO_LOG_DIR=/logs \
    ZOO_TICK_TIME=2000 \
    ZOO_INIT_LIMIT=5 \
    ZOO_SYNC_LIMIT=2 \
    ZOO_AUTOPURGE_PURGEINTERVAL=0 \
    ZOO_AUTOPURGE_SNAPRETAINCOUNT=3 \
    ZOO_MAX_CLIENT_CNXNS=60 \
    ZOO_STANDALONE_ENABLED=true \
    ZOO_ADMINSERVER_ENABLED=true

# Add a user with an explicit UID/GID and create necessary directories
RUN set -eux; \
    groupadd -r zookeeper --gid=1000; \
    useradd -r -g zookeeper --uid=1000 zookeeper; \
    mkdir -p "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR"; \
    chown zookeeper:zookeeper "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR"

# Install required packages
RUN set -eux; \
    apt-get update; \
    DEBIAN_FRONTEND=noninteractive \
    apt-get install -y --no-install-recommends \
        ca-certificates \
        dirmngr \
        gosu \
        gnupg \
        netcat \
        wget; \
    rm -rf /var/lib/apt/lists/*; \
# Verify that gosu binary works
    gosu nobody true

ARG GPG_KEY=BBE7232D7991050B54C8EA0ADC08637CA615D22C
ARG SHORT_DISTRO_NAME=zookeeper-3.6.2
ARG DISTRO_NAME=apache-zookeeper-3.6.2-bin

# Download Apache Zookeeper, verify its PGP signature, untar and clean up
RUN set -eux; \
    ddist() { \
        local f="$1"; shift; \
        local distFile="$1"; shift; \
        local success=; \
        local distUrl=; \
        for distUrl in \
            'https://www.apache.org/dyn/closer.cgi?action=download&filename=' \
            https://www-us.apache.org/dist/ \
            https://www.apache.org/dist/ \
            https://archive.apache.org/dist/ \
        ; do \
            if wget -q -O "$f" "$distUrl$distFile" && [ -s "$f" ]; then \
                success=1; \
                break; \
            fi; \
        done; \
        [ -n "$success" ]; \
    }; \
    ddist "$DISTRO_NAME.tar.gz" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz"; \
    ddist "$DISTRO_NAME.tar.gz.asc" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz.asc"; \
    export GNUPGHOME="$(mktemp -d)"; \
    gpg --keyserver ha.pool.sks-keyservers.net --recv-key "$GPG_KEY" || \
    gpg --keyserver pgp.mit.edu --recv-keys "$GPG_KEY" || \
    gpg --keyserver keyserver.pgp.com --recv-keys "$GPG_KEY"; \
    gpg --batch --verify "$DISTRO_NAME.tar.gz.asc" "$DISTRO_NAME.tar.gz"; \
    tar -zxf "$DISTRO_NAME.tar.gz"; \
    mv "$DISTRO_NAME/conf/"* "$ZOO_CONF_DIR"; \
    rm -rf "$GNUPGHOME" "$DISTRO_NAME.tar.gz" "$DISTRO_NAME.tar.gz.asc"; \
    chown -R zookeeper:zookeeper "/$DISTRO_NAME"

WORKDIR $DISTRO_NAME
VOLUME ["$ZOO_DATA_DIR", "$ZOO_DATA_LOG_DIR", "$ZOO_LOG_DIR"]

EXPOSE 2181 2888 3888 8080

ENV PATH=$PATH:/$DISTRO_NAME/bin \
    ZOOCFGDIR=$ZOO_CONF_DIR

COPY docker-entrypoint.sh /

# Comment out the original ENTRYPOINT
# ENTRYPOINT ["/docker-entrypoint.sh"]

# Comment out the original CMD and add the line below
# CMD ["zkServer.sh", "start-foreground"]
CMD ["/docker-entrypoint.sh"]
```

#### 3. Building and Pushing the Image

From the directory containing the Dockerfile, build the image and set its tag:

```bash
docker build --tag 10.16.12.204/ops/zookeeper:custom-v3.6.2 -f Dockerfile .
```

Push it to the image registry:

```bash
docker push 10.16.12.204/ops/zookeeper:custom-v3.6.2
```

### III. Building the Kafka Image

The Kafka image is based on the wurstmeister image from Docker Hub; the original image can be pulled with:

```bash
docker pull wurstmeister/kafka:2.13-2.6.0
```

This image uses the start-kafka.sh script to initialize Kafka's configuration and start the broker, but parts of it do not fit a K8S deployment, so the script was modified.

#### 1. Modifying the start-kafka.sh Script

The original start-kafka.sh can be found at https://github.com/wurstmeister/kafka-docker. The modified version follows:

```bash
#!/bin/bash -e

# Allow specific kafka versions to perform any unique bootstrap operations
OVERRIDE_FILE="/opt/overrides/${KAFKA_VERSION}.sh"
if [[ -x "$OVERRIDE_FILE" ]]; then
    echo "Executing override file $OVERRIDE_FILE"
    eval "$OVERRIDE_FILE"
fi

# Store original IFS config, so we can restore it at various stages
ORIG_IFS=$IFS

# Set the zookeeper connection address; fail if the variable is not provided
if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
    echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"
    exit 1
fi

# Set the kafka port; fall back to the default port when none is given
if [[ -z "$KAFKA_PORT" ]]; then
    export KAFKA_PORT=9092
fi

# Create topics automatically after kafka starts; nothing is created unless
# KAFKA_CREATE_TOPICS is set
create-topics.sh &
unset KAFKA_CREATE_TOPICS

# If KAFKA_BROKER_ID is not set directly, generate the broker id from the
# command held in the BROKER_ID_COMMAND variable; this keeps broker ids
# unique and monotonically increasing
if [[ -z "$KAFKA_BROKER_ID" ]]; then
    if [[ -n "$BROKER_ID_COMMAND" ]]; then
        KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")
        export KAFKA_BROKER_ID
    else
        export KAFKA_BROKER_ID=-1
    fi
fi

# If no kafka log dir is specified, use the default, whose name embeds the
# current hostname
if [[ -z "$KAFKA_LOG_DIRS" ]]; then
    export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
fi

# If KAFKA_HEAP_OPTS is set, write it into the kafka-server-start.sh script
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
    sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
    unset KAFKA_HEAP_OPTS
fi

# To have the container derive its hostname from the output of a command run
# at startup, assign that command to HOSTNAME_COMMAND; eval runs it and stores
# the result in HOSTNAME_VALUE
if [[ -n "$HOSTNAME_COMMAND" ]]; then
    HOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND")

    # Replace any occurrences of _{HOSTNAME_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{HOSTNAME_COMMAND}" ]]; then
            eval "export ${VAR//_\{HOSTNAME_COMMAND\}/$HOSTNAME_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi

# Likewise, to derive the port number from the output of a command run at
# startup, assign that command to PORT_COMMAND; eval runs it and stores the
# result in PORT_VALUE
if [[ -n "$PORT_COMMAND" ]]; then
    PORT_VALUE=$(eval "$PORT_COMMAND")

    # Replace any occurrences of _{PORT_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{PORT_COMMAND}" ]]; then
            eval "export ${VAR//_\{PORT_COMMAND\}/$PORT_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi

if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
    KAFKA_BROKER_RACK=$(eval "$RACK_COMMAND")
    export KAFKA_BROKER_RACK
fi

# Check whether KAFKA_LISTENERS is set; it is normally set to PLAINTEXT://:9092
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then
    if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
        echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"
        exit 1
    elif [[ -z "$HOSTNAME_VALUE" ]]; then
        echo "ERROR: No listener or advertised hostname configuration provided in environment."
        echo "       Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"
        exit 1
    fi

    # Maintain existing behaviour
    # If HOSTNAME_COMMAND is provided, set that to the advertised.host.name value if listeners are not defined.
    export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
fi

#Issue newline to config file in case there is not one already
echo "" >> "$KAFKA_HOME/config/server.properties"

(
    function updateConfig() {
        key=$1
        value=$2
        file=$3

        # Omit $value here, in case there is sensitive information
        echo "[Configuring] '$key' in '$file'"

        # If config exists in file, replace it. Otherwise, append to file.
        if grep -E -q "^#?$key=" "$file"; then
            sed -r -i "s@^#?$key=.*@$key=$value@g" "$file" #note that no config values may contain an '@' char
        else
            echo "$key=$value" >> "$file"
        fi
    }

    # KAFKA_VERSION + KAFKA_HOME + grep -rohe KAFKA[A-Z0-0_]* /opt/kafka/bin | sort | uniq | tr '\n' '|'
    # Settings excluded from broker config generation: they already exist in the
    # config file and must not be changed or re-added
    EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_DEBUG|KAFKA_GC_LOG_OPTS|KAFKA_HEAP_OPTS|KAFKA_JMX_OPTS|KAFKA_JVM_PERFORMANCE_OPTS|KAFKA_LOG|KAFKA_OPTS|"

    IFS=$'\n'
    for VAR in $(env)
    do
        env_var=$(echo "$VAR" | cut -d= -f1)
        if [[ "$EXCLUSIONS" = *"|$env_var|"* ]]; then
            echo "Excluding $env_var from broker config"
            continue
        fi

        if [[ $env_var =~ ^KAFKA_ ]]; then
            kafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .)
            updateConfig "$kafka_name" "${!env_var}" "$KAFKA_HOME/config/server.properties"
        fi

        if [[ $env_var =~ ^LOG4J_ ]]; then
            log4j_name=$(echo "$env_var" | tr '[:upper:]' '[:lower:]' | tr _ .)
            updateConfig "$log4j_name" "${!env_var}" "$KAFKA_HOME/config/log4j.properties"
        fi
    done

    # This is the main addition: build the BOOTSTRAP_SERVERS address list from
    # the SERVERS value and write it into the consumer and producer config files
    PODNAME=$(hostname -s | awk -F'-' 'OFS="-"{$NF="";print}' | sed 's/-$//g')
    for ((i=0;i<$SERVERS;i++))
    do
        BOOTSTRAP_SERVERS+="$PODNAME-$i.$(hostname -d):${KAFKA_PORT},"
    done
    BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS%?}
    echo ${BOOTSTRAP_SERVERS} > /opt/log.txt
    sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/consumer.properties
    sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/producer.properties
)

# Run any other custom init script that may have been defined
if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
    eval "$CUSTOM_INIT_SCRIPT"
fi

exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
```
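To make the last block concrete: for the 3-replica StatefulSet named kafka deployed later, `hostname -s` returns something like kafka-1, the awk/sed pipeline strips the trailing ordinal so PODNAME becomes kafka, and the loop renders bootstrap.servers as (assuming the kafka-service headless Service in the ns-kafka namespace):

```
bootstrap.servers=kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092
```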
> "$KAFKA_HOME/config/server.properties" ( function updateConfig() { key=$1 value=$2 file=$3 # Omit $value here, in case there is sensitive information echo "[Configuring] '$key' in '$file'" # If config exists in file, replace it. Otherwise, append to file. if grep -E -q "^#?$key=" "$file"; then sed -r -i "s@^#?$key=.*@$key=$value@g" "$file" #note that no config values may contain an '@' char else echo "$key=$value" >
> "$file" fi } # KAFKA_VERSION + KAFKA_HOME + grep -rohe KAFKA[A-Z0-0_]* /opt/kafka/bin | sort | uniq | tr '\n' '|' # 定義要排除的初始化配置,這些配置已經在配置檔案中存在了,所以不需要更改或新增 EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_DEBUG|KAFKA_GC_LOG_OPTS|KAFKA_HEAP_OPTS|KAFKA_JMX_OPTS|KAFKA_JVM_PERFORMANCE_OPTS|KAFKA_LOG|KAFKA_OPTS|" IFS=$'\n' for VAR in $(env) do env_var=$(echo "$VAR" | cut -d= -f1) if [[ "$EXCLUSIONS" = *"|$env_var|"* ]]; then echo "Excluding $env_var from broker config" continue fi if [[ $env_var =~ ^KAFKA_ ]]; then kafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .) updateConfig "$kafka_name" "${!env_var}" "$KAFKA_HOME/config/server.properties" fi if [[ $env_var =~ ^LOG4J_ ]]; then log4j_name=$(echo "$env_var" | tr '[:upper:]' '[:lower:]' | tr _ .) updateConfig "$log4j_name" "${!env_var}" "$KAFKA_HOME/config/log4j.properties" fi done # 主要是添加了這裡的配置,根據 SERVERS 的值,拼接出 BOOTSTRAP_SERVERS 的地址,並將該配置更新到配置檔案中 PODNAME=$(hostname -s | awk -F'-' 'OFS="-"{$NF="";print}' |sed 's/-$//g') for ((i=0;i<$SERVERS;i++)) do BOOTSTRAP_SERVERS+="$PODNAME-$i.$(hostname -d):${KAFKA_PORT}," done BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS%?} echo ${BOOTSTRAP_SERVERS} > /opt/log.txt sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/consumer.properties sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/producer.properties ) # 如果還定義了其他初始化的配置指令碼,則執行 if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then eval "$CUSTOM_INIT_SCRIPT" fi exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties" ``` #### 2. 修改 Dockerfile Dockerfile 未做其他修改,只是將修改後的 start-kafka.sh 指令碼新增到映象中,並使用 bash 環境來執行指令碼(否則會有些命令無法執行): ```dockerfile FROM wurstmeister/kafka:2.13-2.6.0 ADD start-kafka.sh / CMD ["bash","start-kafka.sh"] ``` #### 3. 打包映象並上傳私服 使用如下命令重新打包映象並修改 tag: ```bash docker build --tag 10.16.12.204/ops/kafka:custom-v2.13-2.6.0 -f Dockerfile . ``` 將映象上傳至映象倉庫: ```bash docker push 10.16.12.204/ops/kafka:custom-v2.13-2.6.0 ``` ### 四、建立名稱空間 整個 Kafka 和 Zookeeper 叢集都要在同一個名稱空間下,所以使用如下 yaml 檔案建立 ns-kafka 名稱空間: ```yaml --- apiVersion: v1 kind: Namespace metadata: name: ns-kafka labels: name: ns-kafka ``` ### 五、建立 Secret Kubelet 到映象倉庫中拉取映象需要進行驗證,所以建立一個用於驗證 Harbor 倉庫的 Secret: ```bash kubectl create secret docker-registry harbor-secret --namespace=ns-kafka --docker-server=http://10.16.12.204 --docker-username=admin --docker-password=Harbor12345 ``` ### 六、建立 PV 和 PVC 在此次搭建叢集的過程中,計劃讓 Kafka 叢集和 Zookeeper 叢集使用同一個 PV。在前面定義 Pod 初始化指令碼時可以看到,Kafka 和 Zookeeper 中的資料目錄以及日誌目錄,都是在以自己主機名命名的目錄下,所以即便使用同一個 PV,也可以對目錄進行區分。建立 PV 和 PVC 的 yaml 檔案內容如下: ```yaml --- apiVersion: v1 kind: PersistentVolume metadata: name: kafka-data-pv spec: accessModes: - ReadWriteMany capacity: storage: 500Gi local: path: /opt/ops_ceph_data/kafka_data nodeAffinity: required: nodeSelectorTerms: - matchExpressions: - key: kafka-cluster operator: In values: - "true" persistentVolumeReclaimPolicy: Retain --- kind: PersistentVolumeClaim apiVersion: v1 metadata: name: kafka-data-pvc namespace: ns-kafka spec: accessModes: - ReadWriteMany resources: requests: storage: 500Gi ``` > 需要宣告的一點是,我當前使用的儲存是 cephfs,並將其掛載到 K8S 的各個節點的 /opt/ops_ceph_data 目錄下,所以在建立 PV 的時候使用的儲存型別是 local。 ### 七、建立 Labels 由於上面建立 PV 時指定的儲存型別是 local,這個 PV 只能在滿足指定 Label 的節點中進行排程,所以為叢集中的所有節點新增一個 label: ```bash for i in 1 2 3 4 5; do kubectl label nodes k8s-node${i} kafka-cluster=true; done ``` ### 八、建立 Zookeeper 叢集 #### 1. 
### VIII. Creating the Zookeeper Cluster

#### 1. Creating the Services

Create the Services used for Zookeeper's communication with other nodes; the yaml is as follows:

```yaml
---
apiVersion: v1
kind: Service
metadata:
  name: zk-inner-service
  namespace: ns-kafka
  labels:
    app: zk
spec:
  selector:
    app: zk
  clusterIP: None
  ports:
  - name: server
    port: 2888
  - name: leader-election
    port: 3888
---
apiVersion: v1
kind: Service
metadata:
  name: zk-client-service
  namespace: ns-kafka
  labels:
    app: zk
spec:
  selector:
    app: zk
  type: NodePort
  ports:
  - name: client
    port: 2181
    nodePort: 31811
```

#### 2. Creating the StatefulSet

Zookeeper is a stateful service, so it is deployed with a StatefulSet; the yaml is as follows:

```yaml
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
  namespace: ns-kafka
spec:
  selector:
    matchLabels:
      app: zk
  serviceName: "zk-inner-service"
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk
    spec:
      containers:
      - name: zk
        imagePullPolicy: Always
        image: 10.16.12.204/ops/zookeeper:custom-v3.6.2
        resources:
          requests:
            memory: "500Mi"
            cpu: "0.5"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        env:
        - name: SERVERS           # SERVERS must match the replica count
          value: "3"
        - name: ZOO_CONF_DIR      # config file directory
          value: /opt/conf
        - name: ZOO_DATA_DIR      # data directory
          value: /opt/data
        - name: ZOO_DATA_LOG_DIR  # data log directory
          value: /opt/data_log
        volumeMounts:             # directories that need persistent storage
        - name: zookeeper-data
          mountPath: /opt/data
          subPath: zookeeper-cluster-data/data
        - name: zookeeper-data
          mountPath: /opt/data_log
          subPath: zookeeper-cluster-data/data_log
        - name: data-conf
          mountPath: /etc/localtime
      imagePullSecrets:
      - name: harbor-secret
      volumes:
      - name: zookeeper-data
        persistentVolumeClaim:
          claimName: kafka-data-pvc
      - name: data-conf
        hostPath:
          path: /usr/share/zoneinfo/Asia/Shanghai
```

#### 3. Verifying Cluster Status

Once the cluster is up, check the current state of every zookeeper node with the following command:

```bash
[@k8s-master1 /]# for i in 0 1 2; do kubectl exec -it zk-$i -n ns-kafka -- zkServer.sh --config /opt/conf/zk-$i status; done
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-0/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-1/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-2/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
```

The cluster currently has one leader and two followers. Next, verify that data is synchronized across the nodes by first creating a znode on zk-0:

```bash
[@k8s-master1 /]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] create /testMessage Hello
Created /testMessage
```

Read it back on the other two nodes:

```bash
[@k8s-master1 /]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get /testMessage
Hello

[@k8s-master1 /]# kubectl exec -it zk-2 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get /testMessage
Hello
```

The value is visible on all nodes, which means the cluster is running normally.
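Besides zkServer.sh status, each node can also be probed through the client port with the four-letter srvr command; this is a sketch that assumes srvr is allowed (it is in Zookeeper's default four-letter-word whitelist) and relies on the netcat package installed in the Dockerfile above:

```bash
# Print each node's role via the srvr four-letter word
for i in 0 1 2; do
  kubectl exec -it zk-$i -n ns-kafka -- bash -c 'echo srvr | nc localhost 2181' | grep Mode
done
```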
### IX. Creating the Kafka Cluster

#### 1. Creating the Service

Create the Service used for Kafka's communication; the yaml is as follows:

```yaml
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: ns-kafka
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: server
  clusterIP: None
  selector:
    app: kafka
```

#### 2. Creating the StatefulSet

Kafka is a stateful service, so it is deployed with a StatefulSet; the yaml is as follows:

```yaml
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: ns-kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: "kafka-service"
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: kafka
    spec:
      imagePullSecrets:
      - name: harbor-secret
      containers:
      - name: kafka
        imagePullPolicy: Always
        image: 10.16.12.204/ops/kafka:custom-v2.13-2.6.0
        resources:
          requests:
            memory: "500Mi"
            cpu: "0.5"
        env:
        - name: SERVERS                 # make sure SERVERS matches the replica count
          value: "3"
        - name: KAFKA_LISTENERS
          value: "PLAINTEXT://:9092"
        - name: KAFKA_ZOOKEEPER_CONNECT # Zookeeper connection address
          value: "zk-inner-service.ns-kafka.svc.cluster.local:2181"
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_MESSAGE_MAX_BYTES
          value: "20000000"
        - name: BROKER_ID_COMMAND       # command run inside the container to generate the broker id
          value: "hostname | awk -F'-' '{print $NF}'"
        volumeMounts:
        - name: kafka-log               # only the kafka log directory needs persistent storage
          mountPath: /kafka
          subPath: kafka-cluster-log
        - name: data-conf
          mountPath: /etc/localtime
      volumes:
      - name: kafka-log
        persistentVolumeClaim:
          claimName: kafka-data-pvc
      - name: data-conf
        hostPath:
          path: /usr/share/zoneinfo/Asia/Shanghai
```

#### 3. Verifying Cluster Status

##### 3.1 Checking the Brokers in Zookeeper

```bash
[@k8s-master1 ~]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074102"}
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-1.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074079"}
[zk: localhost:2181(CONNECTED) 5] get /brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-2.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074009"}
```

All 3 brokers have registered in zookeeper.

##### 3.2 Creating a Topic

On the kafka-0 node, create a topic named Message with 3 partitions and 3 replicas:

```bash
[@k8s-master1 ~]# kubectl exec -it kafka-0 -n ns-kafka -- /bin/bash
bash-4.4# kafka-topics.sh --create --topic Message --zookeeper zk-inner-service.ns-kafka.svc.cluster.local:2181 --partitions 3 --replication-factor 3
Created topic Message.
```
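The partition and replica placement can be checked right away from the same broker shell; a hedged example using the kafka-topics.sh tool that ships with the image (it still accepts --zookeeper in this version):

```bash
# Show leaders, replicas, and ISR for each partition of the topic
kafka-topics.sh --describe --topic Message \
  --zookeeper zk-inner-service.ns-kafka.svc.cluster.local:2181
```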
Check on the zk-1 node whether this Topic exists:

```bash
[@k8s-master1 ~]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[Message]
```

The Topic is already present in Zookeeper.

##### 3.3 Simulating a Producer and a Consumer

First, on kafka-1, act as a producer and write messages to Message:

```bash
[@k8s-master1 ~]# kubectl exec -it kafka-1 -n ns-kafka -- /bin/bash
bash-4.4# kafka-console-producer.sh --topic Message --broker-list kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092
>This is a test message
>Welcome to Kafka
```

Then consume those messages on kafka-2:

```bash
[@k8s-master1 ~]# kubectl exec -it kafka-2 -n ns-kafka -- /bin/bash
bash-4.4# kafka-console-consumer.sh --topic Message --bootstrap-server kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092 --from-beginning
This is a test message
Welcome to Kafka
```

Messages are produced and consumed normally, which means the Kafka cluster is working as expected.

### X. FAQ

#### 1. How to Specify Topics to Create in the yaml File

Set the following env in the yaml file, and the Topics will be created automatically when the Pod starts:

```yaml
env:
- name: KAFKA_CREATE_TOPICS
  value: "Topic1:1:3,Topic2:1:1:compact"
```

The above means Topic1 gets 1 partition and 3 replicas, and Topic2 gets 1 partition and 1 replica with the replica's cleanup.policy set to compact.

> Automatic topic creation requires the KAFKA_CREATE_TOPICS variable to be set; the create-topics.sh script (present in the image) then creates the topics from the variable's content.

#### 2. The compaction Set for a Topic Does Not Take Effect

See: https://github.com/wurstmeister/kafka-docker/wiki#topic-compaction-does-
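When debugging that case, it helps to check which cleanup.policy a topic actually ended up with; a hedged sketch using the kafka-configs.sh tool that ships with the image, run from inside any broker Pod (Topic2 here is the compacted topic from FAQ 1):

```bash
# Show the config overrides applied to the topic, e.g. cleanup.policy=compact
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name Topic2 --describe
```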