1. 程式人生 > >django+celery+rabbitmq處理非同步任務

django+celery+rabbitmq處理非同步任務

版本選擇:

celery4不再支援windows,所以在Windows環境下使用請選擇celery==3.1.18

參考文件:

一. 簡介

celery是一個基於python開發的簡單、靈活且可靠的分散式任務佇列框架,支援使用任務佇列的方式在分散式的機器/程序/執行緒上執行任務排程。採用典型的生產者-消費者模型,主要由三部分組成: 
1. 訊息佇列broker:broker實際上就是一個MQ佇列服務,可以使用redis、rabbitmq等作為broker 
2. 處理任務的消費者workers:broker通知worker佇列中有任務,worker去佇列中取出任務執行,每一個worker就是一個程序 
3. 儲存結果的backend:執行結果儲存在backend,預設也會儲存在broker使用的MQ佇列服務中,也可以單獨配置用何種服務做backend
--------------------- 

二. 配置到Django

典型的django專案框架:

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
- manage.py

1. 建立/proj/proj/celery.py來定義celery例項

from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dbmon.settings')

from django.conf import settings  # noqa

app = Celery('dbmon')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# 此處celery將會找到在每一個app下的tasks.py檔案並將之將在
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

2. 在/proj/proj/__init__.py中匯入這個app,確保django每次啟動都能載入到從而才能使用@shared_task裝飾器

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa

3. 在每個app下建立task檔案

就像這樣

- app1/
    - app1/tasks.py
    - app1/models.py
- app2/
    - app2/tasks.py
    - app2/models.py

tasks.py

#! /usr/bin/python
# encoding:utf-8

# Create your tasks here

from __future__ import absolute_import,unicode_literals
from celery import shared_task
import frame.oracle_do as oracle
import frame.mysql_do as mysql

@shared_task
def add(x,y):
    return x+y

@shared_task
def oracle_shutdown(host,user,password):
    oracle.oracle_shutdown(host,user,password)

@shared_task
def oracle_startup(host,user,password):
    oracle.oracle_startup(host,user,password)

在views中呼叫

def oracle_ctl(request):
    # 告警
    messageinfo_list = models_frame.TabAlarmInfo.objects.all()
    oper_type = request.GET.get('oper_type')
    host = request.GET.get('host')

    if oper_type:
        log_type = 'Oracle啟停'
        sql = '''select user,password from tab_linux_servers where host='%s' ''' % host
        oracle = tools.mysql_query(sql)
        user = oracle[0][0]
        password = oracle[0][1]
        password = base64.decodestring(password)
        if oper_type == 'startup':
            task.oracle_startup.delay(host, user, password)
            return HttpResponseRedirect('/oracle_ctl/')
        elif oper_type == 'shutdown':
            # ora_do.oracle_shutdown(host, user, password)
            task.oracle_shutdown.delay(host,user,password)
            return HttpResponseRedirect('/oracle_ctl/')
        else:
            task.oracle_shutdown.delay(host, user, password)
            task.oracle_startup.delay(host, user, password)
            return HttpResponseRedirect('/oracle_ctl/')
    else:
        # 資料庫操作面板
        oracle_ctl_sql = '''select t1.tags,
             t1.host,
             t1.port,
             t1.service_name,
             (case t2.mon_status
             when 'connected' then 'running' else 'suspend' end) run_status,
             (case t2.mon_status
             when 'connected' then 'success' else 'danger' end) is_run,
               (case t2.mon_status
             when 'connected' then 'red' else 'green' end) run_color,
             (case t2.mon_status
             when 'connected' then 'shutdown' else 'startup' end) oper_type
        from tab_oracle_servers t1
        left join oracle_db t2
          on t1.tags = t2.tags'''

        oracle_ctl_list = tools.mysql_django_query(oracle_ctl_sql)

        paginator_oracle_ctl = Paginator(oracle_ctl_list, 5)
        page_oracle_ctl = request.GET.get('page_oracle_ctl')
        try:
            oracle_ctls = paginator_oracle_ctl.page(page_oracle_ctl)
        except PageNotAnInteger:
            # If page is not an integer, deliver first page.
            oracle_ctls = paginator_oracle_ctl.page(1)
        except EmptyPage:
            # If page is out of range (e.g. 9999), deliver last page of results.
            oracle_ctls = paginator_oracle_ctl.page(page_oracle_ctl.num_pages)

        now = tools.now()
        if request.method == 'POST':
            logout(request)
            return HttpResponseRedirect('/login/')

        if messageinfo_list:
            msg_num = len(messageinfo_list)
            msg_last = models_frame.TabAlarmInfo.objects.latest('id')
            msg_last_content = msg_last.alarm_content
            tim_last = (datetime.datetime.now() - msg_last.alarm_time).seconds / 60
            return render_to_response('oracle_ctl.html',
                                      {'messageinfo_list': messageinfo_list, 'oracle_ctls': oracle_ctls,
                                       'msg_num': msg_num, 'now': now,
                                       'msg_last_content': msg_last_content, 'tim_last': tim_last})
        else:
            msg_num = 0
            msg_last_content = ''
            tim_last = ''
            return render_to_response('oracle_ctl.html',
                                      {'messageinfo_list': messageinfo_list, 'oracle_ctls': oracle_ctls, 'now': now,
                                       'msg_last_content': msg_last_content, 'tim_last': tim_last})

4. 將django orm作為celery結果儲存

--需要安裝django-celery庫

pip install django-celery

--將djcelery加入到settings檔案中的installed_apps


INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_crontab',
    'frame',
    'login',
    'linux_mon',
    'oracle_mon',
    'mysql_mon',
    'djcelery',
)

--建立celery資料庫表

python manage.py migrate djcelery

5. 修改setting檔案,新增celery設定

這裡根據官方強烈建議使用rabbiltmq作為celery的訊息佇列,使用預設的guest賬號

settings.py

# celery setting
BROKER_URL = 'amqp://guest:[email protected]//'

6. 配置celery以使用django-celery作為結果儲存

cellery.py

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.conf.update(
    CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
)

7. 啟動web服務和celery worker

C:\Users\Think\Desktop\dbmon>celery -A dbmon worker -l info
c:\python27\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

[2018-10-23 14:47:57,635: WARNING/MainProcess] c:\python27\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))


 -------------- [email protected] v3.1.18 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         dbmon:0x3365ed0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     djcelery.backends.database:DatabaseBackend
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . dbmon.celery.debug_task
  . frame.tasks.add
  . frame.tasks.oracle_shutdown
  . frame.tasks.oracle_startup

[2018-10-23 14:47:58,444: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-10-23 14:47:58,710: INFO/MainProcess] mingle: searching for neighbors
[2018-10-23 14:47:59,836: INFO/MainProcess] mingle: all alone

在前端點選對應的頁面上時可以在celery工作視窗看到這樣的日誌

[2018-10-23 14:50:30,683: INFO/MainProcess] Received task: frame.tasks.oracle_shutdown[cf538cea-ef13-4cae-9230-bdf1452f59f5]
[2018-10-23 14:50:30,729: WARNING/Worker-1] C:\Users\Think\Desktop\dbmon/frame/oracle_tools/ora_shutdown.sh
[2018-10-23 14:50:30,740: INFO/Worker-1] Connected (version 2.0, client OpenSSH_5.3)
[2018-10-23 14:50:31,236: INFO/Worker-1] Authentication (password) successful!
[2018-10-23 14:50:31,342: INFO/Worker-1] [chan 0] Opened sftp connection (server version 3)
[2018-10-23 14:50:31,473: INFO/Worker-1] Connected (version 2.0, client OpenSSH_5.3)
[2018-10-23 14:50:31,938: INFO/Worker-1] Authentication (publickey) failed.
[2018-10-23 14:50:31,979: INFO/Worker-1] Authentication (password) successful!
[2018-10-23 14:50:44,351: INFO/MainProcess] Task frame.tasks.oracle_shutdown[cf538cea-ef13-4cae-9230-bdf1452f59f5] succeeded in 13.6649999619s: None

另外,還有個坑

在啟動celery worker的時候,出現這個報錯:

  File "c:\python27\lib\site-packages\django\apps\config.py", line 198, in import_models
    self.models_module = import_module(models_module_name)
  File "c:\python27\lib\importlib\__init__.py", line 37, in import_module
    __import__(name)
  File "c:\python27\lib\site-packages\django_celery_results\models.py", line 8, in <module>
    from celery.five import python_2_unicode_compatible
ImportError: cannot import name python_2_unicode_compatible

出錯地方程式碼:

from celery.five import python_2_unicode_compatible

使用這個帖子上的方法修復了這個問題,原理暫時不清楚

三. 配置使用celery實時web監控:fower

1. 安裝flower

pip install flower

2. 啟動

C:\Users\Think\Desktop\dbmon>celery -A dbmon flower
[I 181023 15:51:11 command:139] Visit me at http://localhost:5555
[I 181023 15:51:11 command:144] Broker: amqp://guest:**@localhost:5672//
[I 181023 15:51:11 command:147] Registered tasks:
    ['celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap',
     'dbmon.celery.debug_task',
     'frame.tasks.add',
     'frame.tasks.get_report',
     'frame.tasks.mysql_install',
     'frame.tasks.oracle_exec_sql',
     'frame.tasks.oracle_install',
     'frame.tasks.oracle_shutdown',
     'frame.tasks.oracle_startup',
     'frame.tasks.oracle_switchover']
[I 181023 15:51:11 mixins:231] Connected to amqp://guest:**@127.0.0.1:5672//

啟動之後就可以通過網頁訪問了

相關推薦

django+celery+rabbitmq處理非同步任務

版本選擇: celery4不再支援windows,所以在Windows環境下使用請選擇celery==3.1.18 參考文件: 一. 簡介 celery是一個基於python開發的簡單、靈活且可靠的分散式任務佇列框架,支援使用任務佇列的方式在分散式的機器/程序

Django+celery+ RabbitMQ實現非同步任務

一,首先安裝celery pip install django-celery 二,安裝rabbitmq ubuntu環境下執行以下 sudo apt-get install rabbitmq-server 新增使用者,myuser為使用者名稱,mypassword為使用者

djangocelery結合實現非同步任務

celery 基本概念 Celery 是一個 基於python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery celery的優點       

RabbitMQ 實踐之在處理非同步任務中的流程

一、背景: 我司的系統,使用者可以建立任務,啟動任務,但任務的執行需要很長的時間,所以採用訊息佇列的方式,後臺非同步處理。 這裡所用到的是 RabbitMQ 。 二、MQ 處理任務的流程 ① ② ③ ④ ⑤ :從前端發來 HTTP 請求,被 Producer(express) 處理,經過 Route -

用 Flask 來寫個輕部落格 (26) — 使用 Flask-Celery-Helper 實現非同步任務

目錄 前文列表 擴充套件閱讀 Celery Celery 是使用 Python 多工庫來編寫的任務佇列工具, 可以 並行 的執行任務. 我們會將執行時間較長但又不那

RabbitMQ非同步任務

一、前言     上一篇部落格介紹了用執行緒池實現非同步任務。這一篇部落格談一談用MQ實現非同步任務。MQ的產品有灰常多,像什麼MSMQ、activeMQ、RocketMQ、RabbitMQ、kafak等。在此之前先談一談對訊息佇列的理解。二、MQ       MQ是一種應用

在C#中使用 CancellationToken 處理非同步任務

在 .NET Core 中使用非同步程式設計已經很普遍了, 你在專案中隨處可見 async 和 await,它簡化了非同步操作,允許開發人員,使用同步的方式編寫非同步程式碼,你會發現在大部分的非同步方法中,都提供了CancellationToken引數,本文主要介紹下 CancellationTokenSou

django非同步任務celery+rabbitmq+flower視覺化)

文章目錄 安裝rabbitmq 安裝django-celery,flower 配置過程 安裝rabbitmq sudo apt-get install rabbitmq-server 新增使用者,myuser為

Python Django Celery 實現非同步任務(二)使用rabbitmq 作為broker

之前在上一篇文章中Python Celery 實現非同步任務是使用Django預設作為borker (訊息分發),因為升級最新的celery後,不再支援Django作為borker ,所以測試平臺更換為

Python--Django使用celery實現非同步任務

Django使用celery實現非同步任務 celery使用: 以傳送簡訊為例 在專案目錄下下建立celery_tasks用於儲存celery非同步任務。 在celery_tasks目錄下建立config.py檔案,用於儲存celery的配置資訊 # redi

Django中通過celery完成非同步任務

開發環境: Django:1.11 專案佈局 安裝 celery: pip install celery 非同步任務模組 celery_tasks; 非同步任務: sms (傳送簡訊); config: celery非同步任務的配置(這裡是用來指名非同步任務儲存的位置)

django-celery定時任務以及非同步任務and伺服器部署並且執行全部過程

Celery 應用Celery之前,我想大家都已經瞭解了,什麼是Celery,Celery可以做什麼,等等一些關於Celery的問題,在這裡我就不一一解釋了。 應用之前,要確保環境中添加了Celery包。 pip install celery pip install dajngo-celery

Django 使用 Celery 實現非同步任務

對於網站來說,給使用者一個較好的體驗是很重要的事情,其中最重要的指標就是網站的瀏覽速度。因此服務端要從各個方面對網站效能進行優化,比如可採用CDN載入一些公共靜態檔案,如js和css;合併css或者js從而減少靜態檔案的請求等等…..還有一種方法是將一些不需要立即返回給使用者,

關於用celerydjango內的非同步任務的注意事項

首先簡單介紹一下,Celery 是一個強大的分散式任務佇列,它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務(async task)和定時任務(crontab)。 如果你覺得自己的django專案有些行為邏輯實在沒必要耽誤在頁

分散式非同步任務佇列 Celery + rabbitmq (or redis )

最近的專案要使用非同步的任務佇列,初步選用了Celery,比較輕量級,但是對Task,Broker,Worker等概念有些理解的不透徹,找到以下文章,甚是透徹。  當我們需要處理一些比較耗時的任務時,我們就需要考慮啟用“非同步”這個概念。  比如以下兩種情況: 一,頻繁

Django中使用django-celery完成非同步任務(1)

許多Django應用需要執行非同步任務, 以便不耽誤http request的執行. 我們也可以選擇許多方法來完成非同步任務, 使用Celery是一個比較好的選擇, 因為Celery有著大量的社群支援, 能夠完美的擴充套件, 和Django結合的也很好. Cel

django —— Celery實現非同步和定時任務

1. 環境 python==2.7 djang==1.11.2 # 1.8, 1.9, 1.10應該都沒問題 celery-with-redis==3.0 # 需要用到redis作為中間人

Django配置celery執行非同步任務和定時任務

原生celery,非djcelery模組,所有演示均基於Django2.0 celery是一個基於python開發的簡單、靈活且可靠的分散式任務佇列框架,支援使用任務佇列的方式在分散式的機器/程序/執行緒上執行任務排程。採用典型的生產者-消費者模型,主

結合Django+celery二次開發定時周期任務

由於 抓取 而是 文檔 code 機會 編輯 越來越大 內存泄漏 需求: 前端時間由於開發新上線一大批系統,上完之後沒有配套的報表系統、監控,於是乎開發、測試、產品、運營、業務部、財務等等各個部門就跟那饑渴的餓狼一樣需要 各種各樣的系統數據滿足他們。剛開

Celery學習--- Celery 最佳實踐之與django結合實現異步任務

tar load modules bin min sta 版本差異 status linux django 可以輕松跟celery結合實現異步任務,只需簡單配置即可 同步執行和異步執行 註意:即使Celery的任務沒有執行完成,但是已經創建了任務ID。可以利用前臺的定時