1. 程式人生 > >爬蟲進階之非同步協程

爬蟲進階之非同步協程

一、背景

  之前爬蟲使用的是requests+多執行緒/多程序,後來隨著前幾天的深入瞭解,才發現,對於爬蟲來說,真正的瓶頸並不是CPU的處理速度,而是對於網頁抓取時候的往返時間,因為如果採用requests+多執行緒/多程序,他本身是阻塞式的程式設計,所以時間都花費在了等待網頁結果的返回和對爬取到的資料的寫入上面。而如果採用非阻塞程式設計,那麼就沒有這個困擾。這邊首先要理解一下阻塞和非阻塞的區別。

(1)阻塞呼叫是指呼叫結果返回之前,當前執行緒會被掛起(執行緒進入非可執行狀態,在這個狀態下,CPU不會給執行緒分配時間片,即執行緒暫停執行)。函式只有在得到結果之後才會返回。

(2)對於非阻塞則不會掛起,直接執行接下去的程式,返回結果後再回來處理返回值。

 

其實爬蟲的本質就是client發請求批量獲取server的響應資料,如果我們有多個url待爬取,只用一個執行緒且採用序列的方式執行,那隻能等待爬取一個結束後才能繼續下一個,效率會非常低。需要強調的是:對於單執行緒下序列N個任務,並不完全等同於低效,如果這N個任務都是純計算的任務,那麼該執行緒對cpu的利用率仍然會很高,之所以單執行緒下序列多個爬蟲任務低效,是因為爬蟲任務是明顯的IO密集型(阻塞)程式。那麼該如何提高爬取效能呢?

 

二、基本概念

2.1 阻塞

阻塞狀態指程式未得到所需計算資源時被掛起的狀態。程式在等待某個操作完成期間,自身無法繼續幹別的事情,則稱該程式在該操作上是阻塞的。

常見的阻塞形式有:網路 I/O 阻塞、磁碟 I/O 阻塞、使用者輸入阻塞等。阻塞是無處不在的,包括 CPU 切換上下文時,所有的程序都無法真正幹事情,它們也會被阻塞。如果是多核 CPU 則正在執行上下文切換操作的核不可被利用。

 

2.2 非阻塞

程式在等待某操作過程中,自身不被阻塞,可以繼續執行幹別的事情,則稱該程式在該操作上是非阻塞的。

非阻塞並不是在任何程式級別、任何情況下都可以存在的。僅當程式封裝的級別可以囊括獨立的子程式單元時,它才可能存在非阻塞狀態。

非阻塞的存在是因為阻塞存在,正因為某個操作阻塞導致的耗時與效率低下,我們才要把它變成非阻塞的。

 

2.3 同步

不同程式單元為了完成某個任務,在執行過程中需靠某種通訊方式以協調一致,稱這些程式單元是同步執行的。例如購物系統中更新商品庫存,需要用“行鎖”作為通訊訊號,讓不同的更新請求強制排隊順序執行,那更新庫存的操作是同步的。簡言之,同步意味著有序。

 

2.4 非同步

為完成某個任務,不同程式單元之間過程中無需通訊協調,也能完成任務的方式,不相關的程式單元之間可以是非同步的。例如,爬蟲下載網頁。排程程式呼叫下載程式後,即可排程其他任務,而無需與該下載任務保持通訊以協調行為。不同網頁的下載、儲存等操作都是無關的,也無需相互通知協調。這些非同步操作的完成時刻並不確定。簡言之,非同步意味著無序。

 

2.5 多程序

多程序就是利用 CPU 的多核優勢,在同一時間並行地執行多個任務,可以大大提高執行效率。

 

2.6 協程

協程,英文叫做 Coroutine,又稱微執行緒,纖程,協程是一種使用者態的輕量級執行緒。

協程擁有自己的暫存器上下文和棧。協程排程切換時,將暫存器上下文和棧儲存到其他地方,在切回來的時候,恢復先前儲存的暫存器上下文和棧。因此協程能保留上一次呼叫時的狀態,即所有區域性狀態的一個特定組合,每次過程重入時,就相當於進入上一次呼叫的狀態。

協程本質上是個單程序,協程相對於多程序來說,無需執行緒上下文切換的開銷,無需原子操作鎖定及同步的開銷,程式設計模型也非常簡單。

我們可以使用協程來實現非同步操作,比如在網路爬蟲場景下,我們發出一個請求之後,需要等待一定的時間才能得到響應,但其實在這個等待過程中,程式可以幹許多其他的事情,等到響應得到之後才切換回來繼續處理,這樣可以充分利用 CPU 和其他資源,這就是非同步協程的優勢。

 

三、分析處理 

  同步呼叫:即提交一個任務後就在原地等待任務結束,等到拿到任務的結果後再繼續下一行程式碼,效率低

import requests

def get_page(url):
    print('下載 %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        return response.text

def parse_page(res):
    print('解析 %s' %(len(res)))


def main():
    urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
    for url in urls:
        res=get_page(url)                         #呼叫一個任務,就在原地等待任務結束拿到結果後才繼續往後執行
        parse_page(res)

if __name__ == "__main__":
    main()

a. 解決同步呼叫方案之多執行緒/多程序

好處:在伺服器端使用多執行緒(或多程序)。多執行緒(或多程序)的目的是讓每個連線都擁有獨立的執行緒(或程序),這樣任何一個連線的阻塞都不會影響其他的連線。

弊端:開啟多程序或都執行緒的方式,我們是無法無限制地開啟多程序或多執行緒的:在遇到要同時響應成百上千路的連線請求,則無論多執行緒還是多程序都會嚴重佔據系統資源,降低系統對外界響應效率,而且執行緒與程序本身也更容易進入假死狀態。

b. 解決同步呼叫方案之執行緒/程序池

好處:很多程式設計師可能會考慮使用“執行緒池”或“連線池”。“執行緒池”旨在減少建立和銷燬執行緒的頻率,其維持一定合理數量的執行緒,並讓空閒的執行緒重新承擔新的執行任務。可以很好的降低系統開銷。

弊端:“執行緒池”和“連線池”技術也只是在一定程度上緩解了頻繁呼叫IO介面帶來的資源佔用。而且,所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應並不比沒有池的時候效果好多少。所以使用“池”必須考慮其面臨的響應規模,並根據響應規模調整“池”的大小。

 

案例:基於multiprocessing.dummy執行緒池爬取梨視訊的視訊資訊

import requests
import re
from lxml import etree
from multiprocessing.dummy import Pool
import random

header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36'
}

def get_page(url):
    response = requests.get(url=url,headers=header)
    if response.status_code == 200:
        return response.text

    return None

def parse_page(res):
    tree = etree.HTML(res)
    li_list = tree.xpath('//div[@id="listvideoList"]/ul/li')

    video_url_list = []
    for li in li_list:
        detail_url = 'https://www.pearvideo.com/' + li.xpath('./div/a/@href')[0]
        detail_page = requests.get(url=detail_url, headers=header).text
        video_url = re.findall('srcUrl="(.*?)",vdoUrl', detail_page, re.S)[0]
        video_url_list.append(video_url)

    return video_url_list

# 獲取視訊
def getVideoData(url):
    return requests.get(url=url, headers=header).content

# 持久化儲存
def saveVideo(data):
    fileName = str(random.randint(0, 5000)) + '.mp4'  # 因回撥函式只能傳一個引數,所以沒辦法再傳名字了,只能自己取名
    with open(fileName, 'wb') as fp:
        fp.write(data)

def main():
    url = 'https://www.pearvideo.com/category_1'
    res = get_page(url)
    links = parse_page(res)


    pool = Pool(5)  # 例項化一個執行緒池物件

    #  pool.map(回撥函式,可迭代物件)函式依次執行物件
    video_data_list = pool.map(getVideoData, links)
    pool.map(saveVideo, video_data_list)
    
    pool.close()
    pool.join()

if __name__== "__main__":
    main()

總結:對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,“執行緒池”或“連線池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多執行緒模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多執行緒模型也會遇到瓶頸,可以用非阻塞介面來嘗試解決這個問題。

 

終極處理方案

  上述無論哪種方案都沒有解決一個性能相關的問題:IO阻塞,無論是多程序還是多執行緒,在遇到IO阻塞時都會被作業系統強行剝奪走CPU的執行許可權,程式的執行效率因此就降低了下來。

  解決這一問題的關鍵在於,我們自己從應用程式級別檢測IO阻塞,然後切換到我們自己程式的其他任務執行,這樣把我們程式的IO降到最低,我們的程式處於就緒態就會增多,以此來迷惑作業系統,作業系統便以為我們的程式是IO比較少的程式,從而會盡可能多的分配CPU給我們,這樣也就達到了提升程式執行效率的目的。

  實現方式:單執行緒+協程實現非同步IO操作。

  非同步IO:就是你發起一個 網路IO 操作,卻不用等它結束,你可以繼續做其他事情,當它結束時,你會得到通知。

 

四、 非同步協程

在python3.4之後新增了asyncio模組,可以幫我們檢測IO(只能是網路IO【HTTP連線就是網路IO操作】),實現應用程式級別的切換(非同步IO)。注意:asyncio只能發tcp級別的請求,不能發http協議。

asyncio 是幹什麼的?

  • 非同步網路操作
  • 併發
  • 協程 

 

幾個概念:

event_loop:事件迴圈,相當於一個無限迴圈,我們可以把一些函式註冊到這個事件迴圈上,當滿足條件發生的時候,就會呼叫對應的處理方法。

coroutine:中文翻譯叫協程,在 Python 中常指代為協程物件型別,我們可以將協程物件註冊到時間迴圈中,它會被事件迴圈呼叫。我們可以使用 async 關鍵字來定義一個方法,這個方法在呼叫時不會立即被執行,而是返回一個協程物件。

task:任務,它是對協程物件的進一步封裝,包含了任務的各個狀態。

future:代表將來執行或沒有執行的任務的結果,實際上和 task 沒有本質區別。

async/await 關鍵字:async 定義一個協程,await 用來掛起阻塞方法的執行。

 

1.定義一個協程

import asyncio

async def execute(x):
    print('Number:', x)


coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop') 

執行結果:

Coroutine: <coroutine object execute at 0x1034cf830>

After calling execute

Number: 1

After calling loop

可見,async 定義的方法就會變成一個無法直接執行的 coroutine 物件,必須將其註冊到事件迴圈中才可以執行。

上文我們還提到了 task,它是對 coroutine 物件的進一步封裝,它裡面相比 coroutine 物件多了執行狀態,比如 running、finished 等,我們可以用這些狀態來獲取協程物件的執行情況。

 

在上面的例子中,當我們將 coroutine 物件傳遞給 run_until_complete() 方法的時候,實際上它進行了一個操作就是將 coroutine 封裝成了 task 物件,我們也可以顯式地進行宣告,如下所示:

import asyncio

async def execute(x):
    print('Number:',x)
    return x

coroutine = execute(1)
print('Coroutine:',coroutine)print('After calling execute')

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:',task)

loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

執行結果:

Coroutine: <coroutine object execute at 0x10e0f7830>

After calling execute

Task: <Task pending coro=<execute() running at demo.py:4>>

Number: 1

Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>

After calling loop

這裡我們定義了 loop 物件之後,接著呼叫了它的 create_task() 方法將 coroutine 物件轉化為了 task 物件,隨後我們列印輸出一下,發現它是 pending 狀態。接著我們將 task 物件新增到事件迴圈中得到執行,隨後我們再列印輸出一下 task 物件,發現它的狀態就變成了 finished,同時還可以看到其 result 變成了 1,也就是我們定義的 execute() 方法的返回結果。

 

另外,定義 task 物件還有一種方式,就是直接通過 asyncio 的 ensure_future() 方法,返回結果也是 task 物件,這樣的話我們就可以不借助於 loop 來定義,即使我們還沒有宣告 loop 也可以提前定義好 task 物件,寫法如下:

import asyncio

async def execute(x):
    print('Number:',x)
    return x

coroutine = execute(1)
print('Coroutine:',coroutine)
print('After calling execute')

task=asyncio.ensure_future(coroutine)
print('Task:',task)

loop=asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

2.繫結回撥:也可以為某個 task 繫結一個回撥方法.

import asyncio
import requests

async def request():
    url='https://www.baidu.com'
    status = requests.get(url).status_code
    return status

def  callback(task):
    print('Status:',task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:',task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)

 執行結果:

Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]>

Status: <Response [200]>

Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>

在這裡我們定義了一個 request() 方法,請求了百度,返回狀態碼,但是這個方法裡面我們沒有任何 print() 語句。隨後我們定義了一個 callback() 方法,這個方法接收一個引數,是 task 物件,然後呼叫 print() 方法列印了 task 物件的結果。這樣我們就定義好了一個 coroutine 物件和一個回撥方法,我們現在希望的效果是,當 coroutine 物件執行完畢之後,就去執行宣告的 callback() 方法。

那麼它們二者怎樣關聯起來呢?很簡單,只需要呼叫 add_done_callback() 方法即可,我們將 callback() 方法傳遞給了封裝好的 task 物件,這樣當 task 執行完畢之後就可以呼叫 callback() 方法了,同時 task 物件還會作為引數傳遞給 callback() 方法,呼叫 task 物件的 result() 方法就可以獲取返回結果了。

實際上不用回撥方法,直接在 task 執行完畢之後也可以直接呼叫 result() 方法獲取結果,執行結果是一樣的。如下所示:

import asyncio
import requests

async def request():
    url='https://www.baidu.com'
    status=requests.get(url).status_code
    return status

coroutine=request()
task=asyncio.ensure_future(coroutine)
print('Task:',task)

loop=asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)
print('Task Result:',task.result())

 

3.多工協程

  上面的例子我們只執行了一次請求,如果我們想執行多次請求應該怎麼辦呢?我們可以定義一個 task 列表,然後使用 asyncio 的 wait() 方法即可執行。

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url).status_code
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:',tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
    print('Task Result:',task.result())

執行結果:

Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>]

Task Result: <Response [200]>

Task Result: <Response [200]>

Task Result: <Response [200]>

Task Result: <Response [200]>

Task Result: <Response [200]>

這裡我們使用一個 for 迴圈建立了五個 task,組成了一個列表,然後把這個列表首先傳遞給了 asyncio 的 wait() 方法,然後再將其註冊到時間迴圈中,就可以發起五個任務了。

 

4.協程實現

  上面的案例只是為後面的使用作鋪墊,接下來我們正式來看下協程在解決 IO 密集型任務上有怎樣的優勢吧!

  為了表現出協程的優勢,我們需要先建立一個合適的實驗環境,最好的方法就是模擬一個需要等待一定時間才可以獲取返回結果的網頁,上面的程式碼中使用了百度,但百度的響應太快了,而且響應速度也會受本機網速影響,所以最好的方式是自己在本地模擬一個慢速伺服器,這裡我們選用 Flask。

伺服器程式碼:
from flask import Flask
import time
 
app = Flask(__name__)
 
@app.route('/')
def index():
    time.sleep(3)
return 'Hello!'

if __name__ == '__main__':
    app.run(threaded=True)                #這表明 Flask 啟動了多執行緒模式,不然預設是隻有一個執行緒的。

接下來我們再重新使用上面的方法請求一遍:

import asyncio
import requests
import time
 
start = time.time()
 
async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from', url, 'Result:', response.text)
 
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
 
end = time.time()
print('Cost time:', end - start)
執行結果如下:
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.049368143081665

在這裡我們還是建立了五個 task,然後將 task 列表傳給 wait() 方法並註冊到時間迴圈中執行。

其實,要實現非同步處理,我們得先要有掛起的操作,當一個任務需要等待 IO 結果的時候,可以掛起當前任務,轉而去執行其他任務,這樣我們才能充分利用好資源,上面方法都是一本正經的序列走下來,連個掛起都沒有,怎麼可能實現非同步?

要實現非同步,接下來我們再瞭解一下 await 的用法,使用 await 可以將耗時等待的操作掛起,讓出控制權。當協程執行的時候遇到 await,時間迴圈就會將本協程掛起,轉而去執行別的協程,直到其他的協程掛起或執行完畢。

所以,我們可能會將程式碼中的 request() 方法改成如下的樣子:

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await requests.get(url)
print('Get response from', url, 'Result:', response.text)

僅僅是在 requests 前面加了一個 await,然而執行以下程式碼,會得到如下報錯:

Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Cost time: 15.048935890197754
Task exception was never retrieved
future: <Task finished coro=<request() done, defined at demo.py:7> exception=TypeError("object Response can't be used in 'await' expression",)>
Traceback (most recent call last):
  File "demo.py", line 10, in request
    status = await requests.get(url)
TypeError: object Response can't be used in 'await' expression

這次它遇到 await 方法確實掛起了,也等待了,但是最後卻報了這麼個錯,這個錯誤的意思是 requests 返回的 Response 物件不能和 await 一起使用,為什麼呢?因為根據官方文件說明,await 後面的物件必須是如下格式之一:

  • A native coroutine object returned from a native coroutine function,一個原生 coroutine 物件。
  • A generator-based coroutine object returned from a function decorated with types.coroutine(),一個由 types.coroutine() 修飾的生成器,這個生成器可以返回 coroutine 物件。
  • An object with an await__ method returning an iterator,一個包含 __await 方法的物件返回的一個迭代器。

reqeusts 返回的 Response 不符合上面任一條件,因此就會報上面的錯誤了。既然 await 後面可以跟一個 coroutine 物件,那麼我將請求頁面的方法獨立出來,並用 async 修飾,這樣就得到了一個 coroutine 物件

import asyncio
import requests
import time
 
start = time.time()
 
async def get(url):
    return requests.get(url)
 
async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'Result:', response.text)
 
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
 
end = time.time()
print('Cost time:', end - start)

這裡我們,我們執行一下看看:

Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447

還是不行,它還不是非同步執行,也就是說我們僅僅將涉及 IO 操作的程式碼封裝到 async 修飾的方法裡面是不可行的!我們必須要使用支援非同步操作的請求方式才可以實現真正的非同步,所以這裡就需要 aiohttp 派上用場了。

 

5.使用 aiohttp 

-環境安裝:pip install aiohttp

我們將 aiohttp 用上來,將請求庫由 requests 改成了 aiohttp,通過 aiohttp 的 ClientSession 類的 get() 方法進行請求

import asyncio
import aiohttp
import time

start= time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for',url)
    result = await get(url)
    print('Get response from',url,'Result:',result)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

結果如下:我們發現這次請求的耗時由 15 秒變成了 3 秒,耗時直接變成了原來的 1/5

Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 3.0199508666992188

程式碼裡面我們使用了 await,後面跟了 get() 方法,在執行這五個協程的時候,如果遇到了 await,那麼就會將當前協程掛起,轉而去執行其他的協程,直到其他的協程也掛起或執行完畢,再進行下一個協程的執行。充分利用 CPU 時間,而不必把時間浪費在等待 IO 上

開始執行時,時間迴圈會執行第一個 task,針對第一個 task 來說,當執行到第一個 await 跟著的 get() 方法時,它被掛起,但這個 get() 方法第一步的執行是非阻塞的,掛起之後立馬被喚醒,所以立即又進入執行,建立了 ClientSession 物件,接著遇到了第二個 await,呼叫了 session.get() 請求方法,然後就被掛起了,由於請求需要耗時很久,所以一直沒有被喚醒,好第一個 task 被掛起了,那接下來該怎麼辦呢?事件迴圈會尋找當前未被掛起的協程繼續執行,於是就轉而執行第二個 task 了,也是一樣的流程操作,直到執行了第五個 task 的 session.get() 方法之後,全部的 task 都被掛起了。所有 task 都已經處於掛起狀態,那咋辦?只好等待了。3 秒之後,幾個請求幾乎同時都有了響應,然後幾個 task 也被喚醒接著執行,輸出請求結果,最後耗時,3 秒!

在上面的例子中,在發出網路請求後,既然接下來的 3 秒都是在等待的,在 3 秒之內,CPU 可以處理的 task 數量遠不止這些,那麼豈不是我們放 很多 個 task 一起執行,最後得到所有結果的耗時不都是 3 秒左右嗎?因為這幾個任務被掛起後都是一起等待的。理論來說確實是這樣的,不過有個前提,那就是伺服器在同一時刻接受無限次請求都能保證正常返回結果,也就是伺服器無限抗壓,另外還要忽略 IO 傳輸時延,確實可以做到無限 task 一起執行且在預想時間內得到結果。

我們這裡將 task 數量設定成 100,再試一下:

tasks = [asyncio.ensure_future(request()) for _ in range(100)]
耗時結果如下:
Cost time: 3.106252670288086

最後執行時間也是在 3 秒左右,當然多出來的時間就是 IO 時延了。可見,使用了非同步協程之後,我們幾乎可以在相同的時間內實現成百上千倍次的網路請求,把這個運用在爬蟲中,速度提升可謂是非常可觀了。

 

6. 與單程序、多程序對比

單程序

import requests
import time

start = time.time()


def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)


for _ in range(100):
    request()

end = time.time()
print('Cost time:', end - start)
最後耗時:
Cost time: 305.16639709472656

多程序
import requests
import time
import multiprocessing

start = time.time()


def request(_):
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)


cpu_count = multiprocessing.cpu_count()
print('Cpu count:', cpu_count)
pool = multiprocessing.Pool(cpu_count)
pool.map(request, range(100))

end = time.time()
print('Cost time:', end - start)

這裡我使用了multiprocessing 裡面的 Pool 類,即程序池。我的電腦的 CPU 個數是 8 個,這裡的程序池的大小就是 8。

耗時:
Cost time: 48.17306900024414


7.與多程序結合

在最新的 PyCon 2018 上,來自 Facebook 的 John Reese 介紹了 asyncio 和 multiprocessing 各自的特點,並開發了一個新的庫,叫做 aiomultiprocess。需要 Python 3.6 及更高版本才可使用。

安裝:pip install aiomultiprocess

使用這個庫,我們可以將上面的例子改寫如下:

import asyncio
import aiohttp
import time
from aiomultiprocess import Pool
 
start = time.time()
 
async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result
 
async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
        return result
 
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
 
end = time.time()
print('Cost time:', end - start)

這樣就會同時使用多程序和非同步協程進行請求,當然最後的結果其實和非同步是差不多的:

Cost time: 3.1156570434570312

因為我的測試介面的原因,最快的響應也是 3 秒,所以這部分多餘的時間基本都是 IO 傳輸時延。但在真實情況下,我們在做爬取的時候遇到的情況千變萬化,一方面我們使用非同步協程來防止阻塞,另一方面我們使用 multiprocessing 來利用多核成倍加速,節省時間其實還是非常可觀的。

 

參考連結: https://blog.csdn.net/zhusongziye/article/details/81637088