
Python Concurrent Programming: Thread Pools and Process Pools with the concurrent.futures Module


I. About the concurrent.futures Module

  

Python's standard library provides the threading and multiprocessing modules for writing multi-threaded and multi-process code, but once a project reaches a certain scale, frequently creating and destroying threads or processes becomes very expensive. At that point we used to have to write our own thread pool or process pool, trading space for time. Since Python 3.2, however, the standard library has provided the concurrent.futures module, whose two classes ThreadPoolExecutor and ProcessPoolExecutor are a further abstraction over threading and multiprocessing and give direct support for thread-pool and process-pool programming.

1. Executor and Future:

The foundation of the concurrent.futures module is Executor, an abstract class that cannot be used directly. Its two subclasses, ThreadPoolExecutor and ProcessPoolExecutor, are very useful though: as the names suggest, they create thread pools and process pools respectively. We can put tasks directly into the pool without maintaining a queue or worrying about deadlocks; the pool schedules everything for us automatically.

The concept of a Future will be familiar to anyone who has programmed in Java or Node.js: you can think of it as an operation that will complete at some point in the future, and it is the foundation of asynchronous programming. In the traditional model, a call such as queue.get blocks while waiting for the result and the CPU cannot be freed to do other things; the Future lets us get other work done during that waiting time.
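As a minimal sketch of this idea (using time.sleep as a stand-in for a blocking operation; the function names here are made up for illustration), the main thread stays free while the task runs and only blocks when it actually asks for the result:

```python
# A minimal sketch of the Future idea: submit returns immediately,
# and result() blocks only when we finally need the value.
from concurrent.futures import ThreadPoolExecutor
import time

def slow_add(a, b):
    time.sleep(0.1)          # simulate a blocking operation (e.g. I/O)
    return a + b

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_add, 1, 2)   # returns at once with a Future

print(future.done())         # False: the task is still running
other_work = sum(range(10))  # the main thread is free to do other work
print(future.result())       # 3: blocks here, only until the result is ready
print(future.done())         # True
executor.shutdown()
```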

P.S.: if you are still holding out on Python 2.x, install the futures backport first:

pip install futures 

II. Operating Thread Pools / Process Pools

1. Using submit with a thread pool / process pool:

# Thread pool:
from concurrent.futures import ThreadPoolExecutor
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

for url in URLS:
    future = executor.submit(load_url, url)
    print(future.done())
print('main thread')

# Output:
False
False
False
main thread
https://www.baidu.com/ page is 227 bytes
http://www.163.com page is 662047 bytes
https://github.com/ page is 54629 bytes

Let's analyse the output. We use the submit method to add a task to the thread pool; submit returns a Future object, which, again, can be understood as an operation that will complete in the future. Because the tasks were submitted asynchronously, the main thread does not wait for the pool's threads to finish, so print('main thread') runs first; at that moment none of the submitted tasks has finished, so each future.done() returns False.
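One more thing submit gives you that the example above does not show: if the task raises, the exception is captured by the Future and re-raised when you call result(). A toy sketch (the failing function and URL here are made up, not part of the example above):

```python
# Exception handling with submit: an exception raised in the worker is
# stored on the Future and re-raised by result().
from concurrent.futures import ThreadPoolExecutor

def fail(url):
    raise ValueError('bad url: %s' % url)

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(fail, 'http://example.invalid')

try:
    future.result()              # re-raises the worker's ValueError here
except ValueError as e:
    print('caught:', e)

print(future.exception())        # the stored exception, without re-raising
executor.shutdown()
```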

# Process pool: same as above
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ProcessPoolExecutor(max_workers=3)

if __name__ == '__main__':  # the __main__ guard is required for processes
    for url in URLS:
        future = executor.submit(load_url, url)
        print(future.done())
    print('main thread')

# Output:
False  # the child processes have only been created; they have not finished
False
False
main thread  # once the children exist, the main process continues; it does not wait for them
http://www.163.com page is 662049 bytes
https://www.baidu.com/ page is 227 bytes
https://github.com/ page is 54629 bytes

2. Using map with a thread pool / process pool:

Besides submit, Executor also provides a map method, similar in usage to the built-in map:

from concurrent.futures import ThreadPoolExecutor
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

executor.map(load_url, URLS)

print('main thread')

# Output:
main thread
http://www.163.com page is 662047 bytes
https://www.baidu.com/ page is 227 bytes
https://github.com/ page is 54629 bytes

As the output shows, map returns its results in the order of the URLS list, and the resulting code is more concise and intuitive; choose whichever approach fits your needs.
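The ordering guarantee is worth seeing in isolation: even when a later input finishes first, executor.map yields results in input order. A toy sketch (square and the staggered sleeps are made up for illustration):

```python
# executor.map returns results lazily, in the order of the input iterable,
# regardless of which task happens to finish first.
from concurrent.futures import ThreadPoolExecutor
import time

def square(n):
    time.sleep(0.05 * (3 - n))   # later inputs finish earlier on purpose
    return n * n

executor = ThreadPoolExecutor(max_workers=3)
results = list(executor.map(square, [1, 2, 3]))
print(results)                   # [1, 4, 9]: input order is preserved
executor.shutdown()
```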

3. wait:

The wait method returns a named tuple containing two sets: one of completed futures and one of uncompleted ones. An advantage of using wait is the greater degree of control it gives you: its return_when parameter accepts one of three constants, FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED, with ALL_COMPLETED as the default.

With the default ALL_COMPLETED, the program blocks until every task in the pool has finished before the main thread continues:

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list))

print('main thread')

# Output:
http://www.163.com page is 662047 bytes
https://www.baidu.com/ page is 227 bytes
https://github.com/ page is 54629 bytes
DoneAndNotDoneFutures(done={<Future at 0x2d0f898 state=finished returned NoneType>, <Future at 0x2bd0630 state=finished returned NoneType>, <Future at 0x2d27470 state=finished returned NoneType>}, not_done=set())
main thread

With FIRST_COMPLETED, the program does not wait for all of the pool's tasks to finish:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list, return_when=FIRST_COMPLETED))

print('main thread')

# Output:
http://www.163.com page is 662047 bytes
DoneAndNotDoneFutures(done={<Future at 0x2bd15c0 state=finished returned NoneType>}, not_done={<Future at 0x2d0d828 state=running>, <Future at 0x2d27358 state=running>})
main thread
https://www.baidu.com/ page is 227 bytes
https://github.com/ page is 54629 bytes
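The examples above import as_completed without ever using it. For completeness: it yields futures one by one as they finish, which is handy when you want each result as early as possible rather than in submission order. A toy sketch (work and the delays are made up for illustration):

```python
# as_completed yields each future as soon as it finishes,
# so results arrive in completion order, not submission order.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def work(delay):
    time.sleep(delay)
    return delay

executor = ThreadPoolExecutor(max_workers=3)
futures = [executor.submit(work, d) for d in (0.15, 0.05, 0.1)]

finished_order = []
for future in as_completed(futures):   # fastest task comes out first
    finished_order.append(future.result())

print(finished_order)                  # [0.05, 0.1, 0.15]
executor.shutdown()
```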

Exercise: write a small program comparing the execution efficiency of multiprocessing.Pool (ThreadPool) and ProcessPoolExecutor (ThreadPoolExecutor), and, keeping the Future concept above in mind, think about why the results come out the way they do.
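As a starting point for this exercise, here is a thread-only benchmark skeleton (task, the delays, and the pool sizes are arbitrary placeholders; for the process comparison, swap in multiprocessing.Pool and ProcessPoolExecutor and guard the whole thing with if __name__ == '__main__'):

```python
# Toy benchmark skeleton: multiprocessing's ThreadPool versus
# concurrent.futures' ThreadPoolExecutor on the same task list.
from multiprocessing.pool import ThreadPool
from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(0.01)      # stand-in for real work (e.g. fetching a URL)
    return n

inputs = list(range(20))

start = time.time()
with ThreadPool(4) as pool:
    r1 = pool.map(task, inputs)
t_pool = time.time() - start

start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    r2 = list(executor.map(task, inputs))
t_executor = time.time() - start

print('ThreadPool: %.3fs, ThreadPoolExecutor: %.3fs' % (t_pool, t_executor))
```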
