1. 程式人生 > >Python多執行緒3:queue

Python多執行緒3:queue

queue模組實現了多生產者,多消費者佇列。在多執行緒環境下,該佇列能實現多個執行緒間安全的資訊交換。

queue模組介紹

模組實現了3種類型的佇列,區別在於佇列中條目檢索的順序不同。在FIFO佇列中,按照先進先出的順序檢索條目。在LIFO佇列中,最後新增的條目最先檢索到(操作類似一個棧)。在優先順序佇列中,條目被儲存為有序的(使用heapq模組)並且最小值的條目被最先檢索。
queue模組定義了下面的類和異常:

class queue.Queue(maxsize=0)

FIFO佇列的構造器。maxsize為一個整數,表示佇列的最大條目數。一旦佇列滿,插入將被阻塞直到佇列中存在空閒空間。如果maxsize小於等於0,佇列大小為無限。

class queue.LifoQueue(maxsize=0)

LIFO佇列的構造器。maxsize是一個整數,表示佇列的最大條目數。一旦佇列滿,插入將被阻塞直到佇列中存在空閒空間。如果maxsize小於等於0,佇列大小為無限。

class queue.PriorityQueue(maxsize=0)

優先順序佇列的構造器。maxsize是一個整數,表示佇列的最大條目數。一旦佇列滿,插入將被阻塞直到佇列中存在空閒空間。如果maxsize小於等於0,佇列大小為無限。
最小值的條目被最先檢索到(最小值的條目即為被sorted(list(entries))[0]返回的條目)。通常一個條目被儲存為下面的形式:(priority_number, data)。

exception queue.Empty

當Queue為空時,非阻塞的get()或者get_nowait()被呼叫時,將丟擲該異常。

exception queue.Full

當佇列滿時,非阻塞的put()或者put_nowait()被呼叫,將丟擲該異常。

Queue物件

Queue物件(Queue、LifoQueue或者PriorityQueue)提供了以下方法:


Queue.qsize()
返回佇列的近似大小。注意,qsize() > 0並不能保證接下來的get()方法不被阻塞;同樣,qsize() < maxsize也不能保證put()將不被阻塞。

Queue.empty()
如果佇列是空的,則返回True,否則False。如果empty()返回True,並不能保證接下來的put()呼叫將不被阻塞。類似的,empty()返回False也不能保證接下來的get()呼叫將不被阻塞。

Queue.full()

如果佇列滿則返回True,否則返回False。如果full()返回True,並不能保證接下來的get()呼叫將不被阻塞。類似的,full()返回False也不能保證接下來的put()呼叫將不被阻塞。

Queue.put(item, block=True, timeout=None)
放item到佇列中。如果block是True,且timeout是None,該方法將一直等待直到有佇列有空餘空間。如果timeout是一個正整數,該方法則最多阻塞timeout秒並丟擲Full異常。如果block是False並且佇列滿,則直接丟擲Full異常(這時timeout將被忽略)。

Queue.put_nowait(item)
等價於put(item, False)。

Queue.get(block=True, timeout=None)
從佇列中移除被返回一個條目。如果block是True並且timeout是None,該方法將阻塞直到佇列中有條目可用。如果timeout是正整數,該方法將最多阻塞timeout秒並丟擲Empty異常。如果block是False並且佇列為空,則直接丟擲Empty異常(這時timeout將被忽略)。

Queue.get_nowait()
等價於get(False)。

如果需要跟蹤進入佇列中的任務是否已經被精靈消費者執行緒處理完成,可以使用下面提供的兩個方法:


Queue.task_done()
表示一個先前的佇列中的任務完成了。被佇列消費者執行緒使用。對於每個get()獲取到的任務,接下來的task_done()的呼叫告訴佇列該任務的處理已經完成。
如果join()呼叫正在阻塞,當佇列中所有的條目被處理後它將恢復執行(意味著task_done()呼叫將被放入佇列中的每個條目接收到)。
如果呼叫次數超過了佇列中放置的條目數目,將丟擲ValueError異常。

Queue.join()
阻塞直到佇列中所有條目都被獲取並處理。
當一個條目被增加到佇列時,未完成任務的計數將增加。當一個消費者執行緒呼叫task_done()時,未完成任務的計數將減少。當未完成任務的計數減少到0時,join()解鎖。

下面是一個具體的例子,用於說明怎麼等待佇列任務完成:
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # 阻塞直到所有任務完成