Using the concurrent.futures module for thread and process pools in Python 3
阿新 • Published: 2018-12-09
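I. Executor and Future
The foundation of the concurrent.futures module is Executor, an abstract class; ThreadPoolExecutor and ProcessPoolExecutor are its two most useful subclasses. A Future represents an operation that will complete at some later point, a concept borrowed from asynchronous programming: you get the Future immediately and collect its result once the work is done.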
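A minimal sketch of how Executor and Future fit together, using a local function instead of network access (`slow_add` and `demo` are hypothetical names for illustration): `submit()` hands back a Future right away, `done()` reports whether the work has finished, and `result()` blocks until the return value is available. Using the executor as a context manager calls `shutdown(wait=True)` when the block exits.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_add(a, b):
    # Hypothetical task: simulate slow work, then return a value
    time.sleep(0.2)
    return a + b

def demo():
    # The with-block calls executor.shutdown(wait=True) on exit
    with ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(slow_add, 1, 2)  # returns a Future immediately
        print('done right after submit:', future.done())  # normally False
        value = future.result()  # blocks until slow_add has returned
    return value, future.done()

if __name__ == '__main__':
    print(demo())  # (3, True)
```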
II. Thread pools and process pools
1. Using submit with a thread pool / process pool:
Thread pool:
# Thread pool:
from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
for url in URLS:
    future = executor.submit(load_url, url)  # returns a Future immediately
    print(future.done())
print('main thread')
# Output:
# False
# False
# False
# main thread
# 'https://www.baidu.com/' page is 227 bytes
# 'http://www.163.com' page is 662047 bytes
# 'https://github.com/' page is 54629 bytes
Process pool:
# Process pool: same as above
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))
# The __main__ guard is required: with the spawn start method (the default on
# Windows and macOS) each worker re-imports this module
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)
    for url in URLS:
        future = executor.submit(load_url, url)
        print(future.done())
    print('main thread')
# Output:
# False        (the task has been handed to the pool but has not finished)
# False
# False
# main thread  (the main thread continues right after submitting; it does not wait for the child processes)
# 'http://www.163.com' page is 662049 bytes
# 'https://www.baidu.com/' page is 227 bytes
# 'https://github.com/' page is 54629 bytes
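Neither submit example keeps the Future it gets back, so if load_url raised (a timeout or an HTTP error, say), the exception would be stored in the Future and never reported. A small sketch with a hypothetical `might_fail` standing in for load_url: calling `result()` re-raises the worker's exception in the calling thread, where it can be handled.

```python
from concurrent.futures import ThreadPoolExecutor

def might_fail(n):
    # Hypothetical task standing in for load_url: fails for one input
    if n == 2:
        raise ValueError('bad input: %d' % n)
    return n * 10

def collect(numbers):
    ok, errors = {}, {}
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {n: executor.submit(might_fail, n) for n in numbers}
        for n, future in futures.items():
            try:
                ok[n] = future.result()  # re-raises the worker's exception here
            except ValueError as exc:
                errors[n] = str(exc)
    return ok, errors

if __name__ == '__main__':
    print(collect([1, 2, 3]))  # ({1: 10, 3: 30}, {2: 'bad input: 2'})
```

`future.exception()` is an alternative when you want to inspect the error without a try/except; it returns None if the task succeeded.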
2. Using map with a thread pool / process pool:
from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
executor.map(load_url, URLS)  # submits every call at once and returns an iterator of results
print('main thread')
# Output:
# main thread
# 'http://www.163.com' page is 662047 bytes
# 'https://www.baidu.com/' page is 227 bytes
# 'https://github.com/' page is 54629 bytes
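Here the pages happen to be printed in the order of the URLS list. What map actually guarantees is that the iterator it returns yields the functions' return values in input order, whatever order the tasks finish in; because load_url prints instead of returning, the print order itself depends on timing. The map version is also shorter and more direct, so pick whichever style suits the task.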
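To see the ordering guarantee directly, the task has to return its value rather than print it. In this sketch (`ordered_results` and `delayed_echo` are hypothetical), earlier items sleep longer so they tend to finish last, yet the list built from `map` still comes out in input order.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def ordered_results(values):
    finish_order = []  # records the order in which tasks actually complete

    def delayed_echo(item):
        # Hypothetical task: earlier items sleep longer, so they tend to finish last
        index, value = item
        time.sleep(0.1 * (len(values) - index))
        finish_order.append(value)  # list.append is thread-safe in CPython
        return value

    with ThreadPoolExecutor(max_workers=max(1, len(values))) as executor:
        # map() submits every call up front; its iterator yields results in input order
        results = list(executor.map(delayed_echo, enumerate(values)))
    return results, finish_order

if __name__ == '__main__':
    results, finish_order = ordered_results(['a', 'b', 'c'])
    print('map results: ', results)       # ['a', 'b', 'c'], always input order
    print('finish order:', finish_order)  # usually ['c', 'b', 'a']
```

Iterating the map result blocks until the next result in input order is ready, so one slow early task holds back results that are already finished.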
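3. wait:
The wait function returns a named tuple (DoneAndNotDoneFutures) holding two sets: done, the futures that have completed, and not_done, those that have not. One advantage of wait is finer control over when to stop waiting: its return_when argument accepts FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED, and defaults to ALL_COMPLETED.
With the default ALL_COMPLETED, the call blocks until every task in the pool has finished, and only then does the main thread continue: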
from concurrent.futures import ThreadPoolExecutor, wait
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list))  # blocks until every future is done (ALL_COMPLETED)
print('main thread')
# Output:
# 'http://www.163.com' page is 662047 bytes
# 'https://www.baidu.com/' page is 227 bytes
# 'https://github.com/' page is 54629 bytes
# DoneAndNotDoneFutures(done={<Future at 0x2d0f898 state=finished returned NoneType>, <Future at 0x2bd0630 state=finished returned NoneType>, <Future at 0x2d27470 state=finished returned NoneType>}, not_done=set())
# main thread
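Because wait returns a named tuple, the two sets can be unpacked directly. This sketch (the `sleeper` task is hypothetical) also passes wait's `timeout` argument: once the timeout expires, wait returns whatever has finished so far instead of blocking until everything is complete.

```python
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time

def sleeper(seconds):
    # Hypothetical task: sleep, then report how long it slept
    time.sleep(seconds)
    return seconds

def wait_with_timeout():
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(sleeper, s) for s in (0.1, 0.2, 1.0)]
        # Wait at most 0.5 s for ALL_COMPLETED; the 1.0 s task will not make it
        done, not_done = wait(futures, timeout=0.5, return_when=ALL_COMPLETED)
        finished = sorted(f.result() for f in done)
        pending = len(not_done)
    # Leaving the with-block still waits for the 1.0 s task to finish
    return finished, pending

if __name__ == '__main__':
    print(wait_with_timeout())  # ([0.1, 0.2], 1)
```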
With FIRST_COMPLETED, wait returns as soon as any one task finishes, without waiting for the rest of the pool:
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list, return_when=FIRST_COMPLETED))  # use the module constant rather than a string
print('main thread')
# Output:
# 'http://www.163.com' page is 662047 bytes
# DoneAndNotDoneFutures(done={<Future at 0x2bd15c0 state=finished returned NoneType>}, not_done={<Future at 0x2d0d828 state=running>, <Future at 0x2d27358 state=running>})
# main thread
# 'https://www.baidu.com/' page is 227 bytes
# 'https://github.com/' page is 54629 bytes
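The third option, FIRST_EXCEPTION, makes wait return as soon as any future finishes by raising an exception; if none raises, it behaves like ALL_COMPLETED. A sketch with hypothetical `ok_task` and `bad_task` functions:

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION
import time

def ok_task(seconds):
    # Hypothetical task that succeeds after a delay
    time.sleep(seconds)
    return seconds

def bad_task():
    # Hypothetical task that fails quickly
    time.sleep(0.1)
    raise RuntimeError('task failed')

def first_exception_demo():
    with ThreadPoolExecutor(max_workers=3) as executor:
        slow = executor.submit(ok_task, 1.0)
        bad = executor.submit(bad_task)
        done, not_done = wait([slow, bad], return_when=FIRST_EXCEPTION)
        # wait() returned because bad_task raised; the slow task is still running
        return bad in done, slow in not_done, str(bad.exception())

if __name__ == '__main__':
    print(first_exception_demo())  # (True, True, 'task failed')
```

This is a convenient way to stop early: after wait returns, you can cancel the futures in not_done that have not started yet.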
Applying a pool in practice (with a callback):
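from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
import requests  # third-party package
def get_page(url):
    print('<%s> is getting [%s]' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:  # 200 means the download succeeded
        return {'url': url, 'text': response.text}
    return None  # non-200 responses produce no result
def parse_page(future):
    # The callback receives the finished Future object, not the return value,
    # so call future.result() first to get what get_page returned.
    # Callbacks run in a thread of the process that added them, so with
    # ProcessPoolExecutor this function runs in the main process.
    res = future.result()
    if res is None:
        return
    print('<%s> is parsing [%s]' % (os.getpid(), res['url']))
    with open('db.txt', 'a') as f:
        parse_res = 'url:%s size:%s\n' % (res['url'], len(res['text']))
        f.write(parse_res)
if __name__ == '__main__':
    # p = ThreadPoolExecutor()
    p = ProcessPoolExecutor()
    l = [
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
    ]
    for url in l:
        # Whichever task finishes first has its callback run first.
        # add_done_callback() returns None, so there is nothing to assign.
        # Callbacks are a general pattern: they work with thread pools as well as process pools.
        p.submit(get_page, url).add_done_callback(parse_page)
    p.shutdown()  # like close() + join() on a multiprocessing Pool: waits for all tasks
    print('main', os.getpid())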
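The same callback pattern can be tried without network access. In this minimal sketch, the hypothetical `fake_download` stands in for get_page and the records go into a list instead of db.txt; `functools.partial` passes the list and a lock to the callback, which still receives the finished Future as its last argument.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import threading

def fake_download(url):
    # Hypothetical stand-in for get_page: returns None for a "failed" download
    if url.endswith('/missing'):
        return None
    return {'url': url, 'text': 'x' * len(url)}

def parse_page(records, lock, future):
    # The callback gets the finished Future, not the return value
    if future.exception() is not None:
        return
    res = future.result()
    if res is None:  # download failed, nothing to record
        return
    with lock:
        records.append('url:%s size:%s' % (res['url'], len(res['text'])))

def main(urls):
    records = []
    lock = threading.Lock()
    callback = partial(parse_page, records, lock)
    with ThreadPoolExecutor(max_workers=3) as pool:
        for url in urls:
            pool.submit(fake_download, url).add_done_callback(callback)
    # Leaving the with-block waits for every task, and therefore every callback
    return sorted(records)

if __name__ == '__main__':
    print(main(['http://example.com/a', 'http://example.com/missing', 'http://example.com/bc']))
    # ['url:http://example.com/a size:20', 'url:http://example.com/bc size:21']
```

Checking `future.exception()` first matters because an exception raised inside a callback is only logged by concurrent.futures, not propagated to the caller.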