1. 程式人生 > >python 64式: 第9式、執行緒池

python 64式: 第9式、執行緒池

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time

from concurrent import futures

'''
關鍵:
1、執行緒池提出原因:同時建立很多執行緒是需要消耗資源的,可以建立幾個執行緒,其他任務在等待執行緒池中執行緒
完成,就可以繼續處理
本質:將任務提交到執行緒池的任務佇列中
組成:等待佇列和一系列執行緒
作用: 主執行緒可以獲取執行緒狀態,返回值;一個執行緒完成,主執行緒可以知道

2、 concurrent.futures.Executor
作用:抽象類,有非同步執行呼叫方法。有兩個子類:
ThreadPoolExecutor(max_workers)和ProcessPoolExecutor(max_workers)
max_workers:表示有多少worker並行執行該任務,非同步呼叫,若為None,則設定為機器的處理器數目

3、 Executor.submit(fn, *args, **kwargs)
作用:排程函式的執行
引數: fn: 非同步執行的函式,*args: fn的引數,**kwargs: fn的引數
返回值: 返回一個Future物件,表示可呼叫的執行
注意: submit是立即返回的

4、 Executor.map(function, *iterables, timeout=None):
作用:將argument作為引數執行函式,以非同步方式執行;相當於map(func, *iterables)
但是func是非同步執行,如果操作超時,返回錯誤;不指定timeout,則不設定超時
引數: func:非同步執行函式,*iterables:可迭代物件,如列表,每一次func執行,都會從iterables中取引數

5、 Executor.shutdown(wait=True)
作用:釋放系統資源,在submit()或map()等非同步操作之後呼叫,使用with語句可以避免顯示呼叫該方法


6、 concurrent.futures.as_completed(fs, timeout=None)
作用:接收一個future列表,返回一個迭代器,在執行結束後刪除future,一次取出所有任務的結果
本質:是生成器,任務還沒有完成,會阻塞;先完成任務會先通知主執行緒

7 關於concurrent.futures.Future
concurrent.future: 未來完成的操作,非同步程式設計
cancel():取消呼叫,若執行,不能取消;返回值表示是否可以取消
cancelled():返回是否已經取消
done():返回任務是否已經成功完成
result(timeout=None):返回呼叫的結果,如果還沒有完成,將會等待一定時間
exception(timeout=None):返回呼叫的異常
wait(fs, timeout=None, return_when=ALL_COMPLETED):讓主執行緒阻塞,直到滿足設定的要求
引數:等待的任務序列,超時時間,等待條件。ALL_COMPLETED表示要等待所有任務完成。

總結:
程序池:不受GIL全域性直譯器鎖的限制,縮短執行時間,使用多核處理的模組,推薦使用
執行緒池:不管多少處理器,執行的時候只有一個執行緒執行。【協程:多個執行緒之間互相渡讓cpu的控制權】
執行緒池/程序池 適用:處理多個客戶端請求的服務端部分

參考:
[1] https://www.jianshu.com/p/b9b3d66aa0be
[2] http://lovesoo.org/analysis-of-asynchronous-concurrent-python-module-concurrent-futures.html
[3] https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html
[4] https://docs.python.org/3/library/concurrent.futures.html
[5] https://docs.python.org/3/library/concurrent.futures.html
'''


def run(num):
    time.sleep(num)
    return num


def threadPoolExecutor_submit():
    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(run, 1)
        result = future.result()
        print result


def threadPoolExecutorMap():
    data = [1, 2, 3]
    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        results = list(executor.map(run, data))
        return results


def process():
    results = threadPoolExecutorMap()
    print results


if __name__ == "__main__":
    process()