1. 程式人生 > >python 訊息佇列、非同步分散式

python 訊息佇列、非同步分散式

一.訊息佇列

訊息佇列:是在訊息的傳輸過程中儲存訊息的容器。
訊息佇列最經典的用法就是消費者和生成者之間通過訊息管道來傳遞訊息,消費者和生成者是不同的程序。生產者往管道中寫訊息,消費者從管道中讀訊息。

作業系統提供了很多機制來實現程序間的通訊 ,multiprocessing模組就提供了Queue和Pipe兩種方法來實現。

其中P指producer,即生產者;C指consumer,即消費者。中間的紅色表示訊息佇列,例項中表現為HELLO佇列。

1.生產消費例項 Queue 單向進行,即生產者只進行發訊息,消費者只進行收
#匯入模組
from multiprocessing import  Queue
from threading import Thread

import time

#建立生產者
def producer(q):
    print("start producer")
    for i in range(10):
        q.put(i)              #發訊息
        time.sleep(0.5)
    print("end producer")

#建立消費者,消費者一般是個死迴圈,要一直監聽是否有需要處理的資訊。
def customer(q):
    print("start customer")
    while 1:
        data = q.get()        #收訊息
        print("customer has get value {0}".format(data))


if __name__ == '__main__':
    q = Queue()                           #建立一個佇列
    pro = Thread(target=producer,args=(q,))
    cus = Thread(target=customer,args=(q,))
    pro.start()
    cus.start()

結果為:

start producer
start customer
producer has produced value 0
customer has get value 0
producer has produced value 1
customer has get value 1
producer has produced value 2
customer has get value 2
producer has produced value 3
customer has get value 3
producer has produced value 4
customer has get value 4
producer has produced value 5
customer has get value 5
producer has produced value 6
customer has get value 6
producer has produced value 7
customer has get value 7
producer has produced value 8
customer has get value 8
producer has produced value 9
customer has get value 9
end producer

可見生產一個則消費一個。

2.通過Mutiprocess裡面的Pipe來實現訊息佇列:

Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex引數,如果duplex引數為True(預設值),那麼這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受訊息,conn2只負責傳送訊息。

send和recv方法分別是傳送和接受訊息的方法。

close方法表示關閉管道,當訊息接受結束以後,關閉管道。

from multiprocessing import Pipe, Process
from threading import Thread

import time

def proc1(pipe):
    for i in range(10):
        print("send {0}".format(i))
        pipe.send(i)
        time.sleep(0.5)
    print("end proc1")

def proc2(pipe):
    n =10
    while n:
        print("proc2 recv {0}".format(pipe.recv()))
        n -=1

if __name__ == '__main__':
    (p1,p2) = Pipe(duplex=False)
    pr = Process(target=proc1,args=(p2,))
    cu = Process(target=proc2,args=(p1,))
    pr.start()
    cu.start()
結果為:
send 0
proc2 recv 0
send 1
proc2 recv 1
send 2
proc2 recv 2
send 3
proc2 recv 3
send 4
proc2 recv 4
send 5
proc2 recv 5
send 6
proc2 recv 6
send 7
proc2 recv 7
send 8
proc2 recv 8
send 9
proc2 recv 9
end proc1
3.Python提供了Queue模組來專門實現訊息佇列Queue物件
Queue物件實現一個fifo佇列(其他的還有lifo、priority佇列,這裡不再介紹)。queue只有maxsize一個構造引數,用來指定佇列容量,指定為0的時候代表容量無限。主要有以下成員函式:
q = Queue(maxsize=0) #指定佇列大小,0表示無限
q.qsize() #返回當前佇列的空間
q.empty() #判斷當前佇列是否為空
q.full()  #判斷當前佇列是否滿了
q.put()   #發訊息
q.get()   #獲取訊息
q.task_done() #接受訊息的執行緒條用該函式來說明訊息對應的任務是否已經完成
q.join()   #等待佇列為空,再執行別的操作

二.Celery非同步分散式

Celery是一個python開發的非同步分散式任務排程模組。
Celery本身並不提供訊息服務,使用第三方服務,也就是borker來傳遞任務,目前支援rebbimq,redis, 資料庫等。

`這裡我們使用redis
連線url的格式為:
redis://:[email protected]:port/db_number
例如:
BROKER_URL = 'redis://localhost:6379/0'

1.安裝celery
pip install celery
pip install redis

在伺服器上安裝redis伺服器,並啟動redis
第一個簡單的例子:在任意路徑下建立一個檔案。

vim sixgod.py

#/usr/bin/env
#coding=utf-8
from celery import Celery,platforms
platforms.C_FORCE_ROOT = True

broker = "redis://localhost:6379/5"
backend = "redis://localhost:6379/6"
app = Celery("sixgod",broker=broker,backend=backend)

@app.task
def add(x,y):
return x+y
2.啟動worker
#celery -A sixgod worker -l info


3.生產者

在pycharm中將剛才寫的檔案匯入:


4.傳入資訊
from sixgod import add

re = add.delay(10,20)

執行之後觀察伺服器端情況:

5.獲取

re.result#獲取結果
re.ready)#是否處理
re.get#獲取結果

re.id         #獲取id

from sixgod import add

re = add.delay(100,200)
print(re.id)            #獲取id

執行:

823220ed-abff-45cb-a5f4-42c53c4d33e9
6.登入redis檢視


可以看到他們的id部分是相同的。