python 64式: 第16式、程序池
阿新 • • 發佈:2018-11-27
#!/usr/bin/env python # -*- coding: utf-8 -*- import multiprocessing import time from concurrent import futures ''' 關鍵: 1、程序池提出原因:同時建立很多程序是需要消耗資源的,可以建立幾個程序,其他任務在等待執行緒池中執行緒 完成,就可以繼續處理 本質:將任務提交到程序池的任務佇列中 組成:等待佇列和一系列程序 2、 concurrent.futures.Executor 作用:抽象類,有非同步執行呼叫方法。有兩個子類: ThreadPoolExecutor(max_workers)和ProcessPoolExecutor(max_workers) max_workers:表示有多少worker並行執行該任務,非同步呼叫,若為None,則設定為機器的處理器數目 3、 Executor.submit(fn, *args, **kwargs) 作用:排程函式的執行 引數: fn: 非同步執行的函式,*args: fn的引數,**kwargs: fn的引數 返回值: 返回一個Future物件,表示可呼叫的執行 注意: submit是立即返回的 4、 Executor.map(function, *iterables, timeout=None): 作用:將argument作為引數執行函式,以非同步方式執行;相當於map(func, *iterables) 但是func是非同步執行,如果操作超時,返回錯誤;不指定timeout,則不設定超時 引數: func:非同步執行函式,*iterables:可迭代物件,如列表,每一次func執行,都會從iterables中取引數 5、 Executor.shutdown(wait=True) 作用:釋放系統資源,在submit()或map()等非同步操作之後呼叫,使用with語句可以避免顯示呼叫該方法 6、 concurrent.futures.as_completed(fs, timeout=None) 作用:接收一個future列表,返回一個迭代器,在執行結束後刪除future,一次取出所有任務的結果 本質:是生成器,任務還沒有完成,會阻塞;先完成任務會先通知主執行緒 as_completed()是按照完成時間輸出的 map()是按照輸入引數的順序輸出的,有順序要求請用map 7 關於concurrent.futures.Future concurrent.future: 未來完成的操作,非同步程式設計 cancel():取消呼叫,若執行,不能取消;返回值表示是否可以取消 cancelled():返回是否已經取消 done():返回任務是否已經成功完成 result(timeout=None):返回呼叫的結果,如果還沒有完成,將會等待一定時間 exception(timeout=None):返回呼叫的異常 wait(fs, timeout=None, return_when=ALL_COMPLETED):讓主執行緒阻塞,直到滿足設定的要求 引數:等待的任務序列,超時時間,等待條件。ALL_COMPLETED表示要等待所有任務完成。 總結: 程序池:不受GIL全域性直譯器鎖的限制,縮短執行時間,使用多核處理的模組,推薦使用 執行緒池:不管多少處理器,執行的時候只有一個執行緒執行。【協程:多個執行緒之間互相渡讓cpu的控制權】 執行緒池/程序池 適用:處理多個客戶端請求的服務端部分 參考: [1] https://www.jianshu.com/p/b9b3d66aa0be [2] http://lovesoo.org/analysis-of-asynchronous-concurrent-python-module-concurrent-futures.html [3] https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html [4] https://docs.python.org/3/library/concurrent.futures.html [5] https://docs.python.org/3/library/concurrent.futures.html [6]https://www.jianshu.com/p/4fab1ffe8665 [7]https://docs.python.org/3.2/library/concurrent.futures.html ''' def run(num, **kwargs): time.sleep(num) return kwargs def processPoolExecutorAsCompleted(): with futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor: futureList = [executor.submit(run, i, **{str(i): i} ) for i in range(6, 0, -1)] results = [] # as_completed()是按照完成時間輸出的 # map()是按照輸入引數的順序輸出的 for future in futures.as_completed(futureList): result = future.result() results.append(result) return results def valid(): result = processPoolExecutor_as_completed() print result def runMap(num): time.sleep(num) return num def processPoolExecutorMap(): datas = [i for i in range(6, 0, -1)] with futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor: results = list(executor.map(runMap, datas)) return results def process(): result = processPoolExecutorAsCompleted() print result results = processPoolExecutorMap() print results if __name__ == "__main__": process()