1. 程式人生 > >python 進程間通信Queue

python 進程間通信Queue

targe end item iter pytho TP AS 阻塞 等待

  • Queue的使用

    Queue.qsize()    #返回當前隊列包含的消息數量

    Queue.empty()   #如果隊列為空,返回True,反之False
    Queue.full()     #如果隊列滿了,返回True,反之False

    Queue.get([block[, timeout]])  #獲取隊列中的一條消息,然後將其從列隊中移除,block默認值為True,沒取到會阻塞

                   #如果block值為False,消息列隊如果為空,則會立刻拋出"Queue.Empty"異常

    Queue.get_nowait()      #相當Queue.get(False)

    Queue.put(item,[block[, timeout]])   #將item消息寫入隊列,block默認值為True,

                      #消息列隊如果已經沒有空間可寫入,此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間為止

                      #如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常

    Queue.put_nowait(item)    #相當Queue.put(item, False)

  • 實例

    Process

    

from multiprocessing import
Process, Queue import os, time, random # 寫數據進程執行的代碼: def write(q): for value in [A, B, C]: print(Put %s to queue... % value) q.put(value) time.sleep(random.random()) # 讀數據進程執行的代碼: def read(q): while True: if not q.empty(): value = q.get(True)
print(Get %s from queue. % value) time.sleep(random.random()) else: break if __name__==__main__: # 父進程創建Queue,並傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動子進程pw,寫入: pw.start() # 等待pw結束: pw.join() # 啟動子進程pr,讀取: pr.start() pr.join() # pr進程裏是死循環,無法等待其結束,只能強行終止: print (所有數據都寫入並且讀完)

技術分享圖片

  Pool

#coding=utf-8

#修改import中的Queue為Manager
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
    print("reader啟動(%s),父進程為(%s)"%(os.getpid(),os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue獲取到消息:%s"%q.get(True))

def writer(q):
    print("writer啟動(%s),父進程為(%s)"%(os.getpid(),os.getppid()))
    for i in "dongGe":
        q.put(i)

if __name__=="__main__":
    print("(%s) start"%os.getpid())
    q=Manager().Queue() #使用Manager中的Queue來初始化
    po=Pool()
    #使用阻塞模式創建進程,這樣就不需要在reader中使用死循環了,可以讓writer完全執行完成後,再用reader去讀取
    po.apply(writer,(q,))
    po.apply(reader,(q,))
    po.close()
    po.join()
    print("(%s) End"%os.getpid())

python 進程間通信Queue