
[Python Web Scraping 3] Crawling with a Local Download Cache

Caching Downloads

In the previous article we learned how to extract data from web pages and save the results to a spreadsheet. If we later want to extract another field, we have to download the whole page again. That is not a problem for our small example site, but for a site with millions of pages it could take weeks. So we cache pages first, ensuring each page is downloaded only once.

1 Adding cache support to the link crawler

  • We refactor the downloader into a class, so that its parameters are set only once in the constructor and reused across calls; the cache is checked before each URL is downloaded, and the throttling logic moves inside the class.
  • The __call__ special method of the Downloader class checks the cache before downloading: if the URL is already cached, it further checks whether that cached download hit a server error. If not, the cached result is usable; otherwise the URL is downloaded as usual and the result is stored in the cache.
  • The download method now returns the HTTP status code along with the HTML, so that error codes can be stored in the cache and checked later. If you do not need throttling or caching, you can call download directly instead of going through __call__.

import random

class Downloader:
    def __init__(self, delay=5, user_agent='Wu_Being', proxies=None, num_retries=1, cache=None):
        self.throttle = Throttle(delay)
        self.user_agent = user_agent
        self.proxies = proxies
        self.num_retries = num_retries
        self.cache = cache

    def __call__(self, url):
        result = None
        if self.cache:
            try:
                result = self.cache[url]
            except KeyError:
                # url is not available in cache
                pass
            else:
                if self.num_retries > 0 and 500 <= result['code'] < 600:
                    # server error so ignore result from cache and re-download
                    result = None
        if result is None:
            # result was not loaded from cache so still need to download
            self.throttle.wait(url)
            proxy = random.choice(self.proxies) if self.proxies else None
            headers = {'User-agent': self.user_agent}
            result = self.download(url, headers, proxy=proxy, num_retries=self.num_retries)
            if self.cache:
                # save result to cache
                self.cache[url] = result
        return result['html']

    def download(self, url, headers, proxy, num_retries, data=None):
        print 'Downloading:', url
        ...
        return {'html': html, 'code': code}

class Throttle:
    def __init__(self, delay):
        ...
    def wait(self, url):
        ...
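Because the cache only needs to behave like a dictionary (reading cache[url] raises KeyError when the URL is absent), any mapping object will do. Here is a minimal usage sketch with a plain dict as the cache, assuming the elided Throttle and download bodies are filled in:

cache = {}   # any dict-like object can serve as the cache
D = Downloader(delay=0, num_retries=1, cache=cache)
html = D('http://example.webscraping.com')   # first call: downloaded and stored in the cache
html = D('http://example.webscraping.com')   # second call: served from the cache

This duck-typed cache interface is what lets us plug in the DiskCache and MongoDB-backed caches built later without changing the downloader.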

To support caching, the link crawler code also needs a few small changes: adding a cache parameter, removing the throttling logic, and replacing the download function with the new class.

from downloader import Downloader

def link_crawler(... cache=None):
    crawl_queue = [seed_url]
    seen = {seed_url: 0}
    # track how many URLs have been downloaded
    num_urls = 0
    rp = get_robots(seed_url)
    # cache.clear()
    D = Downloader(delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, cache=cache)

    while crawl_queue:
        url = crawl_queue.pop()
        depth = seen[url]
        # check url passes robots.txt restrictions
        if rp.can_fetch(user_agent, url):
            html = D(url)    # invokes Downloader.__call__(url)
            links = []
	...

def normalize(seed_url, link):
	...
def same_domain(url1, url2):
	...
def get_robots(url):
	...
def get_links(html):
	...
"""
if __name__ == '__main__':
    link_crawler('http://example.webscraping.com', '/(index|view)', delay=0, num_retries=1, user_agent='BadCrawler')
    link_crawler('http://example.webscraping.com', '/(index|view)', delay=0, num_retries=1, max_depth=1, user_agent='GoodCrawler')
"""

The basic architecture of the cache-aware crawler is now in place; next we build the actual cache.

2 Disk cache

The table below summarizes the filename restrictions of some common file systems:

Operating system    File system    Invalid filename characters    Maximum filename length
Linux               Ext3/Ext4      / and \0                       255 bytes
OS X                HFS Plus       : and \0                       255 UTF-16 code units
Windows             NTFS           \ / ? : * " > < |              255 characters

To keep our file paths safe on all of these file systems, we replace every character other than digits, letters, and a few basic punctuation characters with an underscore:

>>> import re
>>> url="http://example.webscraping.com/default/view/australia-1"
>>> re.sub('[^/0-9a-zA-Z\-,.;_ ]','_',url)
'http_//example.webscraping.com/default/view/australia-1'

In addition, each filename and directory name needs to be limited to 255 characters:

>>> filename=re.sub('[^/0-9a-zA-Z\-,.;_ ]','_',url)
>>> filename='/'.join(segment[:255] for segment in filename.split('/'))
>>> print filename
http_//example.webscraping.com/default/view/australia-1
>>> print '#'.join(segment[:5] for segment in filename.split('/'))
http_##examp#defau#view#austr
>>> 

There is one more edge case to handle: a URL whose path ends with a slash. Splitting such a URL on slashes would leave an empty, and therefore invalid, filename. For example, for a URL ending in /index/ we can append index.html as the filename, so that index becomes the directory name and index.html the filename; for a path such as /index/1/, index would be the directory, 1 the subdirectory, and index.html the filename.

>>> import urlparse
>>> components=urlparse.urlsplit('http://exmaple.scraping.com/index/')
>>> print components
SplitResult(scheme='http', netloc='exmaple.scraping.com', path='/index/', query='', fragment='')
>>> print components.path
/index/
>>> path=components.path
>>> if not path:
...     path='/index.html'
... elif path.endswith('/'):
...     path+='index.html'
... 
>>> filename=components.netloc+path+components.query
>>> filename
'exmaple.scraping.com/index/index.html'
>>> 

2.1 Implementing the disk cache

We can now combine the complete URL-to-directory-and-filename mapping logic, which forms the main part of the disk cache. The constructor takes a parameter that sets the cache location, and the url_to_path method applies the filename restrictions discussed above.

import os
import re
import urlparse

from link_crawler import link_crawler

class DiskCache:

    def __init__(self, cache_dir='cache', ...):
        """
        cache_dir: the root level folder for the cache
        """
        self.cache_dir = cache_dir
	...

    def url_to_path(self, url):
        """Create file system path for this URL
        """
        components = urlparse.urlsplit(url)
        # when empty path set to /index.html
        path = components.path
        if not path:
            path = '/index.html'
        elif path.endswith('/'):
            path += 'index.html'
        filename = components.netloc + path + components.query
        # replace invalid characters
        filename = re.sub('[^/0-9a-zA-Z\-.,;_ ]', '_', filename)
        # restrict maximum number of characters
        filename = '/'.join(segment[:255] for segment in filename.split('/'))
        return os.path.join(self.cache_dir, filename)  # join the cache root and the mapped filename into a full path
    
    def __getitem__(self, url):
        ...
    def __setitem__(self, url, result):
        ...
    def __delitem__(self, url):
        ...
    def has_expired(self, timestamp):
        ...
    def clear(self):
	...

if __name__ == '__main__':
    link_crawler('http://example.webscraping.com/', '/(index|view)', cache=DiskCache())

We are still missing the methods that load and save data for a given filename, i.e. the interface the Downloader class relies on via result = cache[url] and cache[url] = result: the __getitem__() and __setitem__() special methods.

import os
import pickle

class DiskCache:

    def __init__(self, cache_dir='cache', expires=timedelta(days=30), compress=True):
	...    
    def url_to_path(self, url):
	...
    def __getitem__(self, url):
	...
    def __setitem__(self, url, result):
        """Save data to disk for this url
        """
        path = self.url_to_path(url)
        folder = os.path.dirname(path)
        if not os.path.exists(folder):
            os.makedirs(folder)
        with open(path, 'wb') as fp:
            fp.write(pickle.dumps(result))

In __setitem__(), we use the url_to_path() method to map the URL to a safe filename, creating the parent directory if necessary. The pickle module serializes the input to a string, which is then saved to disk.

import os
import pickle

class DiskCache:

    def __init__(self, cache_dir='cache', expires=timedelta(days=30), compress=True):
	...    
    def url_to_path(self, url):
	...
    def __getitem__(self, url):
        """Load data from disk for this URL
        """
        path = self.url_to_path(url)
        if os.path.exists(path):
            with open(path, 'rb') as fp:
                return pickle.loads(fp.read())
        else:
            # URL has not yet been cached
            raise KeyError(url + ' does not exist')

    def __setitem__(self, url, result):
	...

In __getitem__(), we again map the URL to a safe filename with url_to_path(). Then we check whether the file exists: if it does, we load its contents and deserialize them to restore the original data type; if not, the URL has not been cached yet, so a KeyError is raised.

2.2 Testing the cache

We can prefix the python command with time to measure the run. For a site served by a local server, the crawl takes 0m58.710s with an empty cache, while the second run, read entirely from the cache, takes 0m0.221s, more than 265 times faster. Crawling a site on a remote server, the first run would take even longer.

~/GitHub/WebScrapingWithPython/3.下載快取$ time python 2disk_cache_Nozip127.py
Downloading: http://127.0.0.1:8000/places/
Downloading: http://127.0.0.1:8000/places/default/index/1
...
Downloading: http://127.0.0.1:8000/places/default/view/Afghanistan-1
real	0m58.710s
user	0m0.684s
sys	0m0.120s
~/GitHub/WebScrapingWithPython/3.下載快取$ time python 2disk_cache_Nozip127.py

real	0m0.221s
user	0m0.204s
sys	0m0.012s

2.3 Saving disk space

To reduce the space the cache takes up, we can compress the downloaded HTML by running the serialized string through zlib:

fp.write(zlib.compress(pickle.dumps(result)))

The code to decompress after loading from disk is:

return pickle.loads(zlib.decompress(fp.read()))
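As a quick sanity check, here is a standalone round trip (a minimal sketch with a made-up result dict, separate from the crawler code) showing that the data restores exactly and that repetitive HTML compresses well:

>>> import pickle, zlib
>>> result = {'html': '<html>' + '<p>row</p>' * 1000 + '</html>', 'code': 200}
>>> data = zlib.compress(pickle.dumps(result))
>>> pickle.loads(zlib.decompress(data)) == result
True
>>> len(data) < len(pickle.dumps(result))
True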

After compressing every page, the cache shrinks from 2.8 MB to 821.2 KB, while the crawl time increases slightly:

~/GitHub/WebScrapingWithPython/3.下載快取$ time python 2disk_cache.py
Downloading: http://127.0.0.1:8000/places/
Downloading: http://127.0.0.1:8000/places/default/index/1
...
Downloading: http://127.0.0.1:8000/places/default/view/Afghanistan-1

real	1m0.011s
user	0m0.800s
sys	0m0.104s
~/GitHub/WebScrapingWithPython/3.下載快取$ time python 2disk_cache.py

real	0m0.252s
user	0m0.228s
sys	0m0.020s
~/GitHub/WebScrapingWithPython/3.下載快取$

2.4 Expiring stale data

In this section we add an expiry time to cached data so the crawler knows when a page needs to be re-downloaded. In the constructor we set the default expiry to 30 days with a timedelta object; in __setitem__ we store the current timestamp alongside the serialized data, and in __getitem__ we compare it with the current time to check whether the entry has expired.

import os
import pickle
import zlib
from datetime import datetime, timedelta

class DiskCache:

    def __init__(self, cache_dir='cache', expires=timedelta(days=30), compress=True):
        """
        cache_dir: the root level folder for the cache
        expires: timedelta of amount of time before a cache entry is considered expired
        compress: whether to compress data in the cache
        """
        self.cache_dir = cache_dir
        self.expires = expires
        self.compress = compress

    def __getitem__(self, url):
        """Load data from disk for this URL
        """
        path = self.url_to_path(url)
        if os.path.exists(path):
            with open(path, 'rb') as fp:
                data = fp.read()
                if self.compress:
                    data = zlib.decompress(data)
                result, timestamp = pickle.loads(data)
                if self.has_expired(timestamp):
                    raise KeyError(url + ' has expired')
                return result
        else:
            # URL has not yet been cached
            raise KeyError(url + ' does not exist')

    def __setitem__(self, url, result):
        """Save data to disk for this url
        """
        path = self.url_to_path(url)
        folder = os.path.dirname(path)
        if not os.path.exists(folder):
            os.makedirs(folder)

        data = pickle.dumps((result, datetime.utcnow()))
        if self.compress:
            data = zlib.compress(data)
        with open(path, 'wb') as fp:
            fp.write(data)

	...
    def has_expired(self, timestamp):
        """Return whether this timestamp has expired
        """
        return datetime.utcnow() > timestamp + self.expires

To test the expiry logic, we can shorten the timeout to 5 seconds, as follows:

    """
    Dictionary interface that stores cached 
    values in the file system rather than in memory.
    The file path is formed from an md5 hash of the key.
    """
>>> from disk_cache import DiskCache
>>> cache=DiskCache()
>>> url='http://www.baidu.com'
>>> result={'html':'<html>...','code':200}
>>> cache[url]=result
>>> cache[url]
{'code': 200, 'html': '<html>...'}
>>> cache[url]['html']==result['html']
True
>>> 
>>> from datetime import timedelta
>>> cache2=DiskCache(expires=timedelta(seconds=5))
>>> url2='https://www.baidu.sss'
>>> result2={'html':'<html>..ss.','code':500}
>>> cache2[url2]=result2
>>> cache2[url2]
{'code': 500, 'html': '<html>..ss.'}
>>> cache2[url2]
{'code': 500, 'html': '<html>..ss.'}
>>> cache2[url2]
{'code': 500, 'html': '<html>..ss.'}
>>> cache2[url2]
{'code': 500, 'html': '<html>..ss.'}
>>> cache2[url2]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "disk_cache.py", line 57, in __getitem__
    raise KeyError(url + ' has expired')
KeyError: 'https://www.baidu.sss has expired'
>>> cache2.clear()

2.5 Drawbacks of the disk cache

Because we map URLs to safe filenames to satisfy file system restrictions, some problems arise:

  • Different URLs can be mapped to the same filename, for example .../count.asp?a+b and .../count.asp?a*b.
  • URLs can exceed 2,000 characters, so filenames truncated to 255 characters may also collide.

Using a hash of the URL as the filename alleviates this (a minimal sketch follows the list below), but some problems remain:

  • There is a limit on the number of files per volume and per directory. FAT32 allows at most 65,535 files per directory, although files can be split across several directories.
  • File systems also limit the total number of files. An ext4 partition currently supports slightly more than 15 million files, whereas a large website may have over 100 million pages.
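
To illustrate the hashing idea, here is a minimal sketch (the hashed_path helper is hypothetical and not part of the DiskCache class above) that maps any URL to a fixed-length path and spreads files over subdirectories:

import hashlib
import os

def hashed_path(cache_dir, url):
    # fixed-length name regardless of URL length, so no truncation collisions
    digest = hashlib.md5(url).hexdigest()
    # the first two hex characters form a bucket directory (256 buckets),
    # which keeps the number of files per directory manageable
    return os.path.join(cache_dir, digest[:2], digest + '.pickle')

The trade-off is that the original URL can no longer be read directly from the path, and the limit on the total number of files still applies.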

To avoid these problems entirely, we would need to combine multiple cached pages into a single file and index them with something like a B+ tree. Rather than implementing such an algorithm ourselves, the next section introduces a database that already does.

3 Database cache

When crawling, we may need to cache a large amount of data, but we do not need any complex join operations, so we will use a NoSQL database, which scales more easily than a traditional relational database. In this section we use the currently very popular MongoDB as our cache database.

3.1 What is NoSQL?

NoSQL stands for Not Only SQL and is a relatively recent approach to database design. The traditional relational model uses a fixed schema and splits data across tables. For large datasets, the data becomes too big to store on a single server and needs to be spread across several servers. The relational model does not support this kind of scaling well, because a query joining several tables may need data that lives on different servers. NoSQL databases, by contrast, are usually schemaless and are designed from the start to shard seamlessly across servers. There are several kinds of NoSQL database that achieve this:
  • column-oriented stores (such as HBase);
  • key-value stores (such as Redis);
  • graph databases (such as Neo4j);
  • document-oriented databases (such as MongoDB).

3.2 Installing MongoDB

MongoDB can be downloaded from https://www.mongodb.org/downloads. Then install its Python wrapper:

pip install pymongo

To check that the installation works, start a MongoDB server locally:

~/GitHub/WebScrapingWithPython/3.下載快取$ mongod -dbpath MongoD
2017-01-17T21:20:46.224+0800 [initandlisten] MongoDB starting : pid=1978 port=27017 dbpath=MongoD 64-bit host=ubuntukylin64
2017-01-17T21:20:46.224+0800 [initandlisten] db version v2.6.10
2017-01-17T21:20:46.224+0800 [initandlisten] git version: nogitversion
2017-01-17T21:20:46.225+0800 [initandlisten] OpenSSL version: OpenSSL 1.0.2g  1 Mar 2016
2017-01-17T21:20:46.225+0800 [initandlisten] build info: Linux lgw01-12 3.19.0-25-generic #26~14.04.1-Ubuntu SMP Fri Jul 24 21:16:20 UTC 2015 x86_64 BOOST_LIB_VERSION=1_58
2017-01-17T21:20:46.225+0800 [initandlisten] allocator: tcmalloc
2017-01-17T21:20:46.225+0800 [initandlisten] options: { storage: { dbPath: "MongoD" } }
2017-01-17T21:20:46.269+0800 [initandlisten] journal dir=MongoD/journal
2017-01-17T21:20:46.270+0800 [initandlisten] recover : no journal files present, no recovery needed
2017-01-17T21:20:49.126+0800 [initandlisten] preallocateIsFaster=true 33.72
2017-01-17T21:20:51.932+0800 [initandlisten] preallocateIsFaster=true 32.7
2017-01-17T21:20:55.729+0800 [initandlisten] preallocateIsFaster=true 32.36
2017-01-17T21:20:55.730+0800 [initandlisten] preallocateIsFaster check took 9.459 secs
2017-01-17T21:20:55.730+0800 [initandlisten] preallocating a journal file MongoD/journal/prealloc.0
2017-01-17T21:20:58.042+0800 [initandlisten] 		File Preallocator Progress: 608174080/1073741824	56%
2017-01-17T21:21:03.290+0800 [initandlisten] 		File Preallocator Progress: 744488960/1073741824	69%
2017-01-17T21:21:08.043+0800 [initandlisten] 		File Preallocator Progress: 954204160/1073741824	88%
2017-01-17T21:21:18.347+0800 [initandlisten] preallocating a journal file MongoD/journal/prealloc.1
2017-01-17T21:21:21.166+0800 [initandlisten] 		File Preallocator Progress: 639631360/1073741824	59%
2017-01-17T21:21:26.328+0800 [initandlisten] 		File Preallocator Progress: 754974720/1073741824	70%
...

Then, in Python, try connecting to MongoDB on its default port:

>>> from pymongo import MongoClient
>>> client=MongoClient('localhost',27017)

3.3 MongoDB overview

Here is some example MongoDB code:

>>> from pymongo import MongoClient
>>> client=MongoClient('localhost',27017)
>>> url='http://www.baidu.com/view/China-47'
>>> html='...<html>...'
>>> db=client.cache
>>> db.webpage.insert({'url':url,'html':html})
ObjectId('587e2cb26b00c10b956e0be9')
>>> db
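
The session is cut off here; a likely continuation (a sketch using standard pymongo 2.x calls, not the author's exact transcript) would read the record back and use an upsert so that caching the same URL twice updates the existing document rather than inserting a duplicate:

# read the cached page back
record = db.webpage.find_one({'url': url})
assert record['html'] == html

# an upsert updates the existing document, or inserts it if missing,
# so re-caching the same URL does not create duplicate records
# (pymongo 2.x API; newer versions use update_one instead)
db.webpage.update({'url': url}, {'$set': {'html': html}}, upsert=True)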