1. 程式人生 > >Django2.1+celery4.2.1+rabbitMQ3.7.5使用

Django2.1+celery4.2.1+rabbitMQ3.7.5使用

celery是一個基於python開發的簡單、靈活且可靠的分散式任務佇列框架,支援使用任務佇列的方式在分散式的機器/程序/執行緒上執行任務排程。採用典型的生產者-消費者模型,主要由三部分組成:

  1. 訊息佇列broker:broker實際上就是一個MQ佇列服務,可以使用redis、rabbitmq等作為broker
  2. 處理任務的消費者workers:broker通知worker佇列中有任務,worker去佇列中取出任務執行,每一個worker就是一個程序
  3. 儲存結果的backend:執行結果儲存在backend,預設也會儲存在broker使用的MQ佇列服務中,也可以單獨配置用何種服務做backend

pip install eventlet(加入協程支援)如果不安裝,celery -A project worker -l info執行會出現下面問題,詳情見:這裡

安裝後使用  芹菜 - 專案工人-l info -P eventlet來執行

安裝的RabbitMQ伺服器(不多介紹,忘記了),執行的RabbitMQ服務,沒有執行的RabbitMQ的話會出現下圖所示問題:

配置settings.py檔案

CELERY_BROKER_URL = 'amqp://guest:[email protected]:5672'

在專案的根路徑下,也就是與settings.py檔案同級目錄下建立celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery
import os

# 為celery程式設定預設的Django設定模組。
os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'Demo1.settings')

# 註冊Celery的APP
app = Celery('Demo1')
# 繫結配置檔案
app.config_from_object('django.conf.settings', namespace='CELERY')

# 自動發現各個app下的tasks.py檔案
app.autodiscover_tasks()

上面的demo1的是我的專案名,

在設定的同級目錄中的init.py檔案中新增下面內容

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']

在你的Django app下建立tasks.py檔案

注意tasks.py必須建在各應用程式的根目錄下,且只能叫tasks.py,不能隨意命名

# tasks.py
from celery.task import task
import time


@task
def test_celery():
    print("開始")
    time.sleep(6)
    return "test celery"

然後在你的views.py中呼叫這個函式

# views.py
from .tasks import test_celery


def index(request):
    result = test_celery.delay()
    print(result)
    return Httpresponse("test")

如果test_celery()中有引數,例如有引數A,那麼在views.py中這樣使用----> test_celery.delay(A)

啟動Django的專案

啟動芹菜  芹菜-A Demo1 worker -l info -P eventlet(在pycharm的終端中啟動)

會出現下面內容

(Django_env2) E:\Work\Django-test\Django2_1\Demo1>celery -A Demo1 worker -l info -P eventlet

 -------------- [email protected] v4.2.1 (windowlicker)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0 2018-12-27 20:18:24
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         Demo1:0x19fa742d630
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . model.tasks.test_celery

[2018-12-27 20:18:24,201: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-12-27 20:18:24,211: INFO/MainProcess] mingle: searching for neighbors
[2018-12-27 20:18:25,241: INFO/MainProcess] mingle: all alone
[2018-12-27 20:18:25,257: INFO/MainProcess] pidbox: Connected to amqp://guest:**@127.0.0.1:5672//.
[2018-12-27 20:18:25,260: WARNING/MainProcess] e:\work\django-test\django_env2\lib\site-packages\celery\fixups\django.py:200: UserWarning: Using setting
s.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-12-27 20:18:25,261: INFO/MainProcess] [email protected] ready.

當我們進行到views.py中的index函式中,重新整理頁面不會有任何延遲。

終端中

參考:https://www.jianshu.com/p/3de08b043467

待續...