1. 程式人生 > >自定義爬蟲架構之多線程爬蟲與異步爬蟲

自定義爬蟲架構之多線程爬蟲與異步爬蟲

重試 all 斷言 python3 pla info task webkit header

async/await關鍵字是出現在python3.4以後。網上已經有很多文章對async/await這兩個關鍵字都有講解,包括如何由python2的yield from發展到async/await這兩個關鍵字,以及一些代碼實現都有。但是對於像我這樣初次接觸的人來說,光看代碼分析也不一定能理解,我也是在度娘上搜索很多相關的網站,當中也有官網,都沒有發現能讓我一眼看懂在什麽地方可以用await,什麽情況用await的文章。經過自己的重新思考,總算對async、await有一些初步的了解,所以想把自己的理解記錄下來,希望對一些學習協程或者異步的初學者也有一定的幫助。

對於網上能搜到的一些代碼實現、例子,這裏就不重復了。

一、首先要知道什麽是協程、異步。

舉個例子:假設有1個洗衣房,裏面有10臺洗衣機,有一個洗衣工在負責這10臺洗衣機。那麽洗衣房就相當於1個進程,洗衣工就相當1個線程。如果有10個洗衣工,就相當於10個線程,1個進程是可以開多線程的。這就是多線程!

那麽協程呢?先不急。大家都知道,洗衣機洗衣服是需要等待時間的,如果10個洗衣工,1人負責1臺洗衣機,這樣效率肯定會提高,但是不覺得浪費資源嗎?明明1 個人能做的事,卻要10個人來做。只是把衣服放進去,打開開關,就沒事做了,等衣服洗好再拿出來就可以了。就算很多人來洗衣服,1個人也足以應付了,開好第一臺洗衣機,在等待的時候去開第二臺洗衣機,再開第三臺,……直到有衣服洗好了,就回來把衣服取出來,接著再取另一臺的(哪臺洗好先就取哪臺,所以協程是無序的)。這就是計算機的協程!洗衣機就是執行的方法。

當你程序中方法需要等待時間的話,就可以用協程,效率高,消耗資源少。

好了!現在來總結一下:

洗衣房 ==> 進程

洗衣工 ==> 線程

洗衣機 ==> 方法(函數)

二、Python中的異步

對於一些熟悉編寫傳統python代碼的人來說,轉換到異步程序可能有些不好接受。Python中的異步程序依賴於 Coroutines(協程) ,它與event loop(事件循環)一同工作,寫出的代碼像是執行多個小任務的片段。 協程可以看作是在代碼中有一些帶點函數,這些帶點函數又是控制程序回調中的上下文,除了通過上下文交換數據,這些“yield”點還可以暫停和恢復協程執行。

事件循環決定了可以在任何指定時刻運行代碼塊—它負責協程之間的暫停、恢復和通信。 這意味著不同協程的最終可能以不同於它們之前被安排的順序執行。 這種不按固定順序運行不同代碼塊的想法稱為異步。

可以在 HTTP 請求的場景中闡述異步的重要性。設想要向服務器發大量的請求。比如,要查詢一個網站,以獲得指定賽季所有運動員的統計信息。

我們可以按順序依次發出每個請求。然而,對於每個請求,可以想象到可能會花一些時間等待上一個請求被發送到服務器,且收到服務器響應。

但是有時,這些無用的花銷甚至可能需要幾秒鐘。因為程序可能會遇到網絡延遲,訪問數量過多,又或者是對方服務器的速度限制等問題。

如果我們的代碼可以在等待服務器響應的同時做其他事情呢?而且,如果它只在響應數據到達後才處理返回數據呢?如果我們不必等到每個單獨的請求都完成之後才繼續處理列表中的下一個請求,那麽我們可以快速地連續發出許多請求。

具有event loop的協程就可以讓我們的代碼支持以這樣的形式運行。

三、async\await 的使用

正常的函數在執行時是不會中斷的,所以你要寫一個能夠中斷的函數,就需要添加async關鍵。

async 用來聲明一個函數為異步函數,異步函數的特點是能在函數執行過程中掛起,去執行其他異步函數,等到掛起條件(假設掛起條件是sleep(5))消失後,也就是5秒到了再回來執行。

await 用來用來聲明程序掛起,比如異步程序執行到某一步時需要等待的時間很長,就將此掛起,去執行其他的異步程序。await 後面只能跟異步程序或有__await__屬性的對象,因為異步程序與一般程序不同。假設有兩個異步函數async a,async b,a中的某一步有await,當程序碰到關鍵字await b()後,異步程序掛起後去執行另一個異步b程序,就是從函數內部跳出去執行其他函數,當掛起條件消失後,不管b是否執行完,要馬上從b程序中跳出來,回到原程序執行原來的操作。如果await後面跟的b函數不是異步函數,那麽操作就只能等b執行完再返回,無法在b執行的過程中返回。如果要在b執行完才返回,也就不需要用await關鍵字了,直接調用b函數就行。所以這就需要await後面跟的是異步函數了。在一個異步函數中,可以不止一次掛起,也就是可以用多個await。

import time
import asyncio
import requests


async def test2(i):
    r = await other_test(i)
    print(i,r)


async def other_test(i):
    r = requests.get(i)
    print(i)
    await asyncio.sleep(4)
    print(time.time()-start)
    return r


if __name__ == ‘__main__‘:
    url = ["https://segmentfault.com/p/1210000013564725",
           "https://www.jianshu.com/p/83badc8028bd",
           "https://www.baidu.com/"]
    task = [asyncio.ensure_future(test2(i)) for i in url]
    start = time.time()
    asyncio.get_event_loop().run_until_complete(asyncio.wait(task))

技術分享圖片

四、aiohttp

安裝aiohttp

pip install aiohttp

客戶端:發送請求

下面的示例演示了如何使用 aiohttp 下載某些urtl的HTML內容:

import asyncio

import aiohttp


async def make_request(url):
    print(f"making request to {url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status == 200:
                print(await resp.text())


if __name__ == ‘__main__‘:
    urls = ["https://segmentfault.com/p/1210000013564725",
            "https://www.jianshu.com/p/83badc8028bd",
            "https://www.baidu.com/"]
    task = [asyncio.ensure_future(make_request(url)) for url in urls]
    asyncio.get_event_loop().run_until_complete(asyncio.wait(task))
  • 有幾點需要強調:
  • 和前面的 await asyncio.sleep 一樣,要獲取HTML頁面的內容,必須在 resp.text() 前面使用 await 。否則程序打印出來的內容會是這樣:
making request to https://www.baidu.com
<coroutine object ClientResponse.text at 0x109b8ddb0>
  • async with 是一個上下文管理器,它接收的是協程而不是函數。在這裏的兩處使用,是用於在內部自動關閉到服務器的連接釋放資源。

  • aiohttp.ClientSession 具有和 HTTP 方法相同的方法,session.get 發送 GET 請求,session.post 發送 POST 請求。

這個例子本身並不比同步HTTP請求有多大性能優勢。aiohttp 客戶端真正優勢在於多個請求並發:

import asyncio
import aiohttp


async def make_request(session, req_n):
    url = "https://www.baidu.com"
    print(f"making request to {req_n} to {url}")
    async with session.get(url) as resp:
        if resp.status == 200:
            print(await resp.text())


async def main():
    n_request = 100
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*[make_request(session, i) for i in range(n_request)])

if __name__ == ‘__main__‘:
    asyncio.get_event_loop().run_until_complete(main())

五、自定義協程異步框架

# -*- coding: utf-8 -*-

"""
@Datetime: 2019/5/2
@Author: Zhang Yafei
"""
import asyncio
import functools
import time
from collections import Iterator, Iterable
from pyquery import PyQuery as pq

import aiohttp

headers = {
    ‘User-Agent‘: ‘Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36‘}


class Request(object):
    """
    用於封裝用戶請求信息所用
    """

    def __init__(self, url, callback):
        self.url = url
        self.callback = callback


class RequestManager(object):
    def __init__(self):
        self.new_requests = set()
        self.old_requests = set()

    def has_new_request(self):
        """ 判斷是否有未爬取的url """
        return self.new_request_size() != 0

    async def get_new_request(self):
        """ 獲取一個未爬取的請求 """
        new_request = self.new_requests.pop()
        # 提取之後,將其添加到已爬取的鏈接中
        self.old_requests.add(new_request)
        return new_request

    async def add_new_request(self, request):
        """ 將新請求添加到未爬取的集合中(單個請求) """
        if request is None:
            return
        if request not in self.new_requests and request not in self.old_requests:
            self.new_requests.add(request)

    async def add_new_requests(self, requests):
        """ 將新請求添加到未爬取的集合中(集合) """
        if requests is None or len(requests) == 0:
            return
        for request in requests:
            self.add_new_request(request)

    def new_request_size(self):
        """ 獲取未爬取的url大小 """
        return len(self.new_requests)

    def old_request_size(self):
        """ 獲取已爬取的url大小 """
        return len(self.old_requests)


class HTMLParser(object):
    def __init__(self):
        pass

    async def parse(self, response):
        items = {‘name‘: ‘parse‘}
        doc = pq(await response.text())
        # print(doc)
        print(‘parse---‘, response.url)
        return [Request(url=‘http://www.sxmu.edu.cn/‘, callback=self.next_parse)]
        # return [items]

    async def next_parse(self, response):
        doc = pq(await response.text())
        print(‘next parse‘, response.url)
        items = {‘next_parse‘: response.url}
        return [items]


class DataMemory(object):
    def __init__(self):
        pass

    async def store_data(self, data):
        print(‘store data: ‘, data)

    def to_csv(self):
        pass

    def to_excel(self):
        pass

    def save_mongo(self):
        pass

    def save_redis(self):
        pass

    def save_mysql(self):
        pass


class Crawler(object):
    def __init__(self, urls):
        self.manager = RequestManager()
        self.parser = HTMLParser()
        self.data_memory = DataMemory()
        self.urls = urls

    def filter_downloaded_urls(self):
        return self.urls

    async def start(self, session, request):
        await self.manager.add_new_request(request)
        while self.manager.has_new_request() and self.manager.old_request_size() < 100:
            # 1. 取出新請求
            new_request = await self.manager.get_new_request()
            print(new_request.url)
            # 2. 將請求的url進行下載
            async with session.get(new_request.url) as response:
                if response.status == 200:
                    # 3. 將下載的Html文本進行解析
                    result = await new_request.callback(response)
                    # 4. 判斷解析之後返回的數據對象為新請求還是解析的數據
                    if not isinstance(result, Iterable):
                        raise Exception(‘返回的數據類型不可叠代‘)
                    for ret in result:
                        if isinstance(ret, Request):
                            # 5. 如果是新請求,則加入到請求管理器
                            await self.manager.add_new_request(ret)
                        elif isinstance(ret, dict) and ret:
                            # 6. 如果是解析的數據,則將數據進行存儲
                            await self.data_memory.store_data(ret)
                        else:
                            raise Exception(‘返回數據類型或格式不正確,只能返回Request的實例對象或字典的實例對象,且不能為空‘)

    async def run(self):
        url_list = self.filter_downloaded_urls()
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(*[self.start(session, Request(url, self.parser.parse)) for url in url_list])


def timeit(func):
    @functools.wraps(func)
    def inner(*args, **kwargs):
        start = time.time()
        ret = func(*args, **kwargs)
        print(time.time() - start)
        return ret
    return inner


@timeit
def main():
    urls = [‘https://mp.weixin.qq.com‘ for _ in range(10)]
    crawl = Crawler(urls)
    asyncio.get_event_loop().run_until_complete(crawl.run())


if __name__ == ‘__main__‘:
    main() 
技術分享圖片
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
parse--- https://mp.weixin.qq.com
http://www.sxmu.edu.cn/
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
next parse http://www.sxmu.edu.cn/
store data:  {next_parse: URL(http://www.sxmu.edu.cn/)}
1.517087459564209
測試結果

六、自定義多線程爬蟲架構

# -*- coding: utf-8 -*-

"""
@Datetime: 2019/5/1
@Author: Zhang Yafei
"""
import functools
import os
import time
from collections import Iterator

from retrying import retry
import requests
from concurrent.futures import ThreadPoolExecutor

headers = {‘User-Agent‘: ‘Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36‘}


class Request(object):
    """
    用於封裝用戶請求信息所用
    """
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback


class RequestManager(object):
    def __init__(self):
        self.new_requests = set()
        self.old_requests = set()

    def has_new_request(self):
        """ 判斷是否有未爬取的url """
        return self.new_request_size() != 0

    def get_new_request(self):
        """ 獲取一個未爬取的請求 """
        new_request = self.new_requests.pop()
        # 提取之後,將其添加到已爬取的鏈接中
        self.old_requests.add(new_request)
        return new_request

    def add_new_request(self, request):
        """ 將新請求添加到未爬取的集合中(單個請求) """
        if request is None:
            return
        if request not in self.new_requests and request not in self.old_requests:
            self.new_requests.add(request)

    def add_new_urls(self, requests):
        """ 將新請求添加到未爬取的集合中(集合) """
        if requests is None or len(requests) == 0:
            return
        for request in requests:
            self.add_new_request(request)

    def new_request_size(self):
        """ 獲取未爬取的url大小 """
        return len(self.new_requests)

    def old_request_size(self):
        """ 獲取已爬取的url大小 """
        return len(self.old_requests)


class HTMLDownLoad(object):

    @retry(stop_max_attempt_number=3)
    def retry_download(self, url):
        """
        通過裝飾器封裝重試下載模塊,最多重試三次
        :param url: 下載網頁的最終地址
        """
        result = requests.get(url, headers=headers, timeout=3)
        assert result.status_code == 200  # 使用斷言判斷下載狀態,成功則返回結果,失敗拋出異常
        return result

    def download(self, url):
        """
        真正的下載類,代理模式
        :param url:下載的鏈接
        """
        try:
            result = self.retry_download(url)
        except Exception as e:  # 異常處理盡量使用具體的異常
            print(e)
            result = None
        return result


class HTMLParser(object):
    def __init__(self):
        pass

    def parse(self, response):
        items = {}
        # print(response.result())
        # print(response.url)
        print(‘parse---‘, response)
        yield Request(url=‘http://www.sxmu.edu.cn/‘, callback=self.next_parse)
        # yield items

    def next_parse(self, response):
        print(‘next parse‘, response.url)
        items = {‘next_parse‘: response.url}
        yield items


class DataMemory(object):
    def __init__(self):
        pass

    def store_data(self, data):
        print(‘store data‘)

    def to_csv(self):
        pass

    def to_excel(self):
        pass

    def save_mongo(self):
        pass

    def save_redis(self):
        pass

    def save_mysql(self):
        pass


class Crawler(object):
    def __init__(self, urls):
        self.manager = RequestManager()
        self.downloader = HTMLDownLoad()
        self.parser = HTMLParser()
        self.data_memory = DataMemory()
        self.urls = urls

    def filter_downloaded_urls(self):
        return self.urls

    def start(self, request):
        self.manager.add_new_request(request)
        while self.manager.has_new_request() and self.manager.old_request_size() < 100:
            # 1. 取出新請求
            new_request = self.manager.get_new_request()
            print(new_request.url)
            # 2. 將請求的url進行下載
            response = self.downloader.download(new_request.url)
            # 3. 將下載的Html文本進行解析
            result = new_request.callback(response)
            # 4. 判斷解析之後返回的數據對象為新請求還是解析的數據
            if not isinstance(result, Iterator):
                raise Exception(‘返回的數據類型不是叠代器‘)
            for ret in result:
                if isinstance(ret, Request):
                    # 5. 如果是新請求,則加入到請求管理器
                    self.manager.add_new_request(ret)
                elif isinstance(ret, dict):
                    # 6. 如果是解析的數據,則將數據進行存儲
                    self.data_memory.store_data(ret)
                else:
                    raise Exception(‘返回數據類型不正確,只能返回Request的實例對象或字典的實例對象‘)

    def run(self):
        url_list = self.filter_downloaded_urls()
        request_lists = (Request(url, self.parser.parse) for url in url_list)
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
            pool.map(self.start, request_lists)


def timeit(func):
    """ 裝飾器: 計算函數運行時間 """
    @functools.wraps(func)
    def inner(*args, **kwargs):
        start = time.time()
        ret = func(*args, **kwargs)
        print(time.time() - start)
        return ret
    return inner


@timeit
def main():
    urls = (‘https://mp.weixin.qq.com‘ for _ in range(10))
    crawl = Crawler(urls=urls)
    crawl.run()


if __name__ == ‘__main__‘:
    main()
技術分享圖片
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
https://mp.weixin.qq.com
parse--- <Response [200]>
http://www.sxmu.edu.cn/
parse--- <Response [200]>
parse--- <Response [200]>
http://www.sxmu.edu.cn/
http://www.sxmu.edu.cn/
parse--- <Response [200]>
http://www.sxmu.edu.cn/
next parse http://www.sxmu.edu.cn/
store data
https://mp.weixin.qq.com
next parse http://www.sxmu.edu.cn/
store data
https://mp.weixin.qq.com
next parse http://www.sxmu.edu.cn/
store data
https://mp.weixin.qq.com
next parse http://www.sxmu.edu.cn/
store data
https://mp.weixin.qq.com
parse--- <Response [200]>
http://www.sxmu.edu.cn/
parse--- <Response [200]>
http://www.sxmu.edu.cn/
parse--- <Response [200]>
http://www.sxmu.edu.cn/
parse--- <Response [200]>
http://www.sxmu.edu.cn/
next parse http://www.sxmu.edu.cn/
store data
https://mp.weixin.qq.com
next parse http://www.sxmu.edu.cn/
store data
https://mp.weixin.qq.com
next parse http://www.sxmu.edu.cn/
store data
next parse http://www.sxmu.edu.cn/
store data
parse--- <Response [200]>
http://www.sxmu.edu.cn/
parse--- <Response [200]>
http://www.sxmu.edu.cn/
next parse http://www.sxmu.edu.cn/
store data
next parse http://www.sxmu.edu.cn/
store data
2.2576050758361816
運行結果

 

自定義爬蟲架構之多線程爬蟲與異步爬蟲