1. 程式人生 > >爬蟲高性能相關(主要基於異步io)

爬蟲高性能相關(主要基於異步io)

執行權限 exec 三次握手 hub event 空閑 data handler 數量

一背景常識

爬蟲的本質就是一個socket客戶端與服務端的通信過程,如果我們有多個url待爬取,采用串行的方式執行,只能等待爬取一個結束後才能繼續下一個,效率會非常低。

需要強調的是:串行並不意味著低效,如果串行的都是純計算的任務,那麽cpu的利用率仍然會很高,之所以爬蟲程序的串行低效,是因為爬蟲程序是明顯的IO密集型程序。

關於IO模型詳見鏈接:http://www.cnblogs.com/linhaifeng/articles/7454717.html

那麽該如何提高爬取性能呢?

二同步,異步,回調機制

1. 同步調用:即提交一個任務後就在原地等待任務結束,等到拿到任務的結果後再繼續下一行代碼,效率低下(串行)

技術分享圖片
import requests

def get_page(url):
    response=requests.get(url)
    if response.status_code == 200:
        return response.text


urls=[https://www.baidu.com/,http://www.sina.com.cn/,https://www.python.org]
for url in urls:
    res=get_page(url) #調用一個任務,就在原地等待任務結束拿到結果後才繼續往後執行
    print(len(res))

同步調用
View Code

2. 一個簡單的解決方案:多線程或多進程

#在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每個連接都擁有獨立的線程(或進程),這樣任何一個連接的阻塞都不會影響其他的連接。
技術分享圖片
from multiprocessing import Process
from threading import Thread
import requests

def get_page(url):
    response=requests.get(url)
    if response.status_code == 200:
        return response.text


if __name__ == __main__: urls=[https://www.baidu.com/,http://www.sina.com.cn/,https://www.python.org] for url in urls: p=Process(target=get_page,args=(url,)) p.start() # t=Thread(target=get_page,args=(url,)) # t.start()
多進程或多線程

該方案的問題是:

#開啟多進程或都線程的方式,我們是無法無限制地開啟多進程或多線程的:在遇到要同時響應成百上千路的連接請求,則無論多線程還是多進程都會嚴重占據系統資源,降低系統對外界響應效率,而且線程與進程本身也更容易進入假死狀態。

3. 改進方案: 線程池或進程池+異步調用:提交一個任務後並不會等待任務結束,而是繼續下一行代碼

#很多程序員可能會考慮使用“線程池”或“連接池”。“線程池”旨在減少創建和銷毀線程的頻率,其維持一定合理數量的線程,並讓空閑的線程重新承擔新的執行任務。“連接池”維持連接的緩存池,盡量重用已有的連接、減少創建和關閉連接的頻率。這兩種技術都可以很好的降低系統開銷,都被廣泛應用很多大型系統,如websphere、tomcat和各種數據庫等。
技術分享圖片
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import requests

def get_page(url):
    print(GET : %s %url)
    response=requests.get(url)
    if response.status_code == 200:
        return response.text


if __name__ == __main__:
    p=ProcessPoolExecutor()
    # p=ThreadPoolExecutor()

    urls=[https://www.baidu.com/,http://www.sina.com.cn/,https://www.python.org]
    for url in urls:
        p.submit(get_page,url)
    p.shutdown(wait=True)
進程池或線程池 技術分享圖片
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import requests
import os

def get_page(url):
    print(%s GET : %s %(os.getpid(),url))
    response=requests.get(url)
    if response.status_code == 200:
        return response.text

def parse_page(res):
    res=res.result()
    print(%s parsing %os.getpid())

if __name__ == __main__:
    p=ProcessPoolExecutor()
    # p=ThreadPoolExecutor()

    urls=[https://www.baidu.com/,http://www.sina.com.cn/,https://www.python.org]
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page)
    p.shutdown(wait=True)
異步調用+回調機制

改進後方案其實也存在著問題:

#“線程池”和“連接池”技術也只是在一定程度上緩解了頻繁調用IO接口帶來的資源占用。而且,所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應並不比沒有池的時候效果好多少。所以使用“池”必須考慮其面臨的響應規模,並根據響應規模調整“池”的大小。

對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題。

三高性能

上述無論哪種解決方案其實沒有解決一個性能相關的問題:IO阻塞,無論是多進程還是多線程,在遇到IO阻塞時都會被操作系統強行剝奪走CPU的執行權限,程序的執行效率因此就降低了下來。

解決這一問題的關鍵在於,我們自己從應用程序級別檢測IO阻塞然後切換到我們自己程序的其他任務執行,這樣把我們程序的IO降到最低,我們的程序處於就緒態就會增多,以此來迷惑操作系統,操作系統便以為我們的程序是IO比較少的程序,從而會盡可能多的分配CPU給

1. 在python3.3之後新增了asyncio模塊,可以幫我們檢測IO(只能是網絡IO),實現應用程序級別的切換

技術分享圖片
import asyncio

@asyncio.coroutine
def task(task_id,senconds):
    print(%s is start %task_id)
    yield from asyncio.sleep(senconds) #只能檢測網絡IO,檢測到IO後切換到其他任務執行
    print(%s is end %task_id)

tasks=[task(task_id=1,senconds=3),task(task_id=2,senconds=4)]

loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
基本使用

2. 但asyncio模塊只能發tcp級別的請求,不能發http協議,因此,在我們需要發送http請求的時候,需要我們自定義http報頭

技術分享圖片
#我們爬取一個網頁的過程,以https://www.python.org/doc/為例,將關鍵步驟列舉如下
#步驟一:向www.python.org這臺主機發送tcp三次握手,是IO阻塞操作
#步驟二:封裝http協議的報頭
#步驟三:發送http協議的請求包,是IO阻塞操作
#步驟四:接收http協議的響應包,是IO阻塞操作
import asyncio

@asyncio.coroutine
def get_page(host,port=80,url=/):
    #步驟一(IO阻塞):發起tcp鏈接,是阻塞操作,因此需要yield from
    recv,send=yield from asyncio.open_connection(host,port)

    #步驟二:封裝http協議的報頭,因為asyncio模塊只能封裝並發送tcp包,因此這一步需要我們自己封裝http協議的包
    requset_headers="""GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)
    # requset_headers="""POST %s HTTP/1.0\r\nHost: %s\r\n\r\nname=egon&password=123""" % (url, host,)
    requset_headers=requset_headers.encode(utf-8)

    #步驟三(IO阻塞):發送http請求包
    send.write(requset_headers)
    yield from send.drain()

    #步驟四(IO阻塞):接收http協議的響應包
    text=yield from recv.read()

    #其他處理
    print(host,url,text)
    send.close()
    print(-===>)
    return 1

tasks=[get_page(host=www.python.org,url=/doc),get_page(host=www.cnblogs.com,url=linhaifeng),get_page(host=www.openstack.org)]

loop=asyncio.get_event_loop()
results=loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

print(=====>,results) #[1, 1, 1]
asyncio+自定義http協議報頭

3. 自定義http報頭多少有點麻煩,於是有了aiohttp模塊,專門幫我們封裝http報頭,然後我們還需要用asyncio檢測IO實現切換

技術分享圖片
import aiohttp
import asyncio

@asyncio.coroutine
def get_page(url):
    print(GET:%s %url)
    response=yield from aiohttp.request(GET,url)

    data=yield from response.read()

    print(url,data)
    response.close()
    return 1

tasks=[
    get_page(https://www.python.org/doc),
    get_page(https://www.cnblogs.com/linhaifeng),
    get_page(https://www.openstack.org)
]

loop=asyncio.get_event_loop()
results=loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

print(=====>,results) #[1, 1, 1]
asyncio+aiohttp

4. 此外,還可以將requests.get函數傳給asyncio,就能夠被檢測了

技術分享圖片
import requests
import asyncio

@asyncio.coroutine
def get_page(func,*args):
    print(GET:%s %args[0])
    loog=asyncio.get_event_loop()
    furture=loop.run_in_executor(None,func,*args)
    response=yield from furture

    print(response.url,len(response.text))
    return 1

tasks=[
    get_page(requests.get,https://www.python.org/doc),
    get_page(requests.get,https://www.cnblogs.com/linhaifeng),
    get_page(requests.get,https://www.openstack.org)
]

loop=asyncio.get_event_loop()
results=loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

print(=====>,results) #[1, 1, 1]
asyncio+requests模塊的方法

5. 還有之前在協程時介紹的gevent模塊

技術分享圖片
from gevent import monkey;monkey.patch_all()
import gevent
import requests

def get_page(url):
    print(GET:%s %url)
    response=requests.get(url)
    print(url,len(response.text))
    return 1

# g1=gevent.spawn(get_page,‘https://www.python.org/doc‘)
# g2=gevent.spawn(get_page,‘https://www.cnblogs.com/linhaifeng‘)
# g3=gevent.spawn(get_page,‘https://www.openstack.org‘)
# gevent.joinall([g1,g2,g3,])
# print(g1.value,g2.value,g3.value) #拿到返回值


#協程池
from gevent.pool import Pool
pool=Pool(2)
g1=pool.spawn(get_page,https://www.python.org/doc)
g2=pool.spawn(get_page,https://www.cnblogs.com/linhaifeng)
g3=pool.spawn(get_page,https://www.openstack.org)
gevent.joinall([g1,g2,g3,])
print(g1.value,g2.value,g3.value) #拿到返回值
gevent+requests

6. 封裝了gevent+requests模塊的grequests模塊

技術分享圖片
#pip3 install grequests

import grequests

request_list=[
    grequests.get(https://wwww.xxxx.org/doc1),
    grequests.get(https://www.cnblogs.com/linhaifeng),
    grequests.get(https://www.openstack.org)
]


##### 執行並獲取響應列表 #####
# response_list = grequests.map(request_list)
# print(response_list)

##### 執行並獲取響應列表(處理異常) #####
def exception_handler(request, exception):
    # print(request,exception)
    print("%s Request failed" %request.url)

response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)
grequests

7. twisted:是一個網絡框架,其中一個功能是發送異步請求,檢測IO並自動切換

技術分享圖片
‘‘‘
#問題一:error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools
https://www.lfd.uci.edu/~gohlke/pythonlibs/#twisted
pip3 install C:\Users\Administrator\Downloads\Twisted-17.9.0-cp36-cp36m-win_amd64.whl
pip3 install twisted

#問題二:ModuleNotFoundError: No module named ‘win32api‘
https://sourceforge.net/projects/pywin32/files/pywin32/

#問題三:openssl
pip3 install pyopenssl
‘‘‘

#twisted基本用法
from twisted.web.client import getPage,defer
from twisted.internet import reactor

def all_done(arg):
    # print(arg)
    reactor.stop()

def callback(res):
    print(res)
    return 1

defer_list=[]
urls=[
    http://www.baidu.com,
    http://www.bing.com,
    https://www.python.org,
]
for url in urls:
    obj=getPage(url.encode(utf=-8),)
    obj.addCallback(callback)
    defer_list.append(obj)

defer.DeferredList(defer_list).addBoth(all_done)

reactor.run()




#twisted的getPage的詳細用法
from twisted.internet import reactor
from twisted.web.client import getPage
import urllib.parse


def one_done(arg):
    print(arg)
    reactor.stop()

post_data = urllib.parse.urlencode({check_data: adf})
post_data = bytes(post_data, encoding=utf8)
headers = {bContent-Type: bapplication/x-www-form-urlencoded}
response = getPage(bytes(http://dig.chouti.com/login, encoding=utf8),
                   method=bytes(POST, encoding=utf8),
                   postdata=post_data,
                   cookies={},
                   headers=headers)
response.addBoth(one_done)

reactor.run()
twisted的用法

8. tornado

技術分享圖片
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop


def handle_response(response):
    """
    處理返回值內容(需要維護計數器,來停止IO循環),調用 ioloop.IOLoop.current().stop()
    :param response: 
    :return: 
    """
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)


def func():
    url_list = [
        http://www.baidu.com,
        http://www.bing.com,
    ]
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
Tornado

爬蟲高性能相關(主要基於異步io)