1. 程式人生 > >爬蟲工程師進階(八):去重與入庫

爬蟲工程師進階(八):去重與入庫

資料去重又稱重複資料刪除,是指在一個數字檔案集合中,找出重複的資料並將其刪除,只儲存唯一的資料單元。資料去重可以有效避免資源的浪費,所以資料去重至關重要。

資料去重


資料去重可以從兩個節點入手:一個是URL去重。即直接篩選掉重複的URL;另一個是資料庫去重。即利用資料庫的一些特性篩選重複的資料。

URL去重

為什麼需要進行URL去重?
  1. 在爬蟲啟動工作的過程中,我們不希望同一個網頁被多次下載,因為重複下載不僅會浪費CPU機時,還會為搜尋引擎系統增加負荷。而想要控制這種重複性下載問題,就要考慮下載所依據的超連結,只要能夠控制待下載的URL不重複,基本可以解決同一個網頁重複下載的問題。
  2. 對於已經抓取過的連結,進行持久化,並且在啟動的時候載入進入去重佇列,是一個比較強的需求。 
    它主要應對爬蟲故障重跑,不需要重跑所有連結
如何確定去重強度?

根據爬取週期確定使用去重強度 
抓取週期在一個小時內,不需要對已經抓取的連結做持久化 
抓取週期在一天內(或抓取的資料總量30w以下),需要對抓取連結做一個相對簡單的持久化 
抓取週期在一天以上,需要對抓取連結做相對專業的持久化

URL去重方法:兩種解決方法
  • 已經造好的輪子系列:

    • scrapy-deltafetch
    • scrapy-crawl-once
    • scrapy-redis
    • scrapy-redis-bloomfilter
  • 自己造輪子:

    • 自己寫的init_add_request方法 
      可以輕量實現

scrapy-deltafetch通過Berkeley DB來記錄爬蟲每次爬取收集的request和item,當重複執行爬蟲時只爬取新的item,實現增量去重,提高爬蟲爬取效能。

scrapy-deltafetch是依賴於Berkeley DB的,所以必須先安裝bsddb3。選擇使用LFD,這個網址包含幾乎所有在Windows上安裝容易出錯的庫,使用命令pip install 檔案路徑+檔名

安裝

pip install scrapy-deltafetch

配置

  1. settings.py檔案中爬蟲中介軟體部分新增DeltaFetch middleware

SPIDER_MIDDLEWARES = { 
‘scrapy_deltafetch.DeltaFetch’: 100, 

這裡,優先順序100只是一個例子。設定它的值取決於其他中介軟體的優先順序。

  1. 為了讓DeltaFetch可以使用,要在settings.py檔案中把DELTAFETCH_ENABLED設為TRUE,

DELTAFETCH_ENABLED = True

引數設定: 
DELTAFETCH_ENABLED— 是否啟用這個擴充套件件 
DELTAFETCH_DIR— 儲存狀態目錄 
DELTAFETCH_RESET — 重置資料庫,清除資料庫所有請求

核心程式碼:

def process_spider_output(self, response, result, spider): for r in result: if isinstance(r, Request): #對結果進行分析,如果是url,繼續下一步,否則跳過 key = self._get_key(r) #通過_get_key函式生成key if key in self.db: #檢視key是否在資料庫中 logger.info("Ignoring already visited: %s" % r) #如果在資料庫,就拋棄 if self.stats: self.stats.inc_value('deltafetch/skipped', spider=spider) continue elif isinstance(r, (BaseItem, dict)): #對結果分析,如果是dict or item ,繼續下一步 key = self._get_key(response.request) self.db[key] = str(time.time()) #對url進行持久化操作 if self.stats: self.stats.inc_value('deltafetch/stored', spider=spider) yield r def _get_key(self, request): #key值的生成,要麼是request.meta傳過來,要麼使用指紋演算法生成 key = request.meta.get('deltafetch_key') or request_fingerprint(request) return to_bytes(key)

scrapy原去重方法 
這個處理的程式碼是編寫在 dupefilter.py 檔案中的,其中定義了處理重複 url 的方法。 
具體步驟如下:

  1. 通過RFPDupeFilter() 類初始化一個名為fingerprints的set()集合
  2. 當request傳入進來,通過指紋計算(也就是用請求方式和url對其進行雜湊),得到唯一key值
  3. 在已抓取 url 集合中查詢,如果不存在,就新增進去,如果需要寫入檔案,就寫入檔案;如果已經存在了,告訴上層呼叫 url 已經抓取過了。

主要程式碼如下:

def request_seen(self, request): fp = self.request_fingerprint(request) #指紋計算,得到唯一key值 if fp in self.fingerprints: #在已抓取 url 集合中查詢,如果不存在,就新增進去,如果需要寫入檔案,就寫入檔案;如果已經存在了,告訴上層呼叫 url 已經抓取過了。 return True self.fingerprints.add(fp) if self.file: self.file.write(fp + os.linesep)

弊端:

  • 如果該函式在下載器之前被呼叫,沒辦法保證目標url是否下載成功,如果失敗了,沒辦法重複下載重試。(經過驗證scrapy使用的是這種呼叫方式) 
    scrapy框架的解決辦法:在request中把dont_filter引數設為True
  • 如果該函式在下載器之後被呼叫,沒起到url去重應有的作用,依然浪費了資源。

init_add_request方法

from scrapy.http import Request def init_add_request(spider, url): """ 此方法用於在,scrapy啟動的時候新增一些已經跑過的url,讓爬蟲不需要重複跑 """ rf = spider.crawler.engine.slot.scheduler.df #找到例項化物件 request = Request(url) rf.request_seen(request) #呼叫request_seen方法

演示示例 
這次示例先使用沒有新增去重機制的爬蟲進行試驗,再新增去重機制看看兩者的區別。

import scrapy from scrapy.http import Request class SpiderCity58Spider(scrapy.Spider): name = 'spider_city_58' allowed_domains = ['58.com'] start_urls = ['http://cd.58.com/'] def parse(self, response): pass yield Request('http://bj.58.com' , callback = self.parse) #請求bj.58.com,並回調parse函式 yield Request('http://wh.58.com' , callback=self.parse) #請求wh.58.com,並回調parse函式

得到結果如下:


可以看到三個連結都可以正常爬取,返回碼為200

在scrapy資料夾下,建立去重檔案init_utils.py

from scrapy.http import Request def init_add_request(spider, url): """ 此方法用於在,scrapy啟動的時候新增一些已經跑過的url,讓爬蟲不需要重複跑 """ rf = spider.crawler.engine.slot.scheduler.df #找到例項化物件 request = Request(url) rf.request_seen(request) #呼叫request_seen方法

pipelines中新增去重函式:

def open_spider(self,spider): 
init_add_request(spider, ‘http://wh.58.com‘)

改變settings引數:

ITEM_PIPELINES = { 
‘city_58.pipelines.City58Pipeline’: 300, 
}

可以得到結果:

scrapy-redis是什麼?

scrapy-redis:一個三方的基於redis的分散式爬蟲框架,配合scrapy使用,讓爬蟲具有了分散式爬取的功能

scrapy-redis的原理

scrapy-redis實現分散式,其實從原理上來說很簡單,這裡為描述方便,我們把自己的核心伺服器稱為master,而把用於跑爬蟲程式的機器稱為slave。 
我們在master上搭建一個redis資料庫(注意這個資料庫只用作url的儲存,不關心爬取的具體資料,不要和後面的mongodb或者mysql混淆),並對每一個需要爬取的網站型別,都開闢一個單獨的列表欄位。通過設定slave上scrapy-redis獲取url的地址為master地址。這樣的結果就是,儘管有多個slave,然而大家獲取url的地方只有一個,那就是伺服器master上的redis資料庫。 
並且,由於scrapy-redis自身的佇列機制,slave獲取的連結不會相互衝突。這樣各個slave在完成抓取任務之後,再把獲取的結果彙總到伺服器上(這時的資料儲存不再在是redis,而是mongodb或者 mysql等存放具體內容的資料庫了) 
這種方法的好處就是程式移植性強,只要處理好路徑問題,把slave上的程式移植到另一臺機器上執行,基本上就是複製貼上的事情。

資料庫去重

理論上講,我們應該優先對url連結去重,資料庫去重是一個很牽強的方案資料庫去重,應該儘量使用資料庫本身的各種機制去去重,如唯一鍵

資料入庫

優先選擇非關係資料庫MongoDB,詳情請見2-4課後資料

課後資料

  • 自行了解,如何入庫到MySQL

補充資料

Berkeley DB資料庫

Berkeley DB是一個嵌入式資料庫,為應用程式提供可伸縮的、高效能的、有事務保護功能的資料管理服務。

主要特點:

  • 嵌入式:直接連結到應用程式中,與應用程式運行於同樣的地址空間中,因此,無論是在網路上不同計算機之間還是在同一臺計算機的不同程序之間,資料庫操作並不要求程序間通訊。 Berkeley DB為多種程式語言提供了API介面,其中包括C、C++、Java、Perl、Tcl、Python和PHP,所有的資料庫操作都在程式庫內部發生。多個程序,或者同一程序的多個執行緒可同時使用資料庫,有如各自單獨使用,底層的服務如加鎖、事務日誌、共享緩衝區管理、記憶體管理等等都由程式庫透明地執行。
  • 輕便靈活:可以運行於幾乎所有的UNIX和Linux系統及其變種系統、Windows作業系統以及多種嵌入式實時作業系統之下,已經被好多高階的因特網伺服器、桌上型電腦、掌上電腦、機頂盒、網路交換機以及其他一些應用領域所採用。一旦Berkeley DB被連結到應用程式中,終端使用者一般根本感覺不到有一個數據庫系統存在。
  • 可伸縮:Database library本身是很精簡的(少於300KB的文字空間),但它能夠管理規模高達256TB的資料庫。它支援高併發度,成千上萬個使用者可同時操縱同一個資料庫

資料入庫到MySQL

這次爬取的物件是58同城,作為例項只是爬取了北京程式設計師的招聘資訊。 
首先在mysql中初始化一個表,用來儲存資料

item.py:需要爬取的資料列表

import scrapy class E58TongchengItem(scrapy.Item): name = scrapy.Field() companyname = scrapy.Field() salary = scrapy.Field() type = scrapy.Field()

a58tongchengSpider.py:爬取網頁的具體操作

import scrapy from scrapy.selector import Selector from ..items import E58TongchengItem class A58tongchengSpider(scrapy.Spider): name = '58tongcheng' allowed_domains = ['www.58.com'] start_urls = ['http://zp.58.com/bailing/bj/tech/pn1?&ClickID=7'] def parse(self, response): item = E58TongchengItem() try: selector = Selector(text=response.body) except Exception as e: print(e) name_hrefse = selector.xpath('//*[@id="list_con"]/li') for se in name_hrefse: #使用xpath提取資料 item['name'] = se.xpath('div[1]/div[1]/a/span[2]/text()').extract()[0] item['companyname'] = se.xpath('div[2]/div/a/text()').extract()[0] item['salary'] = se.xpath('div[1]/p/text()').extract()[0] item['type'] = se.xpath('div[2]/p/span[1]/text()').extract()[0] yield item

E58TongchengPipeline.py:儲存在mysql中

import pymysql class E58TongchengPipeline(object): def process_item(self, item, spider): # 開啟資料庫連線 db = pymysql.connect("localhost", "root", "", "test", charset="utf8") # 使用 cursor() 方法建立一個遊標物件 cursor cursor = db.cursor() #插入資料庫 cursor.execute("insert into test(name,companyname,salary,type)values(%s,%s,%s,%s)",[item['name'],item['companyname'],item['salary'],item['type']]) #提交 db.commit() cursor.close() db.close() print(item) return item

最後使用命令啟動

scrapy crawl 58tongcheng

最後得到結果: