1. 程式人生 > >[Python標準庫]Queue——執行緒安全的 FIFO 實現

[Python標準庫]Queue——執行緒安全的 FIFO 實現

優先佇列
        有些情況下,佇列中元素的處理順序需要根據這些元素的特性來決定,而不只是在佇列中建立或插入的順序。例如,財務部門的列印作業可能要優先於一個開發人員列印的程式碼清單。PriorityQueue 使用佇列內容的有序順序來決定獲取哪一個元素。
import Queue
import threading

class Job(object):
    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print 'New job:', description
        return
    def __cmp__(self, other):
        return cmp(self.priority, other.priority)

q = Queue.PriorityQueue()

q.put( Job(3, 'Mid-level job') )
q.put( Job(10, 'Low-level job') )
q.put( Job(1, 'Important job') )

def process_job(q):
    while True:
        next_job = q.get()
        print 'Processing job:', next_job.description
        q.task_done()

workers = [ threading.Thread(target=process_job, args=(q,)),
            threading.Thread(target=process_job, args=(q,)),
            ]

for w in workers:
    w.setDaemon(True)
    w.start()

q.join()
        這個例子有多個執行緒在處理作業,要根據呼叫 get() 時佇列中元素的優先順序來處理。執行消費者執行緒時,增加到佇列中的元素的處理順序取決於執行緒上下文切換。
構建一個多執行緒部落格客戶程式
        接下來的例子將構建一個部落格客戶程式,程式的原始碼展示瞭如何用多個執行緒使用 Queue 類。這個程式要讀入一個或多個 RSS 提要,對專輯排隊來顯示最新的 5 集以供下載,並使用執行緒並行地處理多個下載。這裡沒有提供足夠的錯誤處理,所有不能在實際生產環境中使用,不過這個骨架實現可以作為一個很好的例子來說明如果使用 Queue 模組。
        首先要建立一些操作引數。正常情況下,這些引數來自使用者輸入(首選項、資料庫,等等)。不過在這個例子中,執行緒數和要獲取的 URL 列表都使用了硬編碼值。
from Queue import Queue
from threading import Thread
import time
import urllib
import urlparse
import feedparser

# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data...
feed_urls = [ 'http://advocacy.python.org/podcasts/littlebit.rss',
              ]
        函式 downloadEnclosures() 在工作執行緒中執行,使用 urllib 來處理下載。
def downloadEnclosures(i, q):
    """This is the worker thread function.
    It processes items in the queue one after
    another. These daemon threads go into an
    infinite loop, and only exit when
    the main thread ends.
    """

    while True:
        print '%s: Looking for the next enclosure' % i
        url = q.get()
        parsed_url = urlparse.urlparse(url)
        print '%s: Downloading:' % i, parsed_url.path
        response = urllib.urlopen(url)
        data = response.read()
        # Save the downloaded file to the current directory
        outfile_name = url.rpartition('/')[-1]
        with open(outfile_name, 'wb') as outfile:
            outfile.write(data)
        q.task_done()
        一旦定義了執行緒的目標函式,接下來可以啟動工作執行緒。downloadEnclosures() 處理語句 url = q.get() 時,會阻塞並等待,直到佇列返回某個結果。這說明,即使佇列中沒有任何內容,也可以安全地啟動執行緒。
# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
    worker = Thread(target=downloadEnclosures,
                    args=(i, enclosure_queue,))
    worker.setDaemon(True)
    worker.start()
        下一步使用 Mark Pilgrim 的 feedparser 模組(www.feedparser.org)獲取提要內容,並將這些專輯的 URL 入隊。一旦第一個 URL 增加到佇列,就會有某個工作執行緒提取出這個 URL,開始下載。這個迴圈會繼續增加元素,直到提要全部利用,工作執行緒會以此將 URL 出隊來下載這些提要。
# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
for entry in response['entries'][-5:]:
    print entry.get('enclosure', [])
    for enclosure in entry.get('enclosure', []):
        parsed_url = urlparse.urlparse(enclosure['url'])
        print 'Queuing:', parsed_url.path
        enclosure_queue.put(enclosure['url'])
        還有一件事要做,要使用 join() 再次等待佇列騰空。
# Now wait for the queue to be empty, indicating that we have
# processed all the downloads.
print '*** Main thread waiting'
enclosure_queue.join()
print '*** Done'
        具體的輸出取決於所使用的 RSS 提要的內容。