1. 程式人生 > >你還在用單執行緒?Python的併發處理庫的入門與使用詳解!

你還在用單執行緒?Python的併發處理庫的入門與使用詳解!

concurrent提供了兩種併發模型,一個是多執行緒ThreadPoolExecutor,一個是多程序ProcessPoolExecutor。對於IO密集型任務宜使用多執行緒模型。對於計算密集型任務應該使用多程序模型。

多執行緒模式適合IO密集型運算,這裡我要使用sleep來模擬一下慢速的IO任務。同時為了方便編寫命令列程式,這裡使用Google fire開源庫來簡化命令列引數處理。

我們看到計算總共花費了大概5s,總共sleep了10s由兩個執行緒分擔,所以是5s。讀者也許會問,為什麼輸出亂了,這是因為print操作不是原子的,它是兩個連續的write操作合成的,第一個write輸出內容,第二個write輸出換行符,write操作本身是原子的,但是在多執行緒環境下,這兩個write操作會交錯執行,所以輸出就不整齊了。如果將程式碼稍作修改,將print改成單個write操作,輸出就整齊了(關於write是否絕對原子性還需要進一步深入討論)

可以看到1s中就完成了所有的任務。這就是多執行緒的魅力,可以將多個IO操作並行化,減少整體處理時間。

多程序

相比多執行緒適合處理IO密集型任務,多程序適合計算密集型。接下來我們要模擬一下計算密集型任務。我的個人電腦有2個核心,正好可以體驗多核心計算的優勢。

那這個密集型計算任務怎麼模擬呢,我們可以使用圓周率計算公式。

通過擴大級數的長度n,就可以無限逼近圓周率。當n特別大時,計算會比較緩慢,這時候CPU就會一直處於繁忙狀態,這正是我們所期望的。

好,下面開寫多程序平行計算程式碼

通過程式碼可以看出多程序模式在程式碼的編寫上和多執行緒沒有多大差異,僅僅是換了一個類名,其它都一摸一樣。這也是concurrent庫的魅力所在,將多執行緒和多程序模型抽象出了一樣的使用介面。

看來耗時不能繼續節約了,因為只有2個計算核心,2個程序已經足以榨乾它們了,即使再多加程序也只有2個計算核心可用。

深入原理

concurrent用的時候非常簡單,但是內部實現並不是很好理解。在深入分析內部的結構之前,我們需要先理解一下Future這個物件。在前面的例子中,executor提交(submit)任務後都會返回一個Future物件,它表示一個結果的坑,在任務剛剛提交時,這個坑是空的,一旦子執行緒執行任務結束,就會將執行的結果塞到這個坑裡,主執行緒就可以通過Future物件獲得這個結果。簡單一點說,Future物件是主執行緒和子執行緒通訊的媒介。

Future物件的內部邏輯簡單一點可以使用下面的程式碼進行表示

我覺得作者的這張圖還不夠好懂,所以也單獨畫了一張圖,請讀者們仔細結合上面兩張圖,一起來過一邊完整的任務處理過程。

  1. 主執行緒將任務塞進TaskQueue(普通記憶體佇列),拿到Future物件
  2. 唯一的管理執行緒從TaskQueue獲取任務,塞進CallQueue(分散式跨程序佇列)
  3. 子程序從CallQueue中爭搶任務進行處理
  4. 子程序將處理結果塞進ResultQueue(分散式跨程序佇列)
  5. 管理執行緒從ResultQueue中獲取結果,塞進Future物件
  6. 主執行緒從Future物件中拿到結果

跨程序佇列

程序池模型中的跨程序佇列是用multiprocessing.Queue實現的。那這個跨程序佇列內部細節是怎樣的,它又是用什麼高科技來實現的呢

筆者仔細閱讀了multiprocessing.Queue的原始碼發現,它使用無名套接字sockerpair來完成的跨程序通訊,socketpair和socket的區別就在於socketpair不需要埠,不需要走網路協議棧,通過核心的套接字讀寫緩衝區直接進行跨程序通訊。

當父程序要傳遞任務給子程序時,先使用pickle將任務物件進行序列化成位元組陣列,然後將位元組陣列通過socketpair的寫描述符寫入核心的buffer中。子程序接下來就可以從buffer中讀取到位元組陣列,然後再使用pickle對位元組陣列進行反序列化來得到任務物件,這樣總算可以執行任務了。同樣子程序將結果傳遞給父程序走的也是一樣的流程,只不過這裡的socketpair是ResultQueue內部建立的無名套接字。

multiprocessing.Queue是支援雙工通訊,資料流向可以是父到子,也可以是子到父,只不過在concurrent的程序池實現中只用到了單工通訊。CallQueue是從父到子,ResultQueue是從子到父。

總結

concurrent.futures框架非常好用,雖然內部實現機制異常複雜,讀者也無需完全理解內部細節就可以直接使用了。但是需要特別注意的是不管是執行緒池還是程序池其內部的任務佇列都是無界的,一定要避免消費者處理不及時記憶體持續攀升的情況發生。

進群:125240963  即可獲取原始碼