1. 程式人生 > >在python下比celery更加簡單的非同步任務佇列RQ

在python下比celery更加簡單的非同步任務佇列RQ

廖雪峰 /程式設計 / 2013-8-27 19:33 / 閱讀: 21

Celery是Python開發的分散式任務排程模組,今天抽空看了一下,果然介面簡單,開發容易,5分鐘就寫出了一個非同步傳送郵件的服務。

Celery本身不含訊息服務,它使用第三方訊息服務來傳遞任務,目前,Celery支援的訊息服務有RabbitMQ、Redis甚至是資料庫,當然Redis應該是最佳選擇。

安裝Celery

用pip或easy_install安裝:

$ sudo pip install Celery

或著:

$ sudo easy_install Celery

使用Redis作為Broker時,再安裝一個celery-with-redis。

開始編寫tasks.py:

# tasks.py
import time
from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task
def sendmail(mail):
    print('sending mail to %s...' % mail['to'])
    time.sleep(2.0)
    print('mail sent.')

然後啟動Celery處理任務:

$ celery -A tasks worker --loglevel=info

上面的命令列實際上啟動的是Worker,如果要放到後臺執行,可以扔給supervisor。

如何傳送任務?非常簡單:

>>> from tasks import sendmail
>>> sendmail.delay(dict(to='[email protected]'))
<AsyncResult: 1a0a9262-7858-4192-9981-b7bf0ea7483b>

可以看到,Celery的API設計真的非常簡單。

然後,在Worker裡就可以看到任務處理的訊息:

[2013-08-27 19:20:23,363: WARNING/MainProcess] celery@MichaeliMac.local ready.
[2013-08-27 19:20:23,367: INFO/MainProcess] consumer: Connected to redis://localhost:6379/0. [2013-08-27 19:20:45,618: INFO/MainProcess] Got task from broker: tasks.sendmail[1a0a9262-7858-4192-9981-b7bf0ea7483b] [2013-08-27 19:20:45,655: WARNING/PoolWorker-4] sending mail to celery@python.org... [2013-08-27 19:20:47,657: WARNING/PoolWorker-4] mail sent. [2013-08-27 19:20:47,658: INFO/MainProcess] Task tasks.sendmail[1a0a9262-7858-4192-9981-b7bf0ea7483b] succeeded in 2.00266814232s: None

Celery預設設定就能滿足基本要求。Worker以Pool模式啟動,預設大小為CPU核心數量,預設序列化機制是pickle,但可以指定為json。由於Python呼叫UNIX/Linux程式實在太容易,所以,用Celery作為非同步任務框架非常合適。

Celery還有一些高階用法,比如把多個任務組合成一個原子任務等,還有一個完善的監控介面,以後有空再繼續研究。


------------------------------

前言:

   這裡介紹一個python下,比celery更加簡單的非同步工具,真的是很簡單,當然他的功能沒有celery多,複雜程度也沒有celery大,文件貌似也沒有celery多,但是為啥會介紹rq這個東西 因為他夠簡單。

當然他雖然簡單,但是也是需要中間人的,也就是 Broker,這裡只能是redis了。 他沒有celery支援的那麼多,比如 redis rabbitmq mongodb mysql之類的。 說回來,咱們用rq,就是看重他的簡單。

安裝redis以及python-rq包,redis的話,直接yum就行,python rq需要pip來搞定。

[root@67 ~]# pip install rq
Downloading/unpacking rq
  Downloading rq-0.4.5.tar.gz
  Running setup.py egg_info for package rq
    warning: no previously-included files matching '*' found under directory 'tests'
Requirement already satisfied (use --upgrade to upgrade): redis>=2.7.0 in /usr/lib/python2.6/site-packages (from rq)
Downloading/unpacking importlib (from rq)
  Downloading importlib-1.0.3.tar.bz2
  Running setup.py egg_info for package importlib
Requirement already satisfied (use --upgrade to upgrade): argparse in /usr/lib/python2.6/site-packages (from rq)
Installing collected packages: rq, importlib
  Running setup.py install for rq
    warning: no previously-included files matching '*' found under directory 'tests'
    Installing rqinfo script to /usr/bin
    Installing rqworker script to /usr/bin
  Running setup.py install for importlib
Successfully installed rq importlib
Cleaning up...

先開始官方的demo:

這個是咱們要後端非同步的模組:

import requests
def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

建立佇列

from redis import Redis
from rq import Queue
q = Queue(connection=Redis())

然後,直接rqworker !

一直往佇列裡面扔任務。

In [238]: result = q.enqueue(
count_words_at_url, 'http://nvie.com'
)
In [241]: result = q.enqueue(
count_words_at_url, 'http://nvie.com'
)
In [244]: result = q.enqueue(
count_words_at_url, 'http://nvie.com'
)
In [247]: result = q.enqueue(
count_words_at_url, 'http://xiaorui.cc'
)
In [250]: result = q.enqueue(
count_words_at_url, 'http://xiaorui.cc'
)
In [253]: result = q.enqueue(
count_words_at_url, 'http://xiaorui.cc'
)
In [256]: result = q.enqueue(
count_words_at_url, 'http://xiaorui.cc'
)

rqworker的介面任務並執行:

(下面的log已經說明了一切,任務確實執行了,而且我在ipython下,很是流暢,我不需要擔心任務是否很好的執行,我只需要把任務一扔,就甩屁股走人了。)

00:42:13 *** Listening on default...
00:42:22 default: nima.count_words_at_url('http://xiaorui.cc') (84f9d30f-8afc-4ea6-b281-4cb75c77779f)
00:42:22 Starting new HTTP connection (1): xiaorui.cc
00:42:23 Starting new HTTP connection (1): rfyiamcool.blog.51cto.com
00:42:23 Job OK, result = 2632
00:42:23 Result is kept for 500 seconds.
00:42:23
00:42:23 *** Listening on default...
00:42:27 default: nima.count_words_at_url('http://xiaorui.cc') (9fdaa934-e996-4719-8fb5-d619a4f15237)
00:42:27 Starting new HTTP connection (1): xiaorui.cc
00:42:28 Starting new HTTP connection (1): rfyiamcool.blog.51cto.com
00:42:28 Job OK, result = 2632
00:42:28 Result is kept for 500 seconds.
00:42:28
00:42:28 *** Listening on default...
00:42:28 default: nima.count_words_at_url('http://xiaorui.cc') (952cc12b-445e-4682-a12a-96e8019bc4a8)
00:42:28 Starting new HTTP connection (1): xiaorui.cc
00:42:28 Starting new HTTP connection (1): rfyiamcool.blog.51cto.com
00:42:28 Job OK, result = 2632
00:42:28 Result is kept for 500 seconds.
00:42:28
00:42:28 *** Listening on default...
00:42:29 default: nima.count_words_at_url('http://xiaorui.cc') (c25803e4-a3ad-4889-bbec-06cf1e77a11e)
00:42:29 Starting new HTTP connection (1): xiaorui.cc
00:42:29 Starting new HTTP connection (1): rfyiamcool.blog.51cto.com
00:42:29 Job OK, result = 2632
00:42:29 Result is kept for 500 seconds.
00:42:29
00:42:29 *** Listening on default..

緊接著咱們再跑一個我自己測試的模組,邏輯很簡單在sleep情況下,是否會很好的執行,來測試他的 非同步 任務執行。 當然你也可以rqworker執行的執行,下面的程式碼更像是event事件的感覺。

[root@67 ~]# cat worker.py
#xiaorui.cc
import os
import redis
from rq import Worker, Queue, Connection
listen = ['high', 'default', 'low']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(map(Queue, listen))
        worker.work()

下面是自己需要非同步執行的模組程式碼~

[root@67 ~]# cat utils.py
#xiaorui.cc
import requests
import time
def tosleep(num):
    time.sleep(num)
    return num

咱們在ipython測試下吧:

In [53]: from redis import Redis
In [54]: from rq import Queue
In [55]:
In [56]: q = Queue(connection=Redis())
In [57]: from utils import tosleep
In [58]: for i in range(5):
q.enqueue(tosleep,5)
   ....:
   ....:
Out[59]: Job(u'8d71a0ee-695a-4708-b6cf-15821aac7299', enqueued_at=datetime.datetime(2014, 5, 14, 15, 42, 4, 47779))
Out[59]: Job(u'27419b10-8b12-418c-8af1-43c290fc2bf3', enqueued_at=datetime.datetime(2014, 5, 14, 15, 42, 4, 51855))
Out[59]: Job(u'7c98f0d1-7317-4c61-8bfa-10e223033948', enqueued_at=datetime.datetime(2014, 5, 14, 15, 42, 4, 53606))
Out[59]: Job(u'0a84a48f-3372-4ef0-8aa8-d868de2e0c11', enqueued_at=datetime.datetime(2014, 5, 14, 15, 42, 4, 57173))
Out[59]: Job(u'ad1986b9-a2fa-4205-93ab-a1b685d7cf88', enqueued_at=datetime.datetime(2014, 5, 14, 15, 42, 4, 58355))

看到沒有,本來咱們呼叫了一個函式是sleep5s,但他不影響其他的程式碼的堵塞,會扔到佇列裡面後,迅速的執行後面的程式碼。

如果我想像celery那樣,檢視結果的話,也是用result方法的。

#xiaorui.cc
In [67]: job=q.enqueue(tosleep,5)
In [68]: job.result
In [69]: job.result
In [70]: job.result
In [71]: job.result
In [72]: job.result
Out[72]: 5

但是有個缺點,任務是非同步方式的放到了redis的佇列裡面了,但是後端的work貌似是單程序的。。。當然也很好改,用threading針對每個任務進行fork執行緒就可以了。

#xiaorui.cc
In [47]: for i in range(5):
....:	  q.enqueue(tosleep,5)
....:
....:
Out[47]: Job(u'5edb3690-9260-4aba-9eaf-fa75fbf74a13', enqueued_at=datetime.datetime(2014, 5, 14, 15, 24, 54, 229289))
Out[47]: Job(u'e91cfcb8-850b-4da4-8695-13f84a6a0222', enqueued_at=datetime.datetime(2014, 5, 14, 15, 24, 54, 233016))
Out[47]: Job(u'cc6c78d4-e3b5-4c22-b027-8c070b6c43db', enqueued_at=datetime.datetime(2014, 5, 14, 15, 24, 54, 234333))
Out[47]: Job(u'569decc8-7ad2-41eb-83cc-353d7386d2b9', enqueued_at=datetime.datetime(2014, 5, 14, 15, 24, 54, 235954))
Out[47]: Job(u'155c493e-5a2c-4dcf-8d9b-3ae2934bf9e5', enqueued_at=datetime.datetime(2014, 5, 14, 15, 24, 54, 238030))
#xiaorui.cc

這個是worker.py打出來的日誌:

23:24:59 Job OK, result = 5
23:24:59 Result is kept for 500 seconds.
23:24:59
23:24:59 *** Listening on high, default, low...
23:24:59 default: utils.tosleep(5) (e91cfcb8-850b-4da4-8695-13f84a6a0222)
23:25:04 Job OK, result = 5
23:25:04 Result is kept for 500 seconds.
23:25:04
23:25:04 *** Listening on high, default, low...
23:25:04 default: utils.tosleep(5) (cc6c78d4-e3b5-4c22-b027-8c070b6c43db)
23:25:09 Job OK, result = 5
23:25:09 Result is kept for 500 seconds.
23:25:09
23:25:09 *** Listening on high, default, low...
23:25:09 default: utils.tosleep(5) (569decc8-7ad2-41eb-83cc-353d7386d2b9)
23:25:14 Job OK, result = 5
23:25:14 Result is kept for 500 seconds.
23:25:14
23:25:14 *** Listening on high, default, low...
23:25:14 default: utils.tosleep(5) (155c493e-5a2c-4dcf-8d9b-3ae2934bf9e5)
23:25:19 Job OK, result = 5
23:25:19 Result is kept for 500 seconds.
23:25:19
23:25:19 *** Listening on high, default, low...

這裡在看下官方給的例子:

from rq import Connection, Queue
from redis import Redis
from somewhere import count_words_at_url
# 建立redis的一個連線物件
redis_conn = Redis()
q = Queue(connection=redis_conn)  # 預設是用redis的default佇列名
# 封裝任務
job = q.enqueue(count_words_at_url, 'http://xiaorui.cc')
print job.result   # => None
# Now, wait a while, until the worker is finished
time.sleep(2)
print job.result   # => 889

rq可以設定任務的優先級別的,比如一個low級別的。

q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url, 'http://nvie.com')

對了,官方提供了一個rq的管理平臺頁面。

wKiom1NzmY_S_IxDAAMkuOWQUhM521.jpg

本文出自 “峰雲,就她了。” 部落格,謝絕轉載!

相關推薦

pythoncelery更加簡單非同步任務佇列RQ

廖雪峰 /程式設計 / 2013-8-27 19:33 / 閱讀: 21 Celery是Python開發的分散式任務排程模組,今天抽空看了一下,果然介面簡單,開發容易,5分鐘就寫出了一個非同步傳送郵件的服務。 Celery本身不含訊息服務,它使用第三方訊息服務來

python學習opengl之簡單窗口

proc .org github 教程 col 默認 nco setw pen 最近在看一個opengl教程:https://learnopengl.com/Introduction,寫的深入淺出,非常不錯,而且有中文的翻譯版:https://learnopengl-cn.

Celery非同步任務佇列的使用

版權宣告:本文為博主原創文章,未經博主允許不得轉載。https://mp.csdn.net/mdeditor/83016539 任務傳送者,任務執行者,中間人 安裝: pip install celery 使用: 1)建立一個Celery類的例項物件 from celer

Django+celery+ RabbitMQ實現非同步任務

一,首先安裝celery pip install django-celery 二,安裝rabbitmq ubuntu環境下執行以下 sudo apt-get install rabbitmq-server 新增使用者,myuser為使用者名稱,mypassword為使用者

django與celery結合實現非同步任務

celery 基本概念 Celery 是一個 基於python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery celery的優點       

簡單非同步任務工具——rq 的使用教程

rq是一個簡單的,輕量級的非同步任務工具。如果在網站中使用者發起一個用時很久(大於2分鐘)的請求,如果用同步的方式,伺服器就會返回超時。這時候就需要用非同步請求,使用者發起請求後,服務端把作業扔給另一個程序去執行,然後立刻返回給使用者,使用者再通過輪詢或者其他方式來獲取作業的執行進度和執行結果。rq的作用就相

分散式非同步任務佇列 Celery + rabbitmq (or redis )

最近的專案要使用非同步的任務佇列,初步選用了Celery,比較輕量級,但是對Task,Broker,Worker等概念有些理解的不透徹,找到以下文章,甚是透徹。  當我們需要處理一些比較耗時的任務時,我們就需要考慮啟用“非同步”這個概念。  比如以下兩種情況: 一,頻繁

用 Flask 來寫個輕部落格 (26) — 使用 Flask-Celery-Helper 實現非同步任務

目錄 前文列表 擴充套件閱讀 Celery Celery 是使用 Python 多工庫來編寫的任務佇列工具, 可以 並行 的執行任務. 我們會將執行時間較長但又不那

django+celery+rabbitmq處理非同步任務

版本選擇: celery4不再支援windows,所以在Windows環境下使用請選擇celery==3.1.18 參考文件: 一. 簡介 celery是一個基於python開發的簡單、靈活且可靠的分散式任務佇列框架,支援使用任務佇列的方式在分散式的機器/程序

非同步任務佇列的兩種處理方法

先對這裡的非同步任務做下解釋: 這裡的意思是,該任務有幾種狀態,建立,等待,執行,結束;其中等待是因為,該任務要正常執行,需要其他執行緒(或程序)提供相應的條件(或觸發事件),然後才會執行。    

php +swoole實現非同步任務佇列

假如要發100封郵件,for迴圈100遍,使用者直接揭竿而起,什麼破網站!但實際上,我們很可能有超過1萬的郵件。怎麼處理這個延遲的問題?答案就是用非同步。把“發郵件”這個操作封裝,然後後臺非同步地執行1萬遍。這樣的話,使用者提交網頁後,他所等待的時間只是“把發郵件任務請求推送

java 非同步任務佇列執行--需要注意的地方

參照1寫的非同步任務佇列過程中,發現一些java基礎知識掌握不夠。 1)Iterable 類的forEach方法和for迴圈方法的不同: try { List<Future<Object>> futures = exe

Python--Django使用celery實現非同步任務

Django使用celery實現非同步任務 celery使用: 以傳送簡訊為例 在專案目錄下下建立celery_tasks用於儲存celery非同步任務。 在celery_tasks目錄下建立config.py檔案,用於儲存celery的配置資訊 # redi

風火程式設計--python釋出celery非同步任務

celery釋出非同步任務(redis資料庫) 單一檔案 1.建立celery應用 from celery import Celery celery_app = Celery( "name", bro

Python Django Celery 實現非同步任務(二)使用rabbitmq 作為broker

之前在上一篇文章中Python Celery 實現非同步任務是使用Django預設作為borker (訊息分發),因為升級最新的celery後,不再支援Django作為borker ,所以測試平臺更換為

PythonAPScheduler的簡單使用

python 定時任務 apscheduler 今天準備實現一個功能需要用到定時執行任務,所以就看到了Python的一個定時任務框架APScheduler,試了一下感覺還不錯。1.APScheduler簡介: APScheduler是Python的一個定時任務框架,可以很方便的滿足用戶定時執行或者

Pythonopencv使用筆記(一)(圖像簡單讀取、顯示與儲存)

操作 灰度圖 清晰 ren from tty ims 圖像 type 寫在之前 從去年開始關註python這個軟件,途中間間斷斷看與學過一些關於python的東西。感覺python確實是一個簡單優美、easy上手的腳本編程語言,眾多的第三方庫使得py

opencv-python簡單KNN分類識別

文件 ont and color div feature png image spa KNN是數據挖掘中一種簡單算法常用來分類,此次用來聚類實現對4種花的簡單識別。 環境:python2.7+opencv3.0+windows10 原理:在使用KNN函數提取出4種花特征點以

PythonMongoDB的簡單應用

size isp 等等 name port sed opened 更新數據 學習方法 1.傻瓜安裝失敗請看下面方法 1.mongodb下載zip文件 2.解壓到D或者E盤 3.創建mangodb文件夾,把mongodb自創建文件夾中的所有文件剪切到mongodb 4.此

用java簡單分析特幣區塊鏈

我假設你已經對比特幣的含義有一個模糊的概念,並且你對交易背後的機制有一個簡單的理解:對地址進行支付(這是匿名的,因為它們不能直接連結到特定的個人),所有交易都是公開的。交易以塊的形式收集,塊在區塊鏈中連結在一起。 你可以將區塊鏈視為一個不斷更新且可供所有人訪問的大型資料庫。你可以使用Bi