1. 程式人生 > >python—多進程的消息隊列

python—多進程的消息隊列

python

消息隊列

消息隊列是在消息的傳輸過程中保存消息的容器

消息隊列最經典的用法就是消費者 和生產者之間通過消息管道傳遞消息,消費者和生成者是不同的進程。生產者往管道寫消息,消費者從管道中讀消息

操作系統提供了很多機制來實現進程間的通信,multiprocessing模塊提供了Queue和Pipe兩種方法來實現


一、使用multiprocessing裏面的Queue來實現消息隊列

q = Queue

q.put(data)

data = q.get()


例子:

from multiprocessing import Queue, Process
def write(q):
    for i in ["a","b","c","d"]:
        q.put(i)
        print("put {0} to queue".format(i))
def read(q):
    while 1:
        result = q.get()
        print("get {0} from queue".format(result))
def main():
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()
if __name__ == "__main__":
    main()

運行結果:

put a to queue

put b to queueget a from queue

get b from queue

put c to queue

put d to queue

get c from queue

get d from queue



二、通過Multiprocessing裏面的Pipe來實現消息隊列

1)Pipe方法返回(conn1,conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplux參數為True(默認值),那麽這個管道是全雙工模式,即conn1和conn2均可收發。duplex為False,conn1負責接收消息,conn2負責發行消息

2)send和recv方法分別是發送和接收消息的方法。close方法表示關閉管道,當消息接收結束以後,關閉管道。


例子:

from multiprocessing import Process,Pipe
import time
def proc1(pipe):
    for i in xrange(1,10):
        pipe.send(i)
        time.sleep(3)
        print("send {0} to pipe".format(i))
def proc2(pipe):
    n = 9
    while n>0:
        result = pipe.recv()
        time.sleep(3)
        print("recv {0} from pipe".format(result))
        n -= 1
if __name__ == "__main__":
    pipe = Pipe(duplex=False)
    print(type(pipe))
    p1 = Process(target=proc1,args=(pipe[1],))
    p2 = Process(target=proc2,args=(pipe[0],))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    pipe[0].close()
    pipe[1].close()


運行結果:

<type 'tuple'>

send 1 to pipe

recv 1 from pipe

send 2 to pipe

recv 2 from pipe

recv 3 from pipe

send 3 to pipe

send 4 to piperecv 4 from pipe

send 5 to pipe

recv 5 from pipe

recv 6 from pipe

send 6 to pipe

send 7 to pipe

recv 7 from pipe

send 8 to pipe

recv 8 from pipe

recv 9 from pipesend 9 to pipe


三、Queue模塊

python提供了Queue模塊來專門實現消息隊列:

Queue對象實現一個fifo隊列(其他的還有lifo、priority隊列)。queue只有gsize一個構造函數,用來指定隊列容量,指定為0的時候代表容量無限。只要有以下成員函數:

Queue.gsize():返回消息隊列的當前空間。返回的值不一定可靠。

Queue.empty():判斷消息隊列是否為空,返回True或者False。同樣不可靠

Queue.full():判斷消息是否滿

Queue.put(item,block=True,timeout=None):往消息隊列中存放數據。block可以控制是否阻塞,timeout控制阻塞時候的等待時間。如果不阻塞或者超時,會引起一個full exception。

Queue.put_nowait(item):相當於put(item,False)

Queue.get(block=True,timeout=None):獲取一個消息,其他等同put


以兩個函數用來判斷消息對應的任務是否完成:

Queue.task_done():接收消息的線程通過調用這個函來說明消息對應的任務已完成

Queue.join():實際上意味著等到隊列為空,再執行別的操作


python—多進程的消息隊列