1. 程式人生 > >[python] ThreadPoolExecutor執行緒池和ProcessPoolExecutor程序池

[python] ThreadPoolExecutor執行緒池和ProcessPoolExecutor程序池

引言

Python標準庫為我們提供了threading和multiprocessing模組編寫相應的多執行緒/多程序程式碼,但是當專案達到一定的規模,頻繁建立/銷燬程序或者執行緒是非常消耗資源的,這個時候我們就要編寫自己的執行緒池/程序池,以空間換時間。但從Python3.2開始,標準庫為我們提供了concurrent.futures模組,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫執行緒池/程序池提供了直接的支援。

Executor和Future

concurrent.futures模組的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。但是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor卻是非常有用,顧名思義兩者分別被用來建立執行緒池和程序池的程式碼。我們可以將相應的tasks直接放入執行緒池/程序池,不需要維護Queue來操心死鎖的問題,執行緒池/程序池會自動幫我們排程。

Future這個概念相信有java和nodejs下程式設計經驗的朋友肯定不陌生了,你可以把它理解為一個在未來完成的操作,這是非同步程式設計的基礎,傳統程式設計模式下比如我們操作queue.get的時候,在等待返回結果之前會產生阻塞,cpu不能讓出來做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。關於在Python中進行非同步IO可以閱讀完本文之後參考我的Python併發程式設計之協程/非同步IO。

p.s: 如果你依然在堅守Python2.x,請先安裝futures模組。

pip install futures
ProcessPoolExecutor(n):n表示池裡面存放多少個程序,之後的連線最大就是n的值

submit(fn,*args,**kwargs)  非同步提交任務

map(func, *iterables, timeout=None, chunksize=1) 取代for迴圈submit的操作

shutdown(wait=True) 相當於程序池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續,--------》預設
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait引數為何值,整個程式都會等到所有任務執行完畢
submit和map必須在shutdown之前

result(timeout=None)  #取得結果

add_done_callback(fn)  #回撥函式

使用submit來操作執行緒池/程序池

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed
import time

#模擬網路請求的網路延遲
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times


#建立一個大小為2的執行緒池
pool = ThreadPoolExecutor(max_workers=2)

#將上個任務提交到執行緒池,因為執行緒池的大小是2,所以必須等task1和task2中有一個完成之後才會將第三個任務提交到執行緒池
task1 = pool.submit(get_html,3)
task2 = pool.submit(get_html,2)
task3 = pool.submit(get_html,4)

#列印該任務是否執行完畢
print(task1.done())
#只有未被提交的到執行緒池(在等待提交的佇列中)的任務才能夠取消
print(task3.cancel())
time.sleep(4)#休眠4秒鐘之後,執行緒池中的任務全部執行完畢,可以列印狀態
print(task1.done())

print(task1.result())#該任務的return 返回值  該方法是阻塞的。
  1. ThreadPoolExecutor構造例項的時候,傳入max_workers引數來設定執行緒池中最多能同時執行的執行緒數目。
  2. 使用submit函式來提交執行緒需要執行的任務(函式名和引數)到執行緒池中,並返回該任務的控制代碼(類似於檔案、畫圖),注意submit()不是阻塞的,而是立即返回。
  3. 通過submit函式返回的任務控制代碼,能夠使用done()方法判斷該任務是否結束。上面的例子可以看出,由於任務有2s的延時,在task1提交後立刻判斷,task1還未完成,而在延時4s之後判斷,task1就完成了。
  4. 使用cancel()方法可以取消提交的任務,如果任務已經線上程池中運行了,就取消不了。這個例子中,執行緒池的大小設定為2,任務已經在運行了,所以取消失敗。如果改變執行緒池的大小為1,那麼先提交的是task1,task2還在排隊等候,這是時候就可以成功取消。
  5. 使用result()方法可以獲取任務的返回值。檢視內部程式碼,發現這個方法是阻塞的。

as_completed

上面雖然提供了判斷任務是否結束的方法,但是不能在主執行緒中一直判斷啊。有時候我們是得知某個任務結束了,就去獲取結果,而不是一直判斷每個任務有沒有結束。這是就可以使用as_completed方法一次取出所有任務的結果。

pool = ThreadPoolExecutor(max_workers=2)
urls = [2,3,4]
all_task = [pool.submit(get_html,url) for url in urls]

for future in as_completed(all_task):
    data = future.result()
    print("in main: get page {}s success".format(data))
 
 #echo
 # 執行結果 
 # get page 2s finished 
 # in main: get page 2s success 
 # get page 3s finished 
 # in main: get page 3s success 
 # get page 4s finished 
 # in main: get page 4s success

as_completed()方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield這個任務,就能執行for迴圈下面的語句,然後繼續阻塞住,迴圈到所有的任務結束。從結果也可以看出,先完成的任務會先通知主執行緒。

map

除了上面的as_completed方法,還可以使用executor.map方法,但是有一點不同。

from concurrent.futures import ThreadPoolExecutor 
import time 
# 引數times用來模擬網路請求的時間 
def get_html(times):
	time.sleep(times) 
	print("get page {}s finished".format(times)) 
	return times 
executor = ThreadPoolExecutor(max_workers=2) 
urls = [3, 2, 4] # 並不是真的url 
	for data in executor.map(get_html, urls): 
		print("in main: get page {}s success".format(data))

#echo
#執行結果
# get page 2s finished 
# get page 3s finished 
# in main: get page 3s success 
# in main: get page 2s success 
# get page 4s finished 
# in main: get page 4s success

wait

wait方法可以讓主執行緒阻塞,直到滿足設定的要求。

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED 
import time
# 引數times用來模擬網路請求的時間 
def get_html(times): 
	time.sleep(times) 
	print("get page {}s finished".format(times)) 
	return times 
executor = ThreadPoolExecutor(max_workers=2) 
urls = [3, 2, 4] # 並不是真的url 
all_task = [executor.submit(get_html, (url)) for url in urls] 
wait(all_task, return_when=ALL_COMPLETED) 
print("main")

#echo
# 執行結果 
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main

wait方法接收3個引數,等待的任務序列、超時時間以及等待條件。等待條件return_when預設為ALL_COMPLETED,表明要等待所有的任務都結束。可以看到執行結果中,確實是所有任務都完成了,主執行緒才打印出main。等待條件還可以設定為FIRST_COMPLETED,表示第一個任務完成就停止等待。

ProcessPoolExecutor使用

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random,os

def task(n):
    print('%s is running'% os.getpid())
    time.sleep(random.randint(1,3))
    return n
def handle(res):
    res=res.result()
    print("handle res %s"%res)

if __name__ == '__main__':
    #同步呼叫
    # pool=ProcessPoolExecutor(8)
    #
    # for i in range(13):
    #     pool.submit(task, i).result() #變成同步呼叫,串行了,等待結果
    # # pool.shutdown(wait=True) #關門等待所有程序完成
    # pool.shutdown(wait=False)#預設wait就等於True
    # # pool.submit(task,3333) #shutdown後不能使用submit命令
    #
    # print('主')

    #非同步呼叫
    pool=ProcessPoolExecutor(8)
    for i in range(13):
         obj=pool.submit(task,i)
         obj.add_done_callback(handle) #這裡用到了回撥函式
    pool.shutdown(wait=True) #關門等待所有程序完成
    print('主')
##注意,建立程序池必須在if __name__ == '__main__':中,否則會報錯
##其他的用法和建立執行緒池的一樣
from concurrent.futures import ThreadPoolExecutor
from urllib import request
from threading import current_thread
import time

def get(url):
    print('%s get %s'%(current_thread().getName(),url))
    response=request.urlopen(url)
    time.sleep(2)
    # print(response.read().decode('utf-8'))
    return{'url':url,'content':response.read().decode('utf-8')}

def parse(res):
    res=res.result()
    print('parse:[%s] res:[%s]'%(res['url'],len(res['content'])))

# get('http://www.baidu.com')
if __name__ == '__main__':
    pool=ThreadPoolExecutor(2)

    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.openstack.org',
        'https://www.openstack.org',
        'https://www.openstack.org',
        'https://www.openstack.org',
        'https://www.openstack.org',

    ]

    for url in urls:
        pool.submit(get,url).add_done_callback(parse)