1. 程式人生 > >Python多執行緒與佇列

Python多執行緒與佇列

Python多執行緒與Queue佇列多執行緒在感官上類似於同時執行多個程式,雖然由於GIL的存在,在Python中無法實現執行緒的真正並行,但是對於某些場景,多執行緒仍不失為一個有效的處理方法:

1,不緊急的,無需阻塞主執行緒的任務,此時可以利用多執行緒在後臺慢慢處理;
2,IO密集型操作,比如檔案讀寫、使用者輸入和網路請求等,此時多執行緒可以近似達到甚至優於多程序的表現;

多執行緒的基本使用不再贅述,以下語法便可輕鬆實現:

1 def task(args1, args2):
2     pass
3 
4 Thread(
5     target=task,
6     args=(args1, args2)
7 ).start()


這裡我們重點關注執行緒通訊。

假設有這麼一種場景:有一批源資料,指定一個操作係數N,需要分別對其進行與N的加減乘除操作,並將結果彙總。
當然這裡的加減乘除只是一種簡單處理,在實際的生產環境中,它其實代表了一步較為複雜的業務操作,幷包含了較多的IO處理。

自然我們想到可以開啟多執行緒處理,那麼緊接著的問題便是:如何劃分執行緒,是根據處理步驟劃分,還是根據源資料劃分?

對於前者,我們把涉及的業務操作單獨劃分位一個執行緒,即有4個執行緒分別進行加減乘除的操作,顯然上一個執行緒的結果是下一個執行緒的輸入,這類似於流水線操作;

而後者則是把源資料分為若干份,每份啟動一個執行緒進行處理,最終把結果彙總。

一般來說,我們推薦第一種方式。因為在一個執行緒中完成所有的操作不如每步一個執行緒清晰明瞭,

尤其是在一些複雜的場景下,會加大單個執行緒的出錯概率和測試難度。

那麼我們將開闢4個執行緒,分別執行加減乘除操作。最後一個除法執行緒結束則任務完成:

 

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 from Queue import Queue
 5 from threading import Thread
 6 
 7 
 8 class NumberHandler(object):
 9     def __init__(self, n):
10         self.n = n
11 
12     def add(self, num):
13         return num + self.n
14 
15     def subtract(self, num):
16         return num - self.n
17 
18     def multiply(self, num):
19         return num * self.n * self.n
20 
21     def divide(self, num):
22         return num / self.n
23 
24 
25 class ClosableQueue(Queue):
26     SENTINEL = object()
27 
28     def close(self):
29         self.put(self.SENTINEL)
30 
31     def __iter__(self):
32         while True:
33             item = self.get()
34             try:
35                 if item is self.SENTINEL:
36                     return
37                 yield item
38             finally:
39                 self.task_done()
40 
41 
42 class StoppableWorker(Thread):
43     def __init__(self, func, in_queue, out_queue):
44         super(StoppableWorker, self).__init__()
45         self.in_queue = in_queue
46         self.out_queue = out_queue
47         self.func = func
48 
49     def run(self):
50         for item in self.in_queue:
51             result = self.func(item)
52             self.out_queue.put(result)
53             print self.func
54 
55 
56 if __name__ == '__main__':
57     source_queue = ClosableQueue()
58     add_queue = ClosableQueue()
59     subtract_queue = ClosableQueue()
60     multiply_queue = ClosableQueue()
61     divide_queue = ClosableQueue()
62     result_queue = ClosableQueue()
63 
64     number_handler = NumberHandler(5)
65 
66     threads = [
67         StoppableWorker(number_handler.add, add_queue, subtract_queue),
68         StoppableWorker(number_handler.subtract, subtract_queue, multiply_queue),
69         StoppableWorker(number_handler.multiply, multiply_queue, divide_queue),
70         StoppableWorker(number_handler.divide, divide_queue, result_queue),
71     ]
72 
73     for _thread in threads:
74         _thread.start()
75 
76     for i in range(10):
77         add_queue.put(i)
78 
79     add_queue.close()
80     add_queue.join()
81     print 'add job done...'
82     subtract_queue.close()
83     subtract_queue.join()
84     print 'subtract job done...'
85     multiply_queue.close()
86     multiply_queue.join()
87     print 'multiply job done...'
88     divide_queue.close()
89     divide_queue.join()
90     print 'divide job done...'
91     result_queue.close()
92 
93     print "%s items finished, result: %s" % (result_queue.qsize(), result_queue)
94 
95     for i in result_queue:
96         print i

執行結果:

執行緒執行日誌:

 

 

 總的結果:

 

 可見執行緒交叉執行,但是任務卻是順序結束,這符合我們的預期。

值得注意的是,我們在ClosableQueue定義了一個close()方法,通過放入一個特殊的類變數SENTINEL告訴佇列應該關閉。此外,由於直接加減乘除結果不變,因此我特意乘了兩次來便於我們判斷結果。

總結:

1. Queue是一種高效的任務處理方式,它可以把任務處理流程劃分為若干階段,並使用多條python執行緒來同時執行這些子任務;

2. Queue類具備阻塞式的佇列操作、能夠指定緩衝區尺寸,而且還支 持join方法,這使得開發者可以構建出健壯的流水線。

&n