python—多進程的消息隊列
消息隊列是在消息的傳輸過程中保存消息的容器
消息隊列最經典的用法就是消費者 和生產者之間通過消息管道傳遞消息,消費者和生成者是不同的進程。生產者往管道寫消息,消費者從管道中讀消息
操作系統提供了很多機制來實現進程間的通信,multiprocessing模塊提供了Queue和Pipe兩種方法來實現
一、使用multiprocessing裏面的Queue來實現消息隊列
q = Queue
q.put(data)
data = q.get()
例子:
from multiprocessing import Queue, Process def write(q): for i in ["a","b","c","d"]: q.put(i) print("put {0} to queue".format(i)) def read(q): while 1: result = q.get() print("get {0} from queue".format(result)) def main(): q = Queue() pw = Process(target=write,args=(q,)) pr = Process(target=read,args=(q,)) pw.start() pr.start() pw.join() pr.terminate() if __name__ == "__main__": main()
運行結果:
put a to queue
put b to queueget a from queue
get b from queue
put c to queue
put d to queue
get c from queue
get d from queue
二、通過Multiprocessing裏面的Pipe來實現消息隊列
1)Pipe方法返回(conn1,conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplux參數為True(默認值),那麽這個管道是全雙工模式,即conn1和conn2均可收發。duplex為False,conn1負責接收消息,conn2負責發行消息
2)send和recv方法分別是發送和接收消息的方法。close方法表示關閉管道,當消息接收結束以後,關閉管道。
例子:
from multiprocessing import Process,Pipe import time def proc1(pipe): for i in xrange(1,10): pipe.send(i) time.sleep(3) print("send {0} to pipe".format(i)) def proc2(pipe): n = 9 while n>0: result = pipe.recv() time.sleep(3) print("recv {0} from pipe".format(result)) n -= 1 if __name__ == "__main__": pipe = Pipe(duplex=False) print(type(pipe)) p1 = Process(target=proc1,args=(pipe[1],)) p2 = Process(target=proc2,args=(pipe[0],)) p1.start() p2.start() p1.join() p2.join() pipe[0].close() pipe[1].close()
運行結果:
<type 'tuple'>
send 1 to pipe
recv 1 from pipe
send 2 to pipe
recv 2 from pipe
recv 3 from pipe
send 3 to pipe
send 4 to piperecv 4 from pipe
send 5 to pipe
recv 5 from pipe
recv 6 from pipe
send 6 to pipe
send 7 to pipe
recv 7 from pipe
send 8 to pipe
recv 8 from pipe
recv 9 from pipesend 9 to pipe
三、Queue模塊
python提供了Queue模塊來專門實現消息隊列:
Queue對象實現一個fifo隊列(其他的還有lifo、priority隊列)。queue只有gsize一個構造函數,用來指定隊列容量,指定為0的時候代表容量無限。只要有以下成員函數:
Queue.gsize():返回消息隊列的當前空間。返回的值不一定可靠。
Queue.empty():判斷消息隊列是否為空,返回True或者False。同樣不可靠
Queue.full():判斷消息是否滿
Queue.put(item,block=True,timeout=None):往消息隊列中存放數據。block可以控制是否阻塞,timeout控制阻塞時候的等待時間。如果不阻塞或者超時,會引起一個full exception。
Queue.put_nowait(item):相當於put(item,False)
Queue.get(block=True,timeout=None):獲取一個消息,其他等同put
以兩個函數用來判斷消息對應的任務是否完成:
Queue.task_done():接收消息的線程通過調用這個函來說明消息對應的任務已完成
Queue.join():實際上意味著等到隊列為空,再執行別的操作
python—多進程的消息隊列