Python Concurrency: Thread Pools and Process Pools -- the concurrent.futures Module
I. About the concurrent.futures module
The Python standard library provides the threading and multiprocessing modules for writing multi-threaded and multi-process code. But once a project reaches a certain scale, repeatedly creating and destroying threads or processes becomes very expensive, and at that point we would have to write our own thread pool or process pool, trading space for time. Starting with Python 3.2, however, the standard library offers the concurrent.futures module. It provides two classes, ThreadPoolExecutor and ProcessPoolExecutor, which build a further abstraction on top of threading and multiprocessing and give us direct support for thread pools and process pools.
1. Executor and Future:
The foundation of the concurrent.futures module is Executor, an abstract class that cannot be used directly. Its two subclasses, ThreadPoolExecutor and ProcessPoolExecutor, are the useful ones: as the names suggest, they create thread pools and process pools respectively. We can submit tasks to a pool directly, without maintaining our own Queue or worrying about deadlocks; the pool schedules the tasks for us automatically.
The Future concept should be familiar to anyone with Java or Node.js experience: you can think of it as an operation that will complete at some point in the future, and it is the foundation of asynchronous programming. In the traditional model, a call such as queue.get blocks while waiting for its result and the CPU cannot be freed up for other work in the meantime; with a Future, that waiting time can be spent doing other things.
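A minimal sketch of the Future idea described above (the function and variable names here are our own, not from the standard library):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_add(a, b):
    time.sleep(0.1)  # simulate a slow operation
    return a + b

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_add, 1, 2)
    # The main thread is free to do other work here while slow_add runs.
    print(future.done())      # likely False right after submit
    result = future.result()  # blocks only at the point we actually need the value
    print(result)             # 3
    print(future.done())      # True
```

result() is where the waiting happens: submission itself returns immediately, and the blocking is deferred until the value is needed.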
P.S.: If you are still holding on to Python 2.x, install the futures backport first:
pip install futures
II. Working with thread pools / process pools
1. Using submit with a thread pool / process pool:
```python
# Thread pool:
from concurrent.futures import ThreadPoolExecutor
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)
for url in URLS:
    future = executor.submit(load_url, url)
    print(future.done())
print('main thread')

# Output:
# False
# False
# False
# main thread
# 'https://www.baidu.com/' page is 227 bytes
# 'http://www.163.com' page is 662047 bytes
# 'https://github.com/' page is 54629 bytes
```
Let's analyze the output. We use the submit method to add a task to the thread pool; submit returns a Future object, which can be understood simply as an operation that will complete in the future. Because the pool runs tasks asynchronously, the main thread does not wait for the worker threads to finish, so print('main thread') executes while the submitted tasks are still running, and future.done() therefore returns False.
```python
# Process pool (same task as above):
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

if __name__ == '__main__':  # the __main__ guard is required when using process pools
    executor = ProcessPoolExecutor(max_workers=3)
    for url in URLS:
        future = executor.submit(load_url, url)
        print(future.done())
    print('main thread')

# Output:
# False          # the child processes have only been created, not finished
# False
# False
# main thread    # the main thread moves on as soon as the children are created;
#                # it does not wait for them to finish
# 'http://www.163.com' page is 662049 bytes
# 'https://www.baidu.com/' page is 227 bytes
# 'https://github.com/' page is 54629 bytes
```
2. Using map with a thread pool / process pool:
Besides submit, Executor also provides a map method, used much like the built-in map:
```python
from concurrent.futures import ThreadPoolExecutor
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)
executor.map(load_url, URLS)
print('main thread')

# Output:
# main thread
# 'http://www.163.com' page is 662047 bytes
# 'https://www.baidu.com/' page is 227 bytes
# 'https://github.com/' page is 54629 bytes
```
As the output suggests, map yields results in the order of the URLS list (here the workers print directly, so the printed order in any given run reflects completion; the iterator map returns, however, always follows input order). The map version is also more concise and intuitive, so choose whichever of the two suits the task at hand.
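To see the ordering guarantee directly, it helps to use a task that returns a value, so the results come back through map's iterator rather than through prints; a sketch with our own task function, where later inputs deliberately finish first:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def work(n):
    time.sleep(0.05 * (5 - n))  # later inputs sleep less, so they finish first
    return n * 10

with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(work, range(5)))
print(results)  # [0, 10, 20, 30, 40] -- input order, regardless of completion order
```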
3. wait:
The wait function returns a named tuple containing two sets of futures: done (completed) and not_done (not yet completed). One advantage of wait is the extra flexibility it gives you: its return_when parameter accepts one of three constants, FIRST_COMPLETED, FIRST_EXCEPTION, and ALL_COMPLETED, with ALL_COMPLETED as the default.
With the default ALL_COMPLETED, the call blocks until every task in the pool has finished before the main thread continues:
```python
from concurrent.futures import ThreadPoolExecutor, wait
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list))
print('main thread')

# Output:
# 'http://www.163.com' page is 662047 bytes
# 'https://www.baidu.com/' page is 227 bytes
# 'https://github.com/' page is 54629 bytes
# DoneAndNotDoneFutures(done={<Future at 0x2d0f898 state=finished returned NoneType>, <Future at 0x2bd0630 state=finished returned NoneType>, <Future at 0x2d27470 state=finished returned NoneType>}, not_done=set())
# main thread
```
With FIRST_COMPLETED, the program does not wait for every task in the pool to finish; wait returns as soon as the first one completes:
```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list, return_when=FIRST_COMPLETED))
print('main thread')

# Output:
# 'http://www.163.com' page is 662047 bytes
# DoneAndNotDoneFutures(done={<Future at 0x2bd15c0 state=finished returned NoneType>}, not_done={<Future at 0x2d0d828 state=running>, <Future at 0x2d27358 state=running>})
# main thread
# 'https://www.baidu.com/' page is 227 bytes
# 'https://github.com/' page is 54629 bytes
```
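A close relative of wait is as_completed, which yields each future as it finishes rather than returning one snapshot; a sketch with our own delay task (names are ours, not from the article):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def delay(seconds):
    time.sleep(seconds)
    return seconds

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(delay, s) for s in (0.3, 0.1, 0.2)]
    # as_completed yields futures in the order they finish, not submission order
    finished = [f.result() for f in as_completed(futures)]
print(finished)  # typically [0.1, 0.2, 0.3] -- completion order
```

This is handy when you want to process each result the moment it is ready instead of waiting for the slowest task.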
Exercise: write a small program comparing the execution efficiency of multiprocessing.Pool (and ThreadPool) against ProcessPoolExecutor (and ThreadPoolExecutor), and, with the Future concept above in mind, think about why the results come out the way they do.
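One possible shape for the thread-pool half of that comparison (the process-pool half is analogous, swapping in multiprocessing.Pool and ProcessPoolExecutor); the workload and helper names are our own, and the relative timings will vary by machine:

```python
import time
from multiprocessing.pool import ThreadPool
from concurrent.futures import ThreadPoolExecutor

def cpu_task(n):
    # a small CPU-bound workload
    return sum(i * i for i in range(n))

def timed(label, fn):
    start = time.perf_counter()
    fn()
    print('%s: %.3fs' % (label, time.perf_counter() - start))

if __name__ == '__main__':
    data = [200000] * 8
    pool = ThreadPool(4)
    timed('multiprocessing ThreadPool', lambda: pool.map(cpu_task, data))
    pool.close()
    with ThreadPoolExecutor(max_workers=4) as executor:
        timed('ThreadPoolExecutor', lambda: list(executor.map(cpu_task, data)))
```

Note that ThreadPool.map returns a fully built list (it blocks until all results are in), while Executor.map returns a lazy iterator; the list() call is what forces the Executor version to wait for everything, which ties back to the Future-based design above.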