1. 程式人生 > >Kubernetes部署Kafka集群

Kubernetes部署Kafka集群

image peer 兩個 boot client eat won 進入容器 flow

主要參考了https://stackoverflow.com/questions/44651219/kafka-deployment-on-minikube和https://github.com/ramhiser/kafka-kubernetes兩個項目,但是這兩個項目都是單節點的Kafka,我這裏嘗試將單節點Kafka擴展為多節點的Kafka集群。

一、單節點Kafka

要搭建Kafka集群,還是要從單節點開始。

1.創建Zookeeper服務zookeeper-svc.yaml和zookeeper-deployment.yaml,用kubectl create -f創建:

apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
spec:
  ports:
  - name: zookeeper-port
    port: 2181
    targetPort: 2181
  selector:
    app: zookeeper

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - image: wurstmeister/zookeeper
        imagePullPolicy: IfNotPresent
        name: zookeeper
        ports:
        - containerPort: 2181

2.等pod跑起來,service的endpoint配置成功後,就可以繼續創建kafka的kafka-svc.yaml和kafka-deployment.yaml了:

apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    nodePort: 30092
    protocol: TCP
  selector:
    app: kafka

kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka
  template:
    metadata:
      labels:
        name: kafka
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: "[kafka的service的clusterIP]"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: [zookeeper的service的clusterIP]:2181
        - name: KAFKA_BROKER_ID
          value: "1"

clusterIP通過kubectl get svc進行查看。KAFKA_ZOOKEEPER_CONNECT的值也可以改為zookeeper-service:2181。

3.創建後,需要對服務進行測試。參考了https://blog.csdn.net/boling_cavalry/article/details/78309050的方法。

在此之前,針對虛擬化的Kafka,需要先執行下面的命令以進入容器:

kubectl exec -it [Kafka的pod名稱] /bin/bash

進入容器後,Kafka的命令存儲在opt/kafka/bin目錄下,用cd命令進入:

cd opt/kafka/bin

後面的操作就跟上面的博客中寫的類似了。針對單節點Kafka,需要將同一個節點作為生產者和消費者。執行命令如下:

kafka-console-producer.sh --broker-list [kafka的service的clusterIP]:9092 --topic test

運行正常的話,下方會出現>標記以提示輸入消息。這樣這個終端就成為了生產者。

另外打開一個linux終端,執行相同的命令進入容器。這次將這個終端作為消費者。註意,上面的博客中寫的創建消費者的方法在新版的Kafka中已經改變,需要執行下面的命令:

kafka-console-consumer.sh --bootstrap-server [kafka的service的clusterIP]:9092 --topic test --from-beginning

之後,在生產者輸入信息,查看消費者是否能夠接收到。如果接收到,說明運行成功。

最後,還可以執行下面的命令以測試列出所有的消息主題:

kafka-topics.sh --list --zookeeper [zookeeper的service的clusterIP]:2181

註意,有時需要用Kafka的端口,有時需要用Zookeeper的端口,應註意區分。

二、多節點Kafka集群

單節點服務運行成功後,就可以嘗試增加Kafka的節點以建立集群。我的Kubernetes集群包含3個節點,所以我搭建的Kafka集群也包含3個節點,分別運行在三臺機器上。

1.搭建Zookeeper集群

創建zookeeper的yaml文件zookeeper-svc2.yaml和zookeeper-deployment2.yaml如下:

apiVersion: v1
kind: Service
metadata:
  name: zoo1
  labels:
    app: zookeeper-1
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-1
---
apiVersion: v1
kind: Service
metadata:
  name: zoo2
  labels:
    app: zookeeper-2
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-2
---
apiVersion: v1
kind: Service
metadata:
  name: zoo3
  labels:
    app: zookeeper-3
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-3

kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: zookeeper-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-1
      name: zookeeper-1
  template:
    metadata:
      labels:
        app: zookeeper-1
        name: zookeeper-1
    spec:
      containers:
      - name: zoo1
        image: digitalwonderland/zookeeper
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "1"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
        - name: ZOOKEEPER_SERVER_2
          value: zoo2
        - name: ZOOKEEPER_SERVER_3
          value: zoo3
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: zookeeper-deployment-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-2
      name: zookeeper-2
  template:
    metadata:
      labels:
        app: zookeeper-2
        name: zookeeper-2
    spec:
      containers:
      - name: zoo2
        image: digitalwonderland/zookeeper
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "2"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
        - name: ZOOKEEPER_SERVER_2
          value: zoo2
        - name: ZOOKEEPER_SERVER_3
          value: zoo3
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: zookeeper-deployment-3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-3
      name: zookeeper-3
  template:
    metadata:
      labels:
        app: zookeeper-3
        name: zookeeper-3
    spec:
      containers:
      - name: zoo3
        image: digitalwonderland/zookeeper
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        env:
        - name: ZOOKEEPER_ID
          value: "3"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
        - name: ZOOKEEPER_SERVER_2
          value: zoo2
        - name: ZOOKEEPER_SERVER_3
          value: zoo3

這裏創建了3個deployment和3個service,一一對應。這樣,三個實例都可以對外提供服務。

創建完成後,需要用kubectl logs查看一下三個Zookeeper的pod的日誌,確保沒有錯誤發生,並且在3個節點的日誌中,有類似下面的語句,則表明Zookeeper集群已順利搭建成功。

2016-10-06 14:04:05,904 [myid:2] - INFO [QuorumPeer[myid=2]/0:0:0:0:0:0:0:0:2181:Leader@358] - LEADING - 
LEADER ELECTION TOOK - 2613

2.搭建Kafka集群

同樣創建3個deployment和3個service,編寫kafka-svc2.yaml和kafka-deployment2.yaml如下:

apiVersion: v1
kind: Service
metadata:
  name: kafka-service-1
  labels:
    app: kafka-service-1
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-service-1
    targetPort: 9092
    nodePort: 30901
    protocol: TCP
  selector:
    app: kafka-service-1
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-2
  labels:
    app: kafka-service-2
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-service-2
    targetPort: 9092
    nodePort: 30902
    protocol: TCP
  selector:
    app: kafka-service-2
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-3
  labels:
    app: kafka-service-3
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-service-3
    targetPort: 9092
    nodePort: 30903
    protocol: TCP
  selector:
    app: kafka-service-3

kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-service-1
  template:
    metadata:
      labels:
        name: kafka-service-1
        app: kafka-service-1
    spec:
      containers:
      - name: kafka-1
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: [kafka-service1的clusterIP]
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_CREATE_TOPICS
          value: mytopic:2:1
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-deployment-2
spec:
  replicas: 1
  selector:
  selector:
    matchLabels:
      name: kafka-service-2
  template:
    metadata:
      labels:
        name: kafka-service-2
        app: kafka-service-2
    spec:
      containers:
      - name: kafka-2
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: [kafka-service2的clusterIP]
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_BROKER_ID
          value: "2"
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-deployment-3
spec:
  replicas: 1
  selector:
  selector:
    matchLabels:
      name: kafka-service-3
  template:
    metadata:
      labels:
        name: kafka-service-3
        app: kafka-service-3
    spec:
      containers:
      - name: kafka-3
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: [kafka-service3的clusterIP]
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
        - name: KAFKA_BROKER_ID
          value: "3"

在deployment1中執行了創建一個新topic的操作。

3.測試

測試方法基本同單集群的情況,這裏就不贅述了。不同的是,這次可以將不同的節點作為生產者和消費者。

至此,Kubernetes的Kafka集群搭建就大功告成了!

Kubernetes部署Kafka集群