
Using Python 3 thread/process pools: the concurrent.futures module

I. Executor and Future

      The foundation of the concurrent.futures module is Executor, an abstract class; ThreadPoolExecutor and ProcessPoolExecutor are its two most useful subclasses. A Future can be thought of as an operation that will complete at some point in the future, which is the core idea behind asynchronous programming.
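
A minimal sketch of how Executor and Future relate (the square function and the max_workers value below are illustrative choices, not code from this article): submit() schedules a callable and immediately hands back a Future, and result() blocks until that operation has actually finished.

from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(square, 5)  # submit returns a Future right away
    print(future.done())                 # often False: the task may not have run yet
    print(future.result())               # blocks until the task finishes, prints 25
    print(future.done())                 # True: the operation has completed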

II. Thread pools and process pools

1. Using submit with a thread pool / process pool:

Thread pool:

# Thread pool:
from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

for url in URLS:
    future = executor.submit(load_url, url)  # submit returns a Future immediately
    print(future.done())                     # False while the task has not finished

print('main thread')

# Output:
False
False
False
main thread
'https://www.baidu.com/' page is 227 bytes
'http://www.163.com' page is 662047 bytes
'https://github.com/' page is 54629 bytes

 

Process pool:

# Process pool: same as above, but with ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

if __name__ == '__main__':  # the __main__ guard is required because worker processes re-import this module
    executor = ProcessPoolExecutor(max_workers=3)
    for url in URLS:
        future = executor.submit(load_url, url)
        print(future.done())
    print('main thread')

# Output:
False  # the child processes have only been created; the tasks have not finished yet
False
False
main thread  # the main thread continues as soon as the tasks are submitted; it does not wait for the children to finish
'http://www.163.com' page is 662049 bytes
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 54629 bytes

2. Using map with a thread pool / process pool:

from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

executor.map(load_url, URLS)  # schedules load_url for every URL; returns a lazy iterator of results

print('main thread')

# Output:
main thread
'http://www.163.com' page is 662047 bytes
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 54629 bytes

  As the output shows, executor.map yields its results in the same order as the elements of URLS, and the code is more concise and readable than the submit loop; either approach can be chosen depending on the specific need.
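
As a rough illustration of that ordering guarantee, here is a variant in which load_url returns the page size instead of printing it (this return-based version is an assumption for demonstration, not the article's original function); the results come back in the same order as URLS even if the downloads finish in a different order.

from concurrent.futures import ThreadPoolExecutor
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return len(conn.read())

with ThreadPoolExecutor(max_workers=3) as executor:
    # executor.map yields results in the order of URLS, not in completion order
    for url, size in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, size))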

3. wait:

  The wait function returns a named tuple containing two sets: one of completed futures and one of uncompleted futures. One advantage of wait is the extra control it gives: its return_when parameter accepts three values, FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED, and defaults to ALL_COMPLETED.

  With the default ALL_COMPLETED, the program blocks until every task in the pool has finished before the main thread continues:

from concurrent.futures import ThreadPoolExecutor, wait
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list))  # blocks until every future completes (default return_when=ALL_COMPLETED)

print('main thread')

# Output:
'http://www.163.com' page is 662047 bytes
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 54629 bytes
DoneAndNotDoneFutures(done={<Future at 0x2d0f898 state=finished returned NoneType>, <Future at 0x2bd0630 state=finished returned NoneType>, <Future at 0x2d27470 state=finished returned NoneType>}, not_done=set())
main thread

  If FIRST_COMPLETED is passed instead, the program does not wait for all of the tasks in the pool to finish:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list, return_when=FIRST_COMPLETED))  # returns as soon as the first future finishes

print('main thread')

# Output:
'http://www.163.com' page is 662047 bytes
DoneAndNotDoneFutures(done={<Future at 0x2bd15c0 state=finished returned NoneType>}, not_done={<Future at 0x2d0d828 state=running>, <Future at 0x2d27358 state=running>})
main thread
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 54629 bytes
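
The third option mentioned above, FIRST_EXCEPTION, makes wait return as soon as any future raises an exception (or when everything finishes, if nothing raises). A small sketch under that assumption, reusing the same load_url but adding a deliberately unreachable URL so that one task fails:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION
import urllib.request

# the last URL is a made-up host that is expected to fail and raise inside its task
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'http://no-such-host.invalid/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)
f_list = [executor.submit(load_url, url) for url in URLS]

# returns once a future raises, or when all futures are done if none raise
done, not_done = wait(f_list, return_when=FIRST_EXCEPTION)
print(len(done), 'done,', len(not_done), 'not done')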

 

Applying a thread/process pool in practice:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
import time, os

def get_page(url):
    print('<%s> is getting [%s]' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:  # status code 200 means the download succeeded
        return {'url': url, 'text': response.text}

def parse_page(res):
    res = res.result()  # the callback receives the finished Future, so call result() to get the return value
    print('<%s> is parsing [%s]' % (os.getpid(), res['url']))
    with open('db.txt', 'a') as f:
        parse_res = 'url:%s size:%s\n' % (res['url'], len(res['text']))
        f.write(parse_res)

if __name__ == '__main__':
    # p = ThreadPoolExecutor()
    p = ProcessPoolExecutor()
    l = [
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
    ]
    for url in l:
        # add_done_callback hands the finished Future to parse_page; whichever task
        # finishes first has its callback run first. Callbacks are a general pattern
        # that works with both thread pools and process pools.
        p.submit(get_page, url).add_done_callback(parse_page)
    p.shutdown()  # like close() + join() on a multiprocessing Pool: wait for all tasks to finish
    print('main process', os.getpid())
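
For reference, a sketch of the same submit-plus-callback pattern with the executor used as a context manager, which calls shutdown() automatically when the block exits (the URL http://example.com and the max_workers value are stand-ins chosen for illustration, not part of the original example):

from concurrent.futures import ThreadPoolExecutor
import os
import requests

def get_page(url):
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}

def parse_page(future):
    res = future.result()  # the callback is handed the finished Future
    print('<%s> parsed [%s], size %s' % (os.getpid(), res['url'], len(res['text'])))

if __name__ == '__main__':
    # leaving the with-block implicitly calls shutdown(wait=True)
    with ThreadPoolExecutor(max_workers=4) as pool:
        for url in ['http://example.com'] * 4:
            pool.submit(get_page, url).add_done_callback(parse_page)
    print('main process', os.getpid())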