1. 程式人生 > >Celery 分散式任務佇列快速入門 Celery 分散式任務佇列快速入門

Celery 分散式任務佇列快速入門 Celery 分散式任務佇列快速入門

Celery 分散式任務佇列快速入門

 

本節內容

Celery介紹和基本使用

在專案中如何使用celery

啟用多個workers

Celery 定時任務

與django結合

通過django配置celery periodic task

 

 

一、Celery介紹和基本使用 

Celery 是一個 基於python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery, 舉幾個例項場景中可用的例子:

  1. 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程式等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。 
  2. 你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個簡訊祝福

 

Celery 在執行任務時需要通過一個訊息中介軟體來接收和傳送任務訊息,以及儲存任務結果, 一般使用rabbitMQ or Redis,後面會講

1.1 Celery有以下優點:

  1. 簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的
  2. 高可用:當任務執行失敗或執行過程中發生連線中斷,celery 會自動嘗試重新執行任務
  3. 快速:一個單程序的celery每分鐘可處理上百萬個任務
  4. 靈活: 幾乎celery的各個元件都可以被擴充套件及自定製

Celery基本工作流程圖

 

 

1.2 Celery安裝使用

Celery的預設broker是RabbitMQ, 僅需配置一行就可以

1 broker_url 
=  'amqp://guest:[email protected]:5672//'

rabbitMQ 沒裝的話請裝一下,安裝看這裡  http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3

 

使用Redis做broker也可以

安裝redis元件

1 $ pip install  - "celery[redis]"

配置

Configuration is easy, just configure the location of your Redis database:

app.conf.broker_url = 'redis://localhost:6379/0' 

Where the URL is in the format of:

redis://:[email protected]:port/db_number

all fields after the scheme are optional, and will default to localhost on port 6379, using database 0.

 

如果想獲取每個任務的執行結果,還需要配置一下把任務結果存在哪

If you also want to store the state and return values of tasks in Redis, you should configure these settings:

app.conf.result_backend = 'redis://localhost:6379/0' 

 

1. 3 開始使用Celery啦  

安裝celery模組

1 $ pip install celery

 

建立一個celery application 用來定義你的任務列表

建立一個任務檔案就叫tasks.py吧

1 2 3 4 5 6 7 8 9 10 from  celery  import  Celery   app  =  Celery( 'tasks' ,               broker = 'redis://localhost' ,               backend = 'redis://localhost' )   @app .task def  add(x,y):      print ( "running..." ,x,y)      return  x + y

 

啟動Celery Worker來開始監聽並執行任務

1 $ celery -A tasks worker --loglevel=info

  

呼叫任務

再開啟一個終端, 進行命令列模式,呼叫任務  

1 2 >>>  from  tasks  import  add >>> add.delay( 4 4 )

看你的worker終端會顯示收到 一個任務,此時你想看任務結果的話,需要在呼叫 任務時 賦值個變數

1 >>> result  =  add.delay( 4 4 )

The ready() method returns whether the task has finished processing or not:

>>> result.ready()
False 

You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

>>> result.get(timeout=1) 8 

In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

>>> result.get(propagate=False) 

If the task raised an exception you can also gain access to the original traceback:

>>> result.traceback

  

二、在專案中如何使用celery 

可以把celery配置成一個應用

目錄格式如下

1 2 3 proj /__init__ .py      /celery .py      /tasks .py

proj/celery.py內容

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from  __future__  import  absolute_import, unicode_literals from  celery  import  Celery   app  =  Celery( 'proj' ,               broker = 'amqp://' ,               backend = 'amqp://' ,               include = [ 'proj.tasks' ])   # Optional configuration, see the application user guide. app.conf.update(      result_expires = 3600 , )   if  __name__  = =  '__main__' :      app.start()

proj/tasks.py中的內容

複製程式碼
from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)
複製程式碼

啟動worker 

1 $ celery -A proj worker -l info

輸出

1 2 3 4 5 6 7 8 9 10 11 12 13 -------------- [email protected] local  v4.0.2 (latentcall) ---- **** ----- --- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24 -- * - **** --- - ** ---------- [config] - ** ---------- .> app:         proj:0x103a020f0 - ** ---------- .> transport:   redis: //localhost :6379 // - ** ---------- .> results:     redis: //localhost/ - *** --- * --- .> concurrency: 8 (prefork) -- ******* ---- .> task events: OFF ( enable  -E to monitor tasks  in  this worker) --- ***** -----   -------------- [queues]                  .> celery           exchange=celery(direct) key=celery

  

後臺啟動worker

In the background

In production you’ll want to run the worker in the background, this is described in detail in the daemonization tutorial.

The daemonization scripts uses the celery multi command to start one or more workers in the background:

$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
    > w1.halcyon.local: OK

You can restart it too:

$ celery  multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....  > w1.halcyon.local: OK > Restarting node w1.halcyon.local: OK celery multi v4.0.0 (latentcall) > Stopping nodes...  > w1.halcyon.local: TERM -> 64052 

or stop it:

$ celery multi stop w1 -A proj -l info

The stop command is asynchronous so it won’t wait for the worker to shutdown. You’ll probably want to use the stopwait command instead, this ensures all currently executing tasks is completed before exiting:

$ celery multi stopwait w1 -A proj -l info

  

 

三、Celery 定時任務

 celery支援定時任務,設定好任務的執行時間,celery就會定時自動幫你執行, 這個定時任務模組叫celery beat

  寫一個指令碼 叫periodic_task.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from  celery  import  Celery from  celery.schedules  import  crontab   app  =  Celery()   @app .on_after_configure.connect def  setup_periodic_tasks(sender,  * * kwargs):      # Calls test('hello') every 10 seconds.      sender.add_periodic_task( 10.0 , test.s( 'hello' ), name = 'add every 10' )        # Calls test('world') every 30 seconds      sender.add_periodic_task( 30.0 , test.s( 'world' ), expires = 10 )        # Executes every Monday morning at 7:30 a.m.      sender.add_periodic_task(          crontab(hour = 7 , minute = 30 , day_of_week = 1 ),          test.s( 'Happy Mondays!' ),      )   @app .task def  test(arg):      print (arg)

add_periodic_task 會新增一條定時任務

 

上面是通過呼叫函式新增定時任務,也可以像寫配置檔案 一樣的形式新增, 下面是每30s執行的任務

1 2 3 4 5 6 7 8 app.conf.beat_schedule  =  {      'add-every-30-seconds' : {          'task' 'tasks.add' ,          'schedule' 30.0 ,          'args' : ( 16 16 )      }, } app.conf.timezone  =  'UTC'

  

任務新增好了,需要讓celery單獨啟動一個程序來定時發起這些任務, 注意, 這裡是發起任務,不是執行,這個程序只會不斷的去檢查你的任務計劃, 每發現有任務需要執行了,就發起一個任務呼叫訊息,交給celery worker去執行

啟動任務排程器 celery beat

1 $ celery -A periodic_task beat

輸出like below

1 2 3 4 5 6 7 8 9 10 celery beat v4.0.2 (latentcall) is starting. __    -    ... __   -        _ LocalTime -> 2017-02-08 18:39:31 Configuration ->      . broker -> redis: //localhost :6379 //      . loader -> celery.loaders.app.AppLoader      . scheduler -> celery.beat.PersistentScheduler      . db -> celerybeat-schedule      . logfile -> [stderr]@%WARNING      . maxinterval -> 5.00 minutes (300s)

此時還差一步,就是還需要啟動一個worker,負責執行celery beat發起的任務

啟動celery worker來執行任務

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 $ celery -A periodic_task worker      -------------- [email protected] local  v4.0.2 (latentcall) ---- **** ----- --- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2017-02-08 18:42:08 -- * - **** --- - ** ---------- [config] - ** ---------- .> app:         tasks:0x104d420b8 - ** ---------- .> transport:   redis: //localhost :6379 // - ** ---------- .> results:     redis: //localhost/ - *** --- * --- .> concurrency: 8 (prefork) -- ******* ---- .> task events: OFF ( enable  -E to monitor tasks  in  this worker) --- ***** -----   -------------- [queues]                  .> celery           exchange=celery(direct) key=celery

  

好啦,此時觀察worker的輸出,是不是每隔一小會,就會執行一次定時任務呢!

注意:Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs access to write in the current directory, or alternatively you can specify a custom location for this file:

1 $ celery -A periodic_task beat -s  /home/celery/var/run/celerybeat-schedule

 

更復雜的定時配置  

上面的定時任務比較簡單,只是每多少s執行一個任務,但如果你想要每週一三五的早上8點給你發郵件怎麼辦呢?哈,其實也簡單,用crontab功能,跟linux自帶的crontab功能是一樣的,可以個性化定製任務執行時間

linux crontab http://www.cnblogs.com/peida/archive/2013/01/08/2850483.html 

 

1 2 3 4 5 6 7 8 9 10 from  celery.schedules  import  crontab   app.conf.beat_schedule  =  {      # Executes every Monday morning at 7:30 a.m.      'add-every-monday-morning' : {          'task' 'tasks.add' ,          'schedule' : crontab(hour = 7 , minute = 30 , day_of_week = 1 ),          'args' : ( 16 16 ),      }, }

上面的這條意思是每週1的早上7.30執行tasks.add任務

還有更多定時配置方式如下:

Example Meaning
crontab() Execute every minute.
crontab(minute=0, hour=0) Execute daily at midnight.
crontab(minute=0, hour='*/3') Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0,
hour='0,3,6,9,12,15,18,21')
Same as previous.
crontab(minute='*/15') Execute every 15 minutes.
crontab(day_of_week='sunday') Execute every minute (!) at Sundays.
crontab(minute='*',
hour='*', day_of_week='sun')
Same as previous.
crontab(minute='*/10',
hour='3,17,22', day_of_week='thu,fri')
Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0,hour='*/2,*/3') Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour='*/5') Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
crontab(minute=0, hour='*/3,8-17') Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0,day_of_month='2') Execute on the second day of every month.
crontab(0, 0,
day_of_month='2-30/3')
Execute on every even numbered day.
crontab(0, 0,
day_of_month='1-7,15-21')
Execute on the first and third weeks of the month.
crontab(0, 0,day_of_month='11',
month_of_year='5')
Execute on the eleventh of May every year.
crontab(0, 0,
month_of_year='*/3')
Execute on the first month of every quarter.

 

上面能滿足你絕大多數定時任務需求了,甚至還能根據潮起潮落來配置定時任務, 具體看 http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#solar-schedules   

   

  

四、最佳實踐之與django結合 

django 可以輕鬆跟celery結合實現非同步任務,只需簡單配置即可

If you have a modern Django project layout like:

- proj/
  - proj/__init__.py - proj/settings.py - proj/urls.py - manage.py