1. 程式人生 > >Kubernetes(k8s)中文文件 示例: 分散式任務佇列 Celery, RabbitMQ和Flower_Kubernetes中文社群

Kubernetes(k8s)中文文件 示例: 分散式任務佇列 Celery, RabbitMQ和Flower_Kubernetes中文社群

譯者:White

介紹

Celery是基於分散式訊息傳遞的非同步任務佇列。它可以用來建立一些執行單元(例如,一個任務),這些任務可以同步,非同步的在一個或者多個工作節點執行。

Celery基於Python實現。

因為Celery基於訊息傳遞,需要一些叫作訊息代理的中介軟體(他們用來在傳送者和接受者之間處理傳遞的訊息)。RabbitMQ是一種和Celery聯合使用的訊息中介軟體。

下面的示例將向你展示,如何使用Kubernetes來建立一個基於Celery作為任務佇列,RabbitMQ作為訊息代理的分散式任務佇列系統。同時,還要展示如何建立一個基於Flower的任務監控前端。

目標

在例子的最後,我們可以看到:

  • 3個pods:
    • 一個Celery任務佇列
    • 一個RabbitMQ訊息代理
    • Flower前端
  • 一個提供訪問訊息代理的服務
  • 可以傳遞給工作節點的級別Celery任務

先決條件

你應該已經擁有一個Kubernetes叢集。要完成大部分的例子,確保Kubernetes建立一個以上的節點(例如,通過設定NUM_MINIONS環境變數為2或者更多)。

第一步:啟動RabbitMQ服務

Celery任務佇列需要連線到RabbitMQ代理。RabbitMQ佇列最終會出現在一個獨立的pod上,但是,由於pod是短暫存在的,需要一個服務來透明的路由請求到RabbitMQ。

使用這個檔案examples/celery-rabbitmq/rabbitmq-service.yaml

apiVersion: v1
kind: Service
metadata:
  labels:
    name: rabbitmq
  name: rabbitmq-service
spec:
  ports:
  - port: 5672
  selector:
    app: taskQueue
    component: rabbitmq

這樣執行一個服務:

$ kubectl create -f examples/celery-rabbitmq/rabbitmq-service.yaml

這個服務允許其他pods連線到rabbitmq。對於它們可以使用5672埠,服務也會將流量路由到容器(也通過5672埠)。

第二步:啟動RabbitMQ

RabbitMQ代理可以通過這個檔案啟動examples/celery-rabbitmq/rabbitmq-controller.yaml:

apiVersion: v1
kind: ReplicationController
metadata:
  labels:
    name: rabbitmq
  name: rabbitmq-controller
spec:
  replicas: 1
  selector:
    component: rabbitmq
  template:
    metadata:
      labels:
        app: taskQueue
        component: rabbitmq
    spec:
      containers:
      - image: rabbitmq
        name: rabbitmq
        ports:
        - containerPort: 5672
        resources:
          limits:
            cpu: 100m

執行$ kubectl create -f examples/celery-rabbitmq/rabbitmq-controller.yaml這個命令來建立副本控制器,確保當一個RabbitMQ例項執行時,一個pod已經存在。

請注意建立這個pod需要一些時間來拉取一個docker映象。在這個例子中,這些操作也適用於其他pods。

第三步:啟動Celery

通過$ kubectl create -f examples/celery-rabbitmq/celery-controller.yaml來建立一個celery worker,檔案如下:

apiVersion: v1
kind: ReplicationController
metadata:
  labels:
    name: celery
  name: celery-controller
spec:
  replicas: 1
  selector:
    component: celery
  template:
    metadata:
      labels:
        app: taskQueue
        component: celery
    spec:
      containers:
      - image: endocode/celery-app-add
        name: celery
        ports:
        - containerPort: 5672
        resources:
          limits:
            cpu: 100m

一些地方需要指出…:

像RabbitMQ控制器,需要確保總是有一個pod執行著Celery worker例項。

celery-app-add這個映象是對標準的Celery Docker映象的擴充套件。Dockerfile如下:

FROM library/celery

ADD celery_conf.py /data/celery_conf.py
ADD run_tasks.py /data/run_tasks.py
ADD run.sh /usr/local/bin/run.sh

ENV C_FORCE_ROOT 1

CMD ["/bin/bash", "/usr/local/bin/run.sh"]

celery_conf.py檔案包含了一個簡單的celery加法運算任務。最後一行啟動Celery worker。

注意:ENV C_FORCE_ROOT 1用來確保Celery以root使用者身份執行,不提倡在生產環境中採用這種方式。

celery_conf.py檔案的內容如下:

import os

from celery import Celery

# Get Kubernetes-provided address of the broker service
broker_service_host = os.environ.get('RABBITMQ_SERVICE_SERVICE_HOST')

app = Celery('tasks', broker='amqp://[email protected]%s//' % broker_service_host, backend='amqp')

@app.task
def add(x, y):
    return x + y

假設你已經熟悉Celery的執行機制,除了這個os.environ.get('RABBITMQ_SERVICE_SERVICE_HOST')部分。第一步建立的RabbitMQ服務的IP地址已經在環境變數中設定。Kubernetes會自動對所有定義了名為RabbitMQ服務的應用程式標籤的容器提供環境變數(這個例子中叫“任務佇列”)。上面那段Python程式碼,會在pod執行時自動填充代理地址。

第二個python指令碼(run_tasks.py) ,會以五秒為間隔,週期性的執行add隨機數字的任務。

現在的問題是,你怎麼看發生了什麼?

第四步:弄一個前端

Flower是一個基於web的工具,用來監控和管理Celery叢集。通過連線到一個包含Celery的節點,你可以實時看到所有worker以及他們的任務的工作情況。

首先,通過$ kubectl create -f examples/celery-rabbitmq/flower-service.yaml.命令來啟動一個Flower服務。這個服務定義如下:

apiVersion: v1
kind: Service
metadata:
  labels:
    name: flower
  name: flower-service
spec:
  ports:
  - port: 5555
  selector:
    app: taskQueue
    component: flower
  type: LoadBalancer

它會被標記為一個外部負載均衡器。然而,在許多平臺上,必須新增一個明確的防火牆規則,開啟5555埠。GCE上可以這麼操作:

 $ gcloud compute firewall-rules create --allow=tcp:5555 --target-tags=kubernetes-minion kubernetes-minion-5555

請記住在執行完這個例子後刪除這條規則(on GCE: $ gcloud compute firewall-rules delete kubernetes-minion-5555)。

執行下面命令來啟動pods,$ kubectl create -f examples/celery-rabbitmq/flower-controller.yaml。這個控制器是這麼定義的:

apiVersion: v1
kind: ReplicationController
metadata:
  labels:
    name: flower
  name: flower-controller
spec:
  replicas: 1
  selector:
    component: flower
  template:
    metadata:
      labels:
        app: taskQueue
        component: flower
    spec:
      containers:
      - image: endocode/flower
        name: flower
        resources:
          limits:
            cpu: 100m

這將建立一個新的pod,這個pod安裝了Flower,服務節點會暴露5555埠(Flower的預設埠)。這個映象使用下面命令啟動Flower:

flower --broker=amqp://guest:[email protected]${RABBITMQ_SERVICE_SERVICE_HOST:localhost}:5672//

同樣,它使用Kubernetes提供的環境變數來獲取RabbitMQ服務的IP地址。

一旦所有的pods啟動並且執行,執行kubectl get pods命令會顯示下面內容:

NAME                                           READY     REASON       RESTARTS   AGE
celery-controller-wqkz1                        1/1       Running      0          8m
flower-controller-7bglc                        1/1       Running      0          7m
rabbitmq-controller-5eb2l                      1/1       Running      0          13m

kubectl get service flower-service命令會幫助你獲取Flower服務的外部IP地址。

NAME             LABELS        SELECTOR                         IP(S)            PORT(S)
flower-service   name=flower   app=taskQueue,component=flower   10.0.44.166      5555/TCP
                                                                162.222.181.180

在你的網頁瀏覽器中輸入正確的flower服務地址和5555埠,(在這個例子中,http://162.222.181.180:5555)。如果你點選叫作“任務”的標籤,你應該會看到一個不斷增長的名為”celery_conf.add”的任務表單,它顯示了run_tasks.py指令碼的排程情況。

K8S中文社群微信公眾號