1. 程式人生 > >python程序間通訊--Queue

python程序間通訊--Queue

multiprocessing模組中的Queue可以實現多程序之間的資料傳遞

初始化Queue()物件時 (例如:q = Qqueue()),若括號中沒有指定最大可接受的訊息數量或者數量為負數,則代表可接受的訊息數量沒有上線(直到記憶體的盡頭)
Queue類下的幾個常用方法:

Queue.qsize()      返回當前佇列包含的訊息數量

Queue.empty()      若佇列為空,返回True,反之False 

Queue.full()       若佇列滿了,返回True,反之False 
Queue.get([block[,timeout]])  獲取佇列中的一條訊息,並將其從佇列中移除,block預設True

#    1.block(阻塞)使用預設值,並沒有設定timeout(單位:秒),若訊息佇列為空,程式將
# 被阻塞(停在讀取狀態),直到從訊息佇列讀到訊息為止,若設定了timeout,則會等待timeout秒, # 若還沒有讀到任何訊息則丟擲"Queue.Empty異常" # 2.若block值為False,訊息佇列為空,則立即丟擲"Queue.Empty異常" Queue.get_nowait(): 相當於Queue.get(False)
Queue.put(item,[block[,timeout]])  將item訊息寫入佇列,block預設值True

#    1.若block使用預設值,且沒有設定timeout(單位:秒),訊息佇列如果已經沒有空間可寫入,此時程式會
# 被阻塞(停在寫入狀態),直到訊息佇列騰出空間為止,如果設定了timeout,則會等待timeout秒,如果 # 還沒有空間,則丟擲"Queue.Full"異常 # 2.若block值為False,訊息佇列如果沒有空間寫入,則立即丟擲"Queue.Full"異常 Queue.put_nowait(item): 相當於Queue.put(item,False)

用一個小示例演示Queue工作原理:

from multiprocessing import Queue

if __name__ == '__main__':
    #初始化
    q = Queue(3
) # 初始化一個Queue物件,最多可接受三條put訊息 #佇列儲存訊息 q.put("msg1") q.put("msg2") print(q.full())# 結果為False q.put("msg3") print(q.full())# 結果為True # 因為訊息佇列已滿,下面的兩個 try函式會丟擲異常 try: q.put("msg4",block=True,timeout=2) except: print("訊息佇列已滿,現在有訊息數量:%d"%q.qsize()) try: q.put_nowait("msg4") except: print("訊息佇列已滿,現在有訊息數量:%d"%q.qsize()) #推薦這種方式 if not q.full(): q.put("msg5") #讀取訊息 #格式:訊息佇列物件.get() print(q.get()) print("現在有訊息數量:%d" % q.qsize()) q.put("msg6") while q.qsize()>0: print(q.get())

再寫一個例項,在父程序中建立兩個子程序,一個向Queue寫資料,一個從Queue讀資料

from multiprocessing import Process,Queue
# 寫入資料
def write(q):
    values = ['A','B', 'C', 'D', 'E']
    for i in values:
        q.put(i)
        print('put %s to queue...' % i)
# 讀取資料
def read(q):
    while True:
        if not q.empty():
            print('get %s from queue!!' % q.get())
        else:
            break

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target = write,args=(q,))
    p1.start()
    p1.join()

    p2 = Process(target = read,args=(q,))
    p2.start()
    p2.join()
    print('所有資料讀取完成')

執行結果為:

C:\ProgramData\Anaconda3\python.exe D:/PycharmProjects/firstproject/03_21/test02.py
put A to queue...
put B to queue...
put C to queue...
put D to queue...
put E to queue...
get A from queue!!
get B from queue!!
get C from queue!!
get D from queue!!
get E from queue!!
所有資料讀取完成

Process finished with exit code 0
程序池中的Queue

如果使用Pool建立程序,就需要用multiprocessing.Manager()中的Queue(), 而不是multiprocessing.Queue()

給一個具體例項說明

from multiprocessing import Manager,Pool
import os

def writer(q):
    print('writer啟動%s' % os.getpid())
    for i in 'jiaobaba':
        q.put(i)

def reader(q):
    print('reader啟動%s' % os.getpid())
    for i in range(q.qsize()):
        print('reader從Queue中讀到訊息: %s' % q.get())

if __name__ == '__main__':
    q = Manager().Queue() # 用於在程序池中使用的訊息佇列
    po = Pool()
    po.apply_async(func = writer, args = (q,))
    po.apply_async(func = reader, args = (q,))

    po.close()
    po.join()

執行結果:

C:\ProgramData\Anaconda3\python.exe D:/PycharmProjects/firstproject/03_21/pool.py
writer啟動1044
reader啟動1044
reader從Queue中讀到訊息: j
reader從Queue中讀到訊息: i
reader從Queue中讀到訊息: a
reader從Queue中讀到訊息: o
reader從Queue中讀到訊息: b
reader從Queue中讀到訊息: a
reader從Queue中讀到訊息: b
reader從Queue中讀到訊息: a

Process finished with exit code 0