1. 程式人生 > >執行緒池 concurrent.futures

執行緒池 concurrent.futures

concurrent.futures 模組提供一個高級別介面給呼叫非同步執行 非同步執行可以用執行緒執行,使用 ThreadPoolExecutor,或者單獨的程序使用 ProcessPoolExecutor,兩者都實行了相同的介面,它由 抽象  Executor類定義 #也就是說 ThreadPoolExecutorProcessPoolExecutor都繼承了  Executor類可呼叫裡面的方法。  

Executor Objects

class concurrent.futures.Executor     這個抽象類提供方法去執行呼叫非同步。它不能直接使用,而是通過它具體的子類 submit(fn, *args, **kwargs)     它是非同步提交任務,將可呼叫的fn作為fn(*args,**kwargs)執行,並返回表示可呼叫的執行的future函式     withThreadPoolExecutor(max_workers=1) as executor:    future =executor.submit(pow, 323, 1235)    print(future.result())   map(func, *iterables, timeout=None, chunksize=1)     類似 
map(func,
*iterables)除了以下:        迭代器是立即收集的而不是延遲收集的        func是非同步執行的和對func的多個呼叫可以併發執行     timeout:     如果 __next__()的結果不可用在超時幾秒後會返回迭代器引發的  concurrent.futures.TimeoutError,超時時間在 Executor.map()中定義     timeout可以是一個整數或者小數,如果timeout沒有指定或者None,等待時間沒有限制。     如果呼叫引發了一個異常,當迭代器檢索它的值時異常將被引發。     chunksize:    當使用
ProcessPoolExecutor
,這個方法將迭代器分割成許多塊,並將這些塊作為單獨的任務提交給池,可以用正整數指定chunksize的大小。     對於非常長的迭代,與預設大小1相比,使用大的chunksize值可以顯著的提高效能。     如果使用 ThreadPoolExecutor,chunksize將沒有效果。   shutdown(wait=True)    向執行程式發出釋放任何資源的訊號,在關閉後呼叫Executor.submit()和Executor.map()將引發執行時錯誤。如果wait為真,那麼該方法將不會返回,直到所有未執行的期貨都已執行,並且與執行器關聯的資源被釋放。如果wait時flase,該方法將立刻返回,當所有未執行的期貨執行完畢時,與執行器關聯的    資源將被釋放。不管等待的值是多少,整個python程式將不會退出,直到執行完所有未完成的futures。       如果使用with語句,則可以避免呼叫此方法(等待,就像呼叫 
Executor.shutdown()
將wait設定為True)

ThreadPoolExecutor

ThreadPoolExecutor是executor的子類,使用執行緒池非同步執行呼叫。 當與future關聯的可呼叫物件等待另一個future的結果時,可能會發生死鎖。例如: importtime defwait_on_b():time.sleep(5) print(b.result()) # b will never complete because it is waiting on a. return5   defwait_on_a():time.sleep(5) print(a.result()) # a will never complete because it is waiting on b. return6 executor =ThreadPoolExecutor(max_workers=2) a =executor.submit(wait_on_b) b =executor.submit(wait_on_a) And: defwait_on_future():    f =executor.submit(pow, 5, 2)    # This will never complete because there is only one worker thread and    # it is executing this function.    print(f.result()) executor =ThreadPoolExecutor(max_workers=1) executor.submit(wait_on_future)   class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())    max_workers:    一個Executor子類,它使用最多max_workers執行緒池非同步執行呼叫。如果 max_workers是none或者沒給,執行緒數將是機器處理器的數量乘以5,    initializer(初始化器),initargs:    initializer是可選可呼叫項,呼叫在每個工作執行緒的開始,initargs是傳遞給initializer的一組引數,如果初始化器引發異常,所有當前掛起的工作將引發     一個 BrokenThreadPool,以及任何嘗試提交更多工給地址池。假設ThreadPoolExecutor經常用於重複I/O而不是CPU工作,workers的數量應該     高於 ProcessPoolExecutorworker數量.    thread_name_prefix:     更新在3.6版本,thread_name_prefix引數將增加同意控制threding的使用者,為便於除錯,池建立的工作執行緒的執行緒名。     更新在3.7版本:增加initializerand initargs 引數

ThreadPoolExecutor Example

importconcurrent.futures importurllib.requestURLS =[' http://www.foxnews.com/', ' http://www.cnn.com/', ' http://europe.wsj.com/', ' http://www.bbc.co.uk/', ' http://some-made-up-domain.com/']   # Retrieve a single page and report the URL and contents defload_url(url,timeout): withurllib.request.urlopen(url,timeout=timeout) asconn: returnconn.read()   # We can use a with statement to ensure threads are cleaned up promptly withconcurrent.futures.ThreadPoolExecutor(max_workers=5) asexecutor: # Start the load operations and mark each future with its URLfuture_to_url ={executor.submit(load_url,url, 60):url forurl inURLS} forfuture inconcurrent.futures.as_completed(future_to_url):url =future_to_url[future] try:data =future.result() exceptExceptionasexc: print('%rgenerated an exception: %s'%(url,exc)) else: print('%rpage is %dbytes'%(url, len(data)))  

ProcessPoolExecutor

ProcessPoolExecutor類是Executor子類使用程序池非同步執行呼叫。 ProcessPoolExecutor使用 multiprocessing模組,允許他避開 Global Interpreter Lock,但同時意味著只有picklable物件能執行和返回 __main__模組必須能被工作子程序倒入,這意味著 ProcessPoolExecutor將不工作在互動直譯器。 從提交給ProcessPoolExecutor的可呼叫物件呼叫Executor或future的方法將導致死鎖。   class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

使用最多max_workers程序池非同步執行呼叫的Executor子類,如果max_workers是None或不給,他將預設為機器上的處理器數量,如果max_workers小於或等於0,將會引發ValueError。mp_context可以是多處理上下文,也可以是無處理上下文,他將用於發射worker。如果mp_context是none或者沒給,預設multiprocessing context 被使用
初始化項是可選的可呼叫項,在每個工作程序開始時呼叫;initargs是一個元祖引數傳遞給 initializer,如果initializer引發一個異常,所有當前掛起的任務將引發一個  BrokenProcessPool,以及任何向池提交更多作業的嘗試。
更改在version3.3:當一工作程序突然中止,就會引發一個BrokenProcessPool 錯誤,以前,行為是未定義的,但是對執行者或其未來的操作通常會凍結或死鎖更改在version3.7:新增mp_context引數是為了允許使用者控制由池建立的工作程序的start_method。

ProcessPoolExecutor Example

importconcurrent.futures importmathPRIMES =[ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419]   defis_prime(n): ifn %2==0: returnFalsesqrt_n =int(math.floor(math.sqrt(n))) fori inrange(3,sqrt_n +1, 2): ifn %i ==0: returnFalse returnTrue   defmain(): withconcurrent.futures.ProcessPoolExecutor() asexecutor: fornumber,prime inzip(PRIMES,executor.map(is_prime,PRIMES)): print('%dis prime: %s'%(number,prime))   if__name__=='__main__':main()  

Future Objects

future類封裝了可呼叫的非同步執行。future例項是由 Executor.submit()建立的 class concurrent.futures.Future封裝了可呼叫的非同步執行。future是由 Executor.submit()建立的,除測試意外,不能夠直接建立cancel() 嘗試去取消呼叫。如果是當前執行呼叫,且無法取消的,則該方法則返回False,否則呼叫將取消並且該方法將返回Truecancelled()如果呼叫是成功取消的,返回True,running() 如果呼叫是當前正在執行的並且沒有取消,返回True,done()如果呼叫成功取消或完成執行返回Trueresult(timeout=None)返回呼叫返回的值,如果這個呼叫還沒有完成這個方法將等待timeout seconds,如果呼叫還沒完成在timeout seconds內,將引發一個 concurrent.futures.TimeoutError,timeout能是整數或浮點數,如果超時沒有指定或None,等待時間將沒有限制 如果在完成之前future取消將引發 CancelledError 如果引發呼叫,此方法將引發相同的異常。 exception(timeout=None)返回呼叫引發的異常。返回呼叫返回的值,如果這個呼叫還沒有完成這個方法將等待timeout seconds,如果呼叫還沒完成在timeout seconds內,將引發一個 concurrent.futures.TimeoutError,timeout能是整數或浮點數,如果超時沒有指定或None,等待時間將沒有限制 如果在完成之前future取消將引發 CancelledError 如果引發呼叫,此方法將引發相同的異常。add_done_callback(fn)將可呼叫的fn附加到future。當future被取消或完成執行時,將呼叫fn,並將future作為其唯一引數。新增的callables按新增順序呼叫,並且始終在屬於新增它們的程序的執行緒中呼叫。如果被呼叫引發一個 Exception子類,他將被記錄或者忽視。如果呼叫方引發一個 BaseException子類,行為是沒有定義的。如果future已經完成或者已經取消,fn將立即被呼叫
下面的方法將用於單元測試和執行器實現。