1. 程式人生 > >python併發之concurrent.futures

python併發之concurrent.futures

concurrent:併發

  Python標準庫為我們提供了threading和multiprocessing模組編寫相應的多執行緒/多程序程式碼。從Python3.2開始,標準庫為我們提供了concurrent.futures模組,它提供了ThreadPoolExecutorProcessPoolExecutor兩個類,實現了對threadingmultiprocessing的更高階的抽象,對編寫執行緒池/程序池提供了直接的支援。 
concurrent.futures基礎模組是executor和future。

  Executor  

  Executor是一個抽象類,它不能被直接使用。它為具體的非同步執行定義了一些基本的方法。 ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來建立執行緒池和程序池的程式碼。

  submit方法

  Executor中定義了submit()方法,這個方法的作用是提交一個可執行的回撥task,並返回一個future例項。future物件代表的就是給定的呼叫。

  我們使用submit方法來往執行緒池中加入一個task,submit返回一個Future物件,對於Future物件可以簡單地理解為一個在未來完成的操作。

  map方法

  Exectuor還為我們提供了map方法,和內建的map用法類似。對映。

  future

  Future例項是由Executor.submit()建立的。可以理解為一個在未來完成的操作,這是非同步程式設計的基礎。通常情況下,我們執行io操作,訪問url時(如下)在等待結果返回之前會產生阻塞,cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。

  示例:

  

複製程式碼
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def foo(i):
    print('%s is running %s'%(os.getpid(),i))
    time.sleep(random.randint(1, 3))
    return i**2
if __name__ == '__main__':
    print('cpu_num:',os.cpu_count())
    executor=ProcessPoolExecutor()
    print('executor',executor,type(executor))
    # futures=[]
    # for i in range(10):
    #     future=executor.submit(foo,i)
    #     futures.append(future)
    futures=[executor.submit(foo,i) for i in range(10)]
    executor.shutdown()
    #程式執行到這裡有明顯的時間間隔,可見是在shutdown存在的情況下,程式將future全部執行完,才繼續往下走的
    print('主')
    print(futures)
    for future in futures:
        print(future.result())
複製程式碼

  輸出:

複製程式碼
cpu_num: 8
executor <concurrent.futures.process.ProcessPoolExecutor object at 0x00000276745AA978> <class 'concurrent.futures.process.ProcessPoolExecutor'>
11740 is running 0
3156 is running 1
9928 is running 2
2208 is running 3
2324 is running 4
13080 is running 5
1892 is running 6
2964 is running 7
2208 is running 8
2324 is running 9
主
[<Future at 0x27674900e10 state=finished returned int>, <Future at 0x27674949dd8 state=finished returned int>, <Future at 0x27674949e80 state=finished returned int>, <Future at 0x27674949f28 state=finished returned int>, <Future at 0x27674949fd0 state=finished returned int>, <Future at 0x2767495a0b8 state=finished returned int>, <Future at 0x2767495a198 state=finished returned int>, <Future at 0x2767495a278 state=finished returned int>, <Future at 0x2767495a358 state=finished returned int>, <Future at 0x2767495a438 state=finished returned int>]
0
1
4
9
16
25
36
49
64
81
複製程式碼

  

  利用ThreadProcessExecutor爬蟲

  

複製程式碼
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
def get(url):
    r=requests.get(url)
    return {'url':url,'text':r.text}
def parse(future):
    dic=future.result()          #future物件呼叫result方法取其值、
    f=open('db.text','a')
    date='url:%s\n'%len(dic['text'])
    f.write(date)
    f.close()
if __name__ == '__main__':
    executor=ThreadPoolExecutor()
    url_l = ['http://cn.bing.com/', 'http://www.cnblogs.com/wupeiqi/', 'http://www.cnblogs.com/654321cc/',
                 'https://www.cnblogs.com/', 'http://society.people.com.cn/n1/2017/1012/c1008-29581930.html',
                 'http://www.xilu.com/news/shaonianxinzangyou5gedong.html', ]
    futures=[]
    for url in url_l:
        executor.submit(get,url).add_done_callback(parse)         #與Pool程序池回撥函式接收的是A函式的返回值(物件ApplyResult.get()得到的值)。
    executor.shutdown()                                           #這裡回撥函式parse,接收的引數是submit生成的 Future物件。
    print('主')
複製程式碼

  輸出: