1. 程式人生 > >python 多執行緒與執行緒池

python 多執行緒與執行緒池

原始地址 http://www.ibm.com/developerworks/cn/aix/library/au-threadingpython/

資料學習:http://www.cnblogs.com/goodhacker/p/3359985.html

引言

對於 Python 來說,並不缺少併發選項,其標準庫中包括了對執行緒、程序和非同步 I/O 的支援。在許多情況下,通過建立諸如非同步、執行緒和子程序之類的高層模組,Python 簡化了各種併發方法的使用。除了標準庫之外,還有一些第三方的解決方案,例如 Twisted、Stackless 和程序模組。本文重點關注於使用 Python 的執行緒,並使用了一些實際的示例進行說明。雖然有許多很好的聯機資源詳細說明了執行緒 API,但本文嘗試提供一些實際的示例,以說明一些常見的執行緒使用模式。

全域性直譯器鎖 (Global Interpretor Lock) 說明 Python 直譯器並不是執行緒安全的。當前執行緒必須持有全域性鎖,以便對 Python 物件進行安全地訪問。因為只有一個執行緒可以獲得 Python 物件/C API,所以直譯器每經過 100 個位元組碼的指令,就有規律地釋放和重新獲得鎖。直譯器對執行緒切換進行檢查的頻率可以通過sys.setcheckinterval() 函式來進行控制。

此外,還將根據潛在的阻塞 I/O 操作,釋放和重新獲得鎖。有關更詳細的資訊,請參見參考資料部分中的 Gil and Threading State 和 Threading the Global Interpreter Lock

需要說明的是,因為 GIL,CPU 受限的應用程式將無法從執行緒的使用中受益。使用 Python 時,建議使用程序,或者混合建立程序和執行緒。

首先弄清程序和執行緒之間的區別,這一點是非常重要的。執行緒與程序的不同之處在於,它們共享狀態、記憶體和資源。對於執行緒來說,這個簡單的區別既是它的優勢,又是它的缺點。一方面,執行緒是輕量級的,並且相互之間易於通訊,但另一方面,它們也帶來了包括死鎖、爭用條件和高複雜性在內的各種問題。幸運的是,由於 GIL 和佇列模組,與採用其他的語言相比,採用 Python 語言線上程實現的複雜性上要低得多。

使用 Python 執行緒

要繼續學習本文中的內容,我假定您已經安裝了 Python 2.5 或者更高版本,因為本文中的許多示例都將使用 Python 語言的新特性,而這些特性僅出現於 Python2.5 之後。要開始使用 Python 語言的執行緒,我們將從簡單的 "Hello World" 示例開始:

hello_threads_example
        import threading
        import datetime
        
        class ThreadClass(threading.Thread):
          def run(self):
            now = datetime.datetime.now()
            print "%s says Hello World at time: %s" % 
            (self.getName(), now)
        
        for i in range(2):
          t = ThreadClass()
          t.start()

如果執行這個示例,您將得到下面的輸出:

      # python hello_threads.py 
      Thread-1 says Hello World at time: 2008-05-13 13:22:50.252069
      Thread-2 says Hello World at time: 2008-05-13 13:22:50.252576

仔細觀察輸出結果,您可以看到從兩個執行緒都輸出了 Hello World 語句,並都帶有日期戳。如果分析實際的程式碼,那麼將發現其中包含兩個匯入語句;一個語句匯入了日期時間模組,另一個語句匯入執行緒模組。類 ThreadClass 繼承自 threading.Thread,也正因為如此,您需要定義一個 run 方法,以此執行您在該執行緒中要執行的程式碼。在這個 run 方法中唯一要注意的是,self.getName() 是一個用於確定該執行緒名稱的方法。

最後三行程式碼實際地呼叫該類,並啟動執行緒。如果注意的話,那麼會發現實際啟動執行緒的是 t.start()。在設計執行緒模組時考慮到了繼承,並且執行緒模組實際上是建立在底層執行緒模組的基礎之上的。對於大多數情況來說,從 threading.Thread 進行繼承是一種最佳實踐,因為它建立了用於執行緒程式設計的常規 API。

使用執行緒佇列

如前所述,當多個執行緒需要共享資料或者資源的時候,可能會使得執行緒的使用變得複雜。執行緒模組提供了許多同步原語,包括訊號量、條件變數、事件和鎖。當這些選項存在時,最佳實踐是轉而關注於使用佇列。相比較而言,佇列更容易處理,並且可以使得執行緒程式設計更加安全,因為它們能夠有效地傳送單個執行緒對資源的所有訪問,並支援更加清晰的、可讀性更強的設計模式。

在下一個示例中,您將首先建立一個以序列方式或者依次執行的程式,獲取網站的 URL,並顯示頁面的前 1024 個位元組。有時使用執行緒可以更快地完成任務,下面就是一個典型的示例。首先,讓我們使用 urllib2 模組以獲取這些頁面(一次獲取一個頁面),並且對程式碼的執行時間進行計時:

URL 獲取序列
        import urllib2
        import time
        
        hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"]
        
        start = time.time()
        #grabs urls of hosts and prints first 1024 bytes of page
        for host in hosts:
          url = urllib2.urlopen(host)
          print url.read(1024)
        
        print "Elapsed Time: %s" % (time.time() - start)

在執行以上示例時,您將在標準輸出中獲得大量的輸出結果。但最後您將得到以下內容:

        Elapsed Time: 2.40353488922

讓我們仔細分析這段程式碼。您僅匯入了兩個模組。首先,urllib2 模組減少了工作的複雜程度,並且獲取了 Web 頁面。然後,通過呼叫time.time(),您建立了一個開始時間值,然後再次呼叫該函式,並且減去開始值以確定執行該程式花費了多長時間。最後分析一下該程式的執行速度,雖然“2.5 秒”這個結果並不算太糟,但如果您需要檢索數百個 Web 頁面,那麼按照這個平均值,就需要花費大約 50 秒的時間。研究如何建立一種可以提高執行速度的執行緒化版本:

URL 獲取執行緒化
          #!/usr/bin/env python
          import Queue
          import threading
          import urllib2
          import time
          
          hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
          "http://ibm.com", "http://apple.com"]
          
          queue = Queue.Queue()
          
          class ThreadUrl(threading.Thread):
          """Threaded Url Grab"""
            def __init__(self, queue):
              threading.Thread.__init__(self)
              self.queue = queue
          
            def run(self):
              while True:
                #grabs host from queue
                host = self.queue.get()
            
                #grabs urls of hosts and prints first 1024 bytes of page
                url = urllib2.urlopen(host)
                print url.read(1024)
            
                #signals to queue job is done
                self.queue.task_done()
          
          start = time.time()
          def main():
          
            #spawn a pool of threads, and pass them queue instance 
            for i in range(5):
              t = ThreadUrl(queue)
              t.setDaemon(True)
              t.start()
              
           #populate queue with data   
              for host in hosts:
                queue.put(host)
           
           #wait on the queue until everything has been processed     
           queue.join()
          
          main()
          print "Elapsed Time: %s" % (time.time() - start)

對於這個示例,有更多的程式碼需要說明,但與第一個執行緒示例相比,它並沒有複雜多少,這正是因為使用了佇列模組。在 Python 中使用執行緒時,這個模式是一種很常見的並且推薦使用的方式。具體工作步驟描述如下:

  1. 建立一個 Queue.Queue() 的例項,然後使用資料對它進行填充。
  2. 將經過填充資料的例項傳遞給執行緒類,後者是通過繼承 threading.Thread 的方式建立的。
  3. 生成守護執行緒池。
  4. 每次從佇列中取出一個專案,並使用該執行緒中的資料和 run 方法以執行相應的工作。
  5. 在完成這項工作之後,使用 queue.task_done() 函式向任務已經完成的佇列傳送一個訊號。
  6. 對佇列執行 join 操作,實際上意味著等到佇列為空,再退出主程式。

在使用這個模式時需要注意一點:通過將守護執行緒設定為 true,將允許主執行緒或者程式僅在守護執行緒處於活動狀態時才能夠退出。這種方式建立了一種簡單的方式以控制程式流程,因為在退出之前,您可以對佇列執行 join 操作、或者等到佇列為空。佇列模組文件詳細說明了實際的處理過程,請參見參考資料

join()
保持阻塞狀態,直到處理了佇列中的所有專案為止。在將一個專案新增到該佇列時,未完成的任務的總數就會增加。當使用者執行緒呼叫 task_done() 以表示檢索了該專案、並完成了所有的工作時,那麼未完成的任務的總數就會減少。當未完成的任務的總數減少到零時,join()就會結束阻塞狀態。

使用多個佇列

因為上面介紹的模式非常有效,所以可以通過連線附加執行緒池和佇列來進行擴充套件,這是相當簡單的。在上面的示例中,您僅僅輸出了 Web 頁面的開始部分。而下一個示例則將返回各執行緒獲取的完整 Web 頁面,然後將結果放置到另一個佇列中。然後,對加入到第二個佇列中的另一個執行緒池進行設定,然後對 Web 頁面執行相應的處理。這個示例中所進行的工作包括使用一個名為 Beautiful Soup 的第三方 Python 模組來解析 Web 頁面。使用這個模組,您只需要兩行程式碼就可以提取所訪問的每個頁面的 title 標記,並將其列印輸出。

多佇列資料探勘網站
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()
def main():

    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)

如果執行指令碼的這個版本,您將得到下面的輸出:

  # python url_fetch_threaded_part2.py 

  [<title>Google</title>]
  [<title>Yahoo!</title>]
  [<title>Apple</title>]
  [<title>IBM United States</title>]
  [<title>Amazon.com: Online Shopping for Electronics, Apparel,
 Computers, Books, DVDs & more</title>]
  Elapsed Time: 3.75387597084

分析這段程式碼時您可以看到,我們添加了另一個佇列例項,然後將該佇列傳遞給第一個執行緒池類 ThreadURL。接下來,對於另一個執行緒池類DatamineThread,幾乎複製了完全相同的結構。在這個類的 run 方法中,從佇列中的各個執行緒獲取 Web 頁面、文字塊,然後使用 Beautiful Soup 處理這個文字塊。在這個示例中,使用 Beautiful Soup 提取每個頁面的 title 標記、並將其列印輸出。可以很容易地將這個示例推廣到一些更有價值的應用場景,因為您掌握了基本搜尋引擎或者資料探勘工具的核心內容。一種思想是使用 Beautiful Soup 從每個頁面中提取連結,然後按照它們進行導航。

總結

本文研究了 Python 的執行緒,並且說明了如何使用佇列來降低複雜性和減少細微的錯誤、並提高程式碼可讀性的最佳實踐。儘管這個基本模式比較簡單,但可以通過將佇列和執行緒池連線在一起,以便將這個模式用於解決各種各樣的問題。在最後的部分中,您開始研究如何建立更復雜的處理管道,它可以用作未來專案的模型。參考資料部分提供了很多有關常規併發性和執行緒的極好的參考資料。

最後,還有很重要的一點需要指出,執行緒並不能解決所有的問題,對於許多情況,使用程序可能更為合適。特別是,當您僅需要建立許多子程序並對響應進行偵聽時,那麼標準庫子程序模組可能使用起來更加容易。有關更多的官方說明文件,請參考參考資料部分。

下載

描述 名字 大小
Sample threading code for this article 24KB

參考資料

學習

  • 您可以參閱本文在 developerWorks 全球站點上的 英文原文 。
  • 這個執行緒模組為多執行緒的使用提供了底層原語。
  • 這個執行緒化模組在較低層次執行緒模組的基礎上構造了高層次的執行緒介面。
  • PMOTW 執行緒模組允許您在相同的程序空間中併發地執行多項操作。
  • Asyncore 模組提供了以非同步的方式寫入套接字服務客戶端和伺服器的基礎結構。
  • 瞭解 Wikipedia 如何定義執行緒
  • 佇列模組
  • Beautiful Soup 是一種面向 Python 語言的 HTML/XML 解析器,它甚至可以將無效的標記轉換為解析樹。
  • 子執行緒模組允許您生成新的程序,連線到它們的輸入/輸出/錯誤管道,並獲取它們的返回程式碼。
  • AIX and UNIX 專區:developerWorks 的“AIX and UNIX 專區”提供了大量與 AIX 系統管理的所有方面相關的資訊,您可以利用它們來擴充套件自己的 UNIX 技能。
  • AIX and UNIX 新手入門:訪問“AIX and UNIX 新手入門”頁面可瞭解更多關於 AIX 和 UNIX 的內容。
  • AIX and UNIX 專題彙總:AIX and UNIX 專區已經為您推出了很多的技術專題,為您總結了很多熱門的知識點。我們在後面還會繼續推出很多相關的熱門專題給您,為了方便您的訪問,我們在這裡為您把本專區的所有專題進行彙總,讓您更方便的找到您需要的內容。
  • Podcast:收聽 Podcast 並與 IBM 技術專家保持同步。

獲得產品和技術

  • IBM 試用軟體:從 developerWorks 可直接下載這些試用軟體,您可以利用它們開發您的下一個專案。

討論