
(3) Web Crawler: Adding a Cache

  Caching the pages we crawl makes later visits convenient, since the same page does not have to be downloaded again. The implementation splits into two parts: a downloader and a cache.

1. Downloader:

  Downloader behaviour: before downloading, first check whether the cache already holds the result for the url; only if it does not, fetch the page from the web and add the result to the cache. The downloader code is as follows:

class Downloader(object):
    def __init__(self,user_agent=None, proxies=None, num_retries=3,delay=5,cache=None):
        self.user_agent = user_agent
        self.proxies = proxies
        self.num_retries=num_retries
        self.throttle = Throttle(delay)
        self.cache = cache

    def __call__(self, url):
        result = None
        try:
            result = self.cache[url]  # try to fetch the result from the cache
        except KeyError:
            pass
        else:
            if self.num_retries>0 and 500<=result['code']<600:
                result = None  # cached result was a server error, so download again
        if result==None:
            self.throttle.wait(url)
            response = self.download(url,self.user_agent,self.proxies,self.num_retries)
            result={'html':response.text,'code':response.status_code}
            if self.cache:
                self.cache[url]=result   # store the result in the cache
        return result['html']

    def download(self,url, user_agent, proxies, num_retries):
        response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
        if response.status_code and 500 <= response.status_code < 600:  # retry up to three times on server errors
            if num_retries > 0:
                response = self.download(url, user_agent, proxies, num_retries - 1)
        return response

# download delay between requests to the same domain
class Throttle(object):
    def __init__(self,delay):
        self.delay = delay
        self.domains={}

    def wait(self,url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the url
        last_accessed = self.domains.get(domain)
        if self.delay>0 and last_accessed!=None:
            sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
            if sleep_secs>0:
                time.sleep(sleep_secs)
        self.domains[domain]=datetime.now()

  When crawling pages, the downloader is used like this:

def link_carwl(start_url,link_regex,max_depth=5,callback=None,user_agent=None, proxies=None, num_retries=3,delay=5,cache=None):
    url_queue = [start_url]
    seen = {start_url:0}
    down = Downloader(user_agent=user_agent, proxies=proxies, num_retries=num_retries,delay=delay,cache=cache)
    while url_queue:
        url = url_queue.pop()
        html = down(url)
        if callback!=None:
            callback(url,html)
        depth = seen[url]
        if depth<max_depth:
            for link in get_links(html):
                if re.match(link_regex,link):
                    #urlparse.urljoin(url,link)  # link may be a relative path
                    if link not in seen:   # do not revisit urls that have already been seen
                        seen[link] =depth+1  # one level deeper than the current url
                        url_queue.append(link)
# link extraction
def get_links(html):
    webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)  # ["\'] matches a single or double quote
    return webpage_regex.findall(html)
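
  The callback parameter is not exercised in the examples here. As an illustration only, a hypothetical callback (save_html and the pages directory below are made up, not part of the original code) could write every downloaded page to disk:

import os
import hashlib

def save_html(url, html, out_dir='pages'):
    # hypothetical callback: save each downloaded page under a hashed file name
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    name = hashlib.md5(url).hexdigest() + '.html'
    with open(os.path.join(out_dir, name), 'wb') as f:
        f.write(html.encode('utf-8'))

link_carwl('https://nj.lianjia.com/ershoufang/', r'https://nj.lianjia.com/ershoufang/.*',
           max_depth=1, callback=save_html)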

2. Cache:

  2.1 Disk cache

    In the downloader code above, the cache is used like a dictionary: the url is the key and the page content is the value. The cache class therefore needs to implement the __getitem__() and __setitem__() methods. In addition, cached entries should have an expiry time, and the page content should be compressed to reduce the disk space needed.
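
    Before the disk implementation, here is a minimal in-memory sketch of that interface (the MemoryCache class is invented for illustration and is not part of the original code); it shows the dictionary-style access and the expiry check the Downloader relies on:

from datetime import datetime, timedelta

class MemoryCache(object):
    # hypothetical in-memory cache exposing the same interface as the disk cache below
    def __init__(self, expires=timedelta(days=30)):
        self.expires = expires
        self.data = {}

    def __getitem__(self, url):
        result, timestamp = self.data[url]  # raises KeyError if the url has never been cached
        if datetime.utcnow() > timestamp + self.expires:
            raise KeyError(url + ' has expired!')  # treat expired entries as cache misses
        return result

    def __setitem__(self, url, result):
        self.data[url] = (result, datetime.utcnow())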

    Approach: hash the url and use the digest as the file name, then write the content to that file or read it back from it. The implementation is as follows:

class DiskCache(object):
    def __init__(self,cache_dir='cache',expires=timedelta(days=30)):
        self.cache_dir = cache_dir
        self.expires = expires  # cache lifetime, 30 days by default
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)

    def url_to_path(self,url):  # hash the url and use the digest as the file name
        h = hashlib.md5()
        h.update(url)
        return h.hexdigest()

    def __getitem__(self, url):
        path = os.path.join(self.cache_dir, self.url_to_path(url))
        if os.path.exists(path):
            with open(path,'rb') as f:
                result,timestamp = pickle.loads(zlib.decompress(f.read()))
            if datetime.utcnow()>timestamp+self.expires:  # check whether the cached entry has expired
                raise KeyError(url+' has expired!')
            return result
        else:
            raise KeyError(url+' does not exist!')

    def __setitem__(self, url, result):
        path = os.path.join(self.cache_dir,self.url_to_path(url))
        timestamp = datetime.utcnow()
        data = pickle.dumps((result,timestamp))  # store a timestamp alongside the result so expiry can be checked
        with open(path,'wb') as f:
            f.write(zlib.compress(data))  # compress to reduce disk space
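
    A quick standalone check of DiskCache (the url and values below are arbitrary examples):

cache = DiskCache()
cache['http://example.com'] = {'html': '<html>...</html>', 'code': 200}
print cache['http://example.com']['code']   # prints 200, read back from the compressed file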

  The complete code using the downloader together with the disk cache is as follows:

#coding:utf-8

# Add a cache to the crawler

import requests
import re
import urlparse
from datetime import datetime,timedelta
import time
import hashlib
import os
import pickle
import zlib
from pymongo import MongoClient
from bson.binary import Binary



class Downloader(object):
    def __init__(self,user_agent=None, proxies=None, num_retries=3,delay=5,cache=None):
        self.user_agent = user_agent
        self.proxies = proxies
        self.num_retries=num_retries
        self.throttle = Throttle(delay)
        self.cache = cache

    def __call__(self, url):
        result = None
        try:
            result = self.cache[url]  # try to fetch the result from the cache
        except KeyError:
            pass
        else:
            if self.num_retries>0 and 500<=result['code']<600:
                result = None  # cached result was a server error, so download again
        if result==None:
            self.throttle.wait(url)
            response = self.download(url,self.user_agent,self.proxies,self.num_retries)
            result={'html':response.text,'code':response.status_code}
            if self.cache:
                self.cache[url]=result   # store the result in the cache
        return result['html']

    def download(self,url, user_agent, proxies, num_retries):
        response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
        if response.status_code and 500 <= response.status_code < 600:  # retry up to three times on server errors
            if num_retries > 0:
                response = self.download(url, user_agent, proxies, num_retries - 1)
        return response

# download delay between requests to the same domain
class Throttle(object):
    def __init__(self,delay):
        self.delay = delay
        self.domains={}

    def wait(self,url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the url
        last_accessed = self.domains.get(domain)
        if self.delay>0 and last_accessed!=None:
            sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
            if sleep_secs>0:
                time.sleep(sleep_secs)
        self.domains[domain]=datetime.now()

class DiskCache(object):
    def __init__(self,cache_dir='cache',expires=timedelta(days=30)):
        self.cache_dir = cache_dir
        self.expires = expires  # cache lifetime, 30 days by default
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)

    def url_to_path(self,url):  # hash the url and use the digest as the file name
        h = hashlib.md5()
        h.update(url)
        return h.hexdigest()

    def __getitem__(self, url):
        path = os.path.join(self.cache_dir, self.url_to_path(url))
        if os.path.exists(path):
            with open(path,'rb') as f:
                result,timestamp = pickle.loads(zlib.decompress(f.read()))
            if datetime.utcnow()>timestamp+self.expires:  # check whether the cached entry has expired
                raise KeyError(url+' has expired!')
            return result
        else:
            raise KeyError(url+' does not exist!')

    def __setitem__(self, url, result):
        path = os.path.join(self.cache_dir,self.url_to_path(url))
        timestamp = datetime.utcnow()
        data = pickle.dumps((result,timestamp))  # store a timestamp alongside the result so expiry can be checked
        with open(path,'wb') as f:
            f.write(zlib.compress(data))  # compress to reduce disk space

def link_carwl(start_url,link_regex,max_depth=5,callback=None,user_agent=None, proxies=None, num_retries=3,delay=5,cache=None):
    url_queue = [start_url]
    seen = {start_url:0}
    down = Downloader(user_agent=user_agent, proxies=proxies, num_retries=num_retries,delay=delay,cache=cache)
    while url_queue:
        url = url_queue.pop()
        html = down(url)
        if callback!=None:
            callback(url,html)
        depth = seen[url]
        if depth<max_depth:
            for link in get_links(html):
                if re.match(link_regex,link):
                    #urlparse.urljoin(url,link)  # link may be a relative path
                    if link not in seen:   # do not revisit urls that have already been seen
                        seen[link] =depth+1  # one level deeper than the current url
                        url_queue.append(link)
# link extraction
def get_links(html):
    webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)  # ["\'] matches a single or double quote
    return webpage_regex.findall(html)

if __name__ == '__main__':
    link_carwl('https://nj.lianjia.com/ershoufang/',r'https://nj.lianjia.com/ershoufang/.*',max_depth=1,cache=DiskCache())
    d =Downloader(cache=DiskCache())
    print d.cache['https://nj.lianjia.com/ershoufang/']['html']

  2.2 MongoDB cache

  With the disk cache, the number of files a single folder can hold is limited (65,535 on FAT32), and expired cache entries have to be cleaned up by hand. A database cache can hold far more entries, clears expired data automatically, and is also fairly simple to implement. As before, the url is the key and the page is the value. The implementation is as follows:

from pymongo import MongoClient
from bson.binary import Binary

class MongoCache(object):
    def __init__(self,client=None,expires=timedelta(days=30)):
        self.client =MongoClient('127.0.0.1',27017) if client is None else client
        self.db = self.client.cache # connect to the cache database; created if it does not exist
        self.collection = self.db.webpage # the webpage collection; created if it does not exist (a collection is roughly a table)
        self.collection.create_index('timestamp',expireAfterSeconds=expires.total_seconds())

    def __getitem__(self, url):
        record = self.collection.find_one({'_id':url})
        if record:
            return pickle.loads(zlib.decompress(record['result']))
        else:
            raise KeyError(url + ' does not exist!')
    def __setitem__(self, url, result):
        record={'result':Binary(zlib.compress(pickle.dumps(result))),'timestamp':datetime.utcnow()}
        # MongoDB stores the page as binary data, so the result is pickled and compressed before saving
        self.collection.update({'_id':url},{'$set':record},upsert=True)
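
  A short standalone check of MongoCache, assuming a MongoDB server is reachable on 127.0.0.1:27017 (the url and values below are arbitrary examples); index_information() simply confirms the TTL index on timestamp was created:

cache = MongoCache()
cache['http://example.com'] = {'html': '<html>...</html>', 'code': 200}
print cache['http://example.com']['code']      # prints 200, decompressed and unpickled from MongoDB
print cache.collection.index_information()     # shows the TTL index on 'timestamp'

  Note that MongoDB's TTL monitor removes expired documents in a background pass (roughly once a minute), so expired entries may linger briefly rather than disappearing at the exact expiry time.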

  The complete code using the downloader together with the MongoDB cache is as follows:

#coding:utf-8

# Add a cache to the crawler

import requests
import re
import urlparse
from datetime import datetime,timedelta
import time
import hashlib
import os
import pickle
import zlib
from pymongo import MongoClient
from bson.binary import Binary



class Downloader(object):
    def __init__(self,user_agent=None, proxies=None, num_retries=3,delay=5,cache=None):
        self.user_agent = user_agent
        self.proxies = proxies
        self.num_retries=num_retries
        self.throttle = Throttle(delay)
        self.cache = cache

    def __call__(self, url):
        result = None
        try:
            result = self.cache[url]  # try to fetch the result from the cache
        except KeyError:
            pass
        else:
            if self.num_retries>0 and 500<=result['code']<600:
                result = None  # cached result was a server error, so download again
        if result==None:
            self.throttle.wait(url)
            response = self.download(url,self.user_agent,self.proxies,self.num_retries)
            result={'html':response.text,'code':response.status_code}
            if self.cache:
                self.cache[url]=result   # store the result in the cache
        return result['html']

    def download(self,url, user_agent, proxies, num_retries):
        response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
        if response.status_code and 500 <= response.status_code < 600:  # retry up to three times on server errors
            if num_retries > 0:
                response = self.download(url, user_agent, proxies, num_retries - 1)
        return response

# download delay between requests to the same domain
class Throttle(object):
    def __init__(self,delay):
        self.delay = delay
        self.domains={}

    def wait(self,url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the url
        last_accessed = self.domains.get(domain)
        if self.delay>0 and last_accessed!=None:
            sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
            if sleep_secs>0:
                time.sleep(sleep_secs)
        self.domains[domain]=datetime.now()

class MongoCache(object):
    def __init__(self,client=None,expires=timedelta(days=30)):
        self.client =MongoClient('127.0.0.1',27017) if client is None else client
        self.db = self.client.cache # connect to the cache database; created if it does not exist
        self.collection = self.db.webpage # the webpage collection; created if it does not exist (a collection is roughly a table)
        self.collection.create_index('timestamp',expireAfterSeconds=expires.total_seconds())

    def __getitem__(self, url):
        record = self.collection.find_one({'_id':url})
        if record:
            return pickle.loads(zlib.decompress(record['result']))
        else:
            raise KeyError(url + ' does not exist!')
    def __setitem__(self, url, result):
        record={'result':Binary(zlib.compress(pickle.dumps(result))),'timestamp':datetime.utcnow()}
        # MongoDB stores the page as binary data, so the result is pickled and compressed before saving
        self.collection.update({'_id':url},{'$set':record},upsert=True)

def link_carwl(start_url,link_regex,max_depth=5,callback=None,user_agent=None, proxies=None, num_retries=3,delay=5,cache=None):
    url_queue = [start_url]
    seen = {start_url:0}
    down = Downloader(user_agent=user_agent, proxies=proxies, num_retries=num_retries,delay=delay,cache=cache)
    while url_queue:
        url = url_queue.pop()
        html = down(url)
        if callback!=None:
            callback(url,html)
        depth = seen[url]
        if depth<max_depth:
            for link in get_links(html):
                if re.match(link_regex,link):
                    #urlparse.urljoin(url,link)  # link may be a relative path
                    if link not in seen:   # do not revisit urls that have already been seen
                        seen[link] =depth+1  # one level deeper than the current url
                        url_queue.append(link)
# link extraction
def get_links(html):
    webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)  # ["\'] matches a single or double quote
    return webpage_regex.findall(html)

if __name__ == '__main__':
    link_carwl('https://nj.lianjia.com/ershoufang/',r'https://nj.lianjia.com/ershoufang/.*',max_depth=1,cache=MongoCache())
    d = Downloader(cache=MongoCache())
    print d.cache['https://nj.lianjia.com/ershoufang/']['html']