1. 程式人生 > >分布式任務隊列celery用法詳解

分布式任務隊列celery用法詳解

後來 這一 als 介紹 type Coding 是把 cat 令行

celery基礎介紹:
技術分享圖片
這個圖我們可以看出,celery基本結構也就是三部分
1 第一部分 broker也就是中間件消息隊列,作用就是用來接收應用的請求
這一部分常見玩法可以是rabbitmq和redis等
2 第二部分 worker 也就是工作隊列 也就是celery本身的任務隊列服務,一般情況下大型的生產應用我們會結合supervisor來管理這麽多的worker
3 第三部分 result 存儲,就是把執行的結果,狀態等信息進行存儲,常規用法我們可以用rabbitmq redis,mysql,mongodb等來做
環境部署:
1pip install celery
2 安裝 rabbitmq 這個我得博客有篇文章做了詳細講解
3 安裝redis ,源碼安裝很簡單,不做介紹

首先做一個基礎的例子體驗什麽是celery
[root@localhost www]# cat tasks.py
#!/usr/bin/python
#coding:utf-8
from celery import Celery
app = Celery(‘tasks‘, broker=‘amqp://‘,backend=‘redis://‘)
#app.config_from_object(‘celeryconfig‘)

@app.task
def add(x, y):
return x + y
broker是接受的消息隊列的地址我這裏用的rabbitmq的地址
backend是後端的存儲我這裏用的是redis

啟動task
celery -A tasks worker --loglevel=info
然後我們新開一個終端進入python命令行去調用task
#python
#from tasks import add
#add.delay(2,4)
技術分享圖片
技術分享圖片

技術分享圖片

可以看出我們每次在python終端調用add這個任務 celery的worker 信息裏面就會收到操作並記錄信息 同時redis裏面記錄相應的狀態

======================================================

celery與tasks分離
技術分享圖片
[root@localhost test]# cat celery.py
#!/usr/bin/python

#coding:utf-8

from future import absolute_import ,unicode_literals
from celery import Celery

app = Celery(
‘test‘,
broker=‘amqp://‘,
backend=‘redis://‘,
include=[‘test.tasks‘]
)

app.conf.update(
result_expires=3500,
)

if name == ‘main‘:
app.start()

[root@localhost test]# cat tasks.py
#!/usr/bin/python
#coding:utf-8

from future import absolute_import ,unicode_literals

from test.celery import app

@app.task
def add(x,y):
return x + [email protected]
br/>@app.task
return x * y

後臺啟動 celery
celery multi start w1 -A proj -l info --logfile=/var/log/celery.log

celery有一堆的配置參數來控制每一個task這裏不做解說詳情見官網

重點是思路:
1 以前批量執行paramiko腳本的時候,一旦執行的機器多了,後來優化成多線程用threading來做,看到這個異步的任務隊列,我們可以把機器按隊列進行分組,進行路由限制數量,流量控制等等來對大批量的task任務進行分批量並發操作
2 還可以結合python框架web界面 applicate的過程,celery的worker的過程狀態和後端的結果狀態都集成到一個頁面,實時監測,界面化操作

分布式任務隊列celery用法詳解