Scrapy Source Code Analysis (11)----------The Downloader
The previous installments covered four of Scrapy's five core components: the engine, the scheduler, the scraper, and the spider middleware manager (spidermw).
That leaves the last one, the downloader. It determines how pages actually get downloaded and is arguably the most complex part, so this article walks through its source code step by step.
Downloading starts in the engine's _next_request_from_scheduler, a method we have seen more than once already; only the key code is listed here:
scrapy/core/engine.py:
    def _next_request_from_scheduler(self, spider):
        slot = self.slot
        request = slot.scheduler.next_request()
        if not request:
            return
        d = self._download(request, spider)
    def _download(self, request, spider):
        slot = self.slot
        slot.add_request(request)

        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request  # tie request to response
                logkws = self.logformatter.crawled(request, response, spider)
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received,
                    response=response, request=request, spider=spider)
            return response

        def _on_complete(_):
            slot.nextcall.schedule()
            return _

        dwld = self.downloader.fetch(request, spider)
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld

The _download method first adds the request to the slot's inprogress set, recording it as in flight, then calls the downloader's fetch method. It attaches an _on_success callback to the deferred returned by fetch, so that once the download completes a log line is emitted and a response_received signal is sent to any interested listeners.
Let's see what the default downloader is:
scrapy/settings/default_settings.py:
DOWNLOADER = 'scrapy.core.downloader.Downloader'
Let's start with its constructor, then look at the implementation of fetch:
scrapy/core/downloader/__init__.py:
    class Downloader(object):

        def __init__(self, crawler):
            self.settings = crawler.settings
            self.signals = crawler.signals
            self.slots = {}
            self.active = set()
            self.handlers = DownloadHandlers(crawler)
            self.total_concurrency = self.settings.getint('CONCURRENT_REQUESTS')
            self.domain_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN')
            self.ip_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_IP')
            self.randomize_delay = self.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY')
            self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)
            self._slot_gc_loop = task.LoopingCall(self._slot_gc)
            self._slot_gc_loop.start(60)

There are four key objects here: slots, active, handlers (a DownloadHandlers instance), and middleware, plus a few configuration options. Let's examine the four objects in turn:
1. slots:
slots is a dict of Slot objects, keyed by the domain a request targets.
A Slot object governs one class of download requests, normally all requests to the same domain.
It also implements the access policy for that domain: the concurrency limit, the download delay, and optional random jitter. The point is to throttle traffic to any single domain, so the crawler is less likely to get its IP banned and be blocked from further crawling.
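Pulling together the fields the rest of this article relies on (active, queue, transferring, lastseen, latercall), a Slot can be sketched roughly as follows. This is a simplified reconstruction for illustration, not Scrapy's exact class:

```python
from collections import deque

class Slot(object):
    """Simplified sketch of the per-domain download slot."""

    def __init__(self, concurrency, delay, randomize_delay):
        self.concurrency = concurrency      # max parallel downloads for this slot
        self.delay = delay                  # base delay between downloads
        self.randomize_delay = randomize_delay
        self.active = set()                 # all requests owned by this slot
        self.queue = deque()                # (request, deferred) pairs waiting
        self.transferring = set()           # requests currently on the wire
        self.lastseen = 0                   # timestamp of the last download activity
        self.latercall = None               # pending delayed _process_queue call

    def free_transfer_slots(self):
        # How many more downloads this slot may start right now
        return self.concurrency - len(self.transferring)
```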
Let's look at the code in detail:
    def _get_slot(self, request, spider):
        key = self._get_slot_key(request, spider)
        if key not in self.slots:
            conc = self.ip_concurrency if self.ip_concurrency else self.domain_concurrency
            conc, delay = _get_concurrency_delay(conc, spider, self.settings)
            self.slots[key] = Slot(conc, delay, self.randomize_delay)
        return key, self.slots[key]
As you can see, for a given request we first call _get_slot_key to compute the request's key.
Looking at _get_slot_key, note that you can control a request's key by adding a 'download_slot' entry to its meta dict, which adds flexibility. If no custom key is set, the key is derived from the domain the request targets.
Domain resolution is also cached, via urlparse_cached and dnscache.
    def _get_slot_key(self, request, spider):
        if 'download_slot' in request.meta:
            return request.meta['download_slot']
        key = urlparse_cached(request).hostname or ''
        if self.ip_concurrency:
            key = dnscache.get(key, key)
        return key

The slots dict itself also acts as a cache: the access policy for a given domain is looked up in slots instead of being re-derived from the settings on every request.
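The key-selection logic can be reproduced standalone (a sketch using urllib.parse in place of Scrapy's cached helpers; the dnscache here is just a plain dict):

```python
from urllib.parse import urlparse

def get_slot_key(url, meta, ip_concurrency=0, dnscache=None):
    """Mimic Downloader._get_slot_key: an explicit 'download_slot' in meta
    wins; otherwise the hostname, optionally resolved through a DNS cache."""
    if 'download_slot' in meta:
        return meta['download_slot']
    key = urlparse(url).hostname or ''
    if ip_concurrency:
        key = (dnscache or {}).get(key, key)
    return key
```

With per-IP limiting enabled, two domains resolving to the same IP end up sharing one slot, which is exactly why the key switches from hostname to address.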
The Slot object for that key is then fetched from slots; if none exists yet, a new one is constructed.
    if key not in self.slots:
        conc = self.ip_concurrency if self.ip_concurrency else self.domain_concurrency
        conc, delay = _get_concurrency_delay(conc, spider, self.settings)
        self.slots[key] = Slot(conc, delay, self.randomize_delay)
The Slot constructor takes three arguments: the concurrency limit, the delay, and the randomize-delay flag. Let's see how each of the three is obtained:
a. Concurrency
The per-IP concurrency limit is used when set; otherwise the per-domain limit applies. The defaults are as follows:
Per-IP concurrency:
CONCURRENT_REQUESTS_PER_IP = 0
Per-domain concurrency:
CONCURRENT_REQUESTS_PER_DOMAIN = 8
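These limits can be tuned per project in settings.py; the values below are purely illustrative:

```python
# settings.py (illustrative values)
CONCURRENT_REQUESTS = 16            # global cap across all slots
CONCURRENT_REQUESTS_PER_DOMAIN = 4  # per-domain cap (used when the per-IP cap is 0)
CONCURRENT_REQUESTS_PER_IP = 0      # non-zero switches slot keys to resolved IPs
```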
b. Delay:
    def _get_concurrency_delay(concurrency, spider, settings):
        delay = settings.getfloat('DOWNLOAD_DELAY')
        if hasattr(spider, 'DOWNLOAD_DELAY'):
            warnings.warn("%s.DOWNLOAD_DELAY attribute is deprecated, use %s.download_delay instead" %
                          (type(spider).__name__, type(spider).__name__))
            delay = spider.DOWNLOAD_DELAY
        if hasattr(spider, 'download_delay'):
            delay = spider.download_delay
        if hasattr(spider, 'max_concurrent_requests'):
            concurrency = spider.max_concurrent_requests
        return concurrency, delay

The delay is first read from the 'DOWNLOAD_DELAY' setting:
DOWNLOAD_DELAY = 0
If the spider defines an uppercase 'DOWNLOAD_DELAY' attribute, that value is used, but this spelling is deprecated; define the lowercase 'download_delay' instead.
The spider's 'max_concurrent_requests' attribute, if present, is taken as well.
In summary, the concurrency limit is the spider's 'max_concurrent_requests' if defined, falling back to the per-IP or per-domain concurrency from the settings.
Likewise, the delay is the spider's 'download_delay' if defined, falling back to the setting.
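That resolution order can be condensed into a standalone sketch (the spider is modeled as a plain object, settings as a plain dict, and the deprecated uppercase attribute is omitted):

```python
def resolve_concurrency_delay(spider, settings):
    """Spider attributes override project settings, mirroring the
    priority order of _get_concurrency_delay."""
    delay = settings.get('DOWNLOAD_DELAY', 0)
    # per-IP limit wins when non-zero, else the per-domain limit
    conc = settings.get('CONCURRENT_REQUESTS_PER_IP', 0) or \
           settings.get('CONCURRENT_REQUESTS_PER_DOMAIN', 8)
    if hasattr(spider, 'download_delay'):
        delay = spider.download_delay
    if hasattr(spider, 'max_concurrent_requests'):
        conc = spider.max_concurrent_requests
    return conc, delay
```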
c. Random delay
RANDOMIZE_DOWNLOAD_DELAY = True
This setting controls whether random download delays are enabled. When enabled, a random factor is applied to the delay value obtained in (b) above. By now the purpose of the Slot object should be clear: it implements the access policy for one domain's requests.
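In Scrapy the randomization draws each wait uniformly between 0.5x and 1.5x of the configured delay; a standalone sketch of that behavior (the function itself is illustrative):

```python
import random

def download_delay(delay, randomize):
    """Return the wait before the next download for a slot.
    With randomization on, each wait is drawn uniformly from
    [0.5 * delay, 1.5 * delay]."""
    if randomize:
        return random.uniform(0.5 * delay, 1.5 * delay)
    return delay
```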
Once every request for a domain has been crawled, how does the cached Slot get removed from slots?
The constructor uses task.LoopingCall to install _slot_gc as a heartbeat that fires every 60 seconds; this function periodically garbage-collects the objects in slots.
Garbage collection:

    def _slot_gc(self, age=60):
        mintime = time() - age
        for key, slot in list(self.slots.items()):
            if not slot.active and slot.lastseen + slot.delay < mintime:
                self.slots.pop(key).close()

The collection policy is visible here: a Slot is reclaimed if it has no active download requests and more than 60 seconds have passed since its last activity.
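The policy can be exercised with a small stand-in for the slots dict (FakeSlot is a test double, not the real class):

```python
from time import time

class FakeSlot(object):
    def __init__(self, active, lastseen, delay=0):
        self.active = active        # set of in-flight requests
        self.lastseen = lastseen    # last activity timestamp
        self.delay = delay

    def close(self):
        pass

def slot_gc(slots, age=60):
    """Drop slots that are idle and whose last activity is older than `age`."""
    mintime = time() - age
    for key, slot in list(slots.items()):
        if not slot.active and slot.lastseen + slot.delay < mintime:
            slots.pop(key).close()
```

Note that a slot with active requests survives no matter how old its lastseen is, so a long-running download never has its slot collected underneath it.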
2. active
active is a set recording the requests currently being downloaded.
3. handlers:
This is a DownloadHandlers object. It manages a number of handlers, one for each download protocol.
The handlers supported by default are:
    DOWNLOAD_HANDLERS_BASE = {
        'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
        'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
        'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
        's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler',
        'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
    }

Downloading a page later calls a handler's download_request method; we will return to this in detail when we walk through the fetch code.
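As I understand it, this base mapping is merged with the project-level DOWNLOAD_HANDLERS setting, so a project can add a scheme or disable one by mapping it to None. An illustrative settings.py fragment:

```python
# settings.py (illustrative)
DOWNLOAD_HANDLERS = {
    's3': None,  # disable the S3 handler for this project
}
```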
4. middleware
Middleware has come up many times before. The middleware manager used by the downloader is
DownloaderMiddlewareManager,
and as usual it is instantiated via its from_crawler method:

    self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)

As discussed earlier, a middleware manager implements its own _get_mwlist_from_settings to build its middleware list, and can implement _add_middleware to register its particular middleware methods. Here is the DownloaderMiddlewareManager implementation:
scrapy/core/downloader/middleware.py:
    @classmethod
    def _get_mwlist_from_settings(cls, settings):
        return build_component_list(
            settings.getwithbase('DOWNLOADER_MIDDLEWARES'))

    def _add_middleware(self, mw):
        if hasattr(mw, 'process_request'):
            self.methods['process_request'].append(mw.process_request)
        if hasattr(mw, 'process_response'):
            self.methods['process_response'].insert(0, mw.process_response)
        if hasattr(mw, 'process_exception'):
            self.methods['process_exception'].insert(0, mw.process_exception)

As you can see, the middleware list comes from the 'DOWNLOADER_MIDDLEWARES' setting, which defaults to the following:
    DOWNLOADER_MIDDLEWARES_BASE = {
        # Engine side
        'scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware': 100,
        'scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware': 300,
        'scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware': 350,
        'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': 400,
        'scrapy.downloadermiddlewares.retry.RetryMiddleware': 500,
        'scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware': 550,
        'scrapy.downloadermiddlewares.ajaxcrawl.AjaxCrawlMiddleware': 560,
        'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware': 580,
        'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 590,
        'scrapy.downloadermiddlewares.redirect.RedirectMiddleware': 600,
        'scrapy.downloadermiddlewares.cookies.CookiesMiddleware': 700,
        'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware': 750,
        'scrapy.downloadermiddlewares.chunked.ChunkedTransferMiddleware': 830,
        'scrapy.downloadermiddlewares.stats.DownloaderStats': 850,
        'scrapy.downloadermiddlewares.httpcache.HttpCacheMiddleware': 900,
        # Downloader side
    }

A downloader middleware may implement process_request, process_response, and process_exception.
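The append/insert(0) pattern in _add_middleware means process_request methods run in ascending priority order while process_response (and process_exception) run in reverse. A minimal synchronous sketch of that ordering (no Twisted, illustrative classes only):

```python
class MiniManager(object):
    """Toy middleware manager reproducing Scrapy's registration order."""

    def __init__(self):
        self.methods = {'process_request': [], 'process_response': []}

    def add_middleware(self, mw):
        if hasattr(mw, 'process_request'):
            self.methods['process_request'].append(mw.process_request)
        if hasattr(mw, 'process_response'):
            self.methods['process_response'].insert(0, mw.process_response)

calls = []

class A(object):  # lower priority number: registered first
    def process_request(self, request): calls.append('A.req')
    def process_response(self, response): calls.append('A.resp')

class B(object):
    def process_request(self, request): calls.append('B.req')
    def process_response(self, response): calls.append('B.resp')

mgr = MiniManager()
mgr.add_middleware(A())
mgr.add_middleware(B())
for m in mgr.methods['process_request']:
    m(None)
for m in mgr.methods['process_response']:
    m(None)
# request phase runs A then B; response phase runs B then A
```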
With the four key objects covered, let's look at the fetch method to see how the downloader puts them to work:
    def fetch(self, request, spider):
        def _deactivate(response):
            self.active.remove(request)
            return response

        self.active.add(request)
        dfd = self.middleware.download(self._enqueue_request, request, spider)
        return dfd.addBoth(_deactivate)

First, it calls the middleware manager's download method, passing in the downloader's own _enqueue_request method as the download function.
Here is the middleware manager's download method:
scrapy/core/downloader/middleware.py:
    def download(self, download_func, request, spider):
        @defer.inlineCallbacks
        def process_request(request):
            for method in self.methods['process_request']:
                response = yield method(request=request, spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
                if response:
                    defer.returnValue(response)
            defer.returnValue((yield download_func(request=request, spider=spider)))

        @defer.inlineCallbacks
        def process_response(response):
            assert response is not None, 'Received None in process_response'
            if isinstance(response, Request):
                defer.returnValue(response)
            for method in self.methods['process_response']:
                response = yield method(request=request, response=response, spider=spider)
                assert isinstance(response, (Response, Request)), \
                    'Middleware %s.process_response must return Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, type(response))
                if isinstance(response, Request):
                    defer.returnValue(response)
            defer.returnValue(response)

        @defer.inlineCallbacks
        def process_exception(_failure):
            exception = _failure.value
            for method in self.methods['process_exception']:
                response = yield method(request=request, exception=exception, spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, type(response))
                if response:
                    defer.returnValue(response)
            defer.returnValue(_failure)

        deferred = mustbe_deferred(process_request, request)
        deferred.addErrback(process_exception)
        deferred.addCallback(process_response)
        return deferred

This mirrors the SpiderMiddlewareManager's scrape_response method from the previous article: each middleware's process_request method is applied to the request in turn, then the Downloader's _enqueue_request method performs the download, and finally each middleware's process_response method is applied to the response.
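One important detail of this control flow is that any process_request returning a response short-circuits the actual download. The idea can be modeled synchronously (a sketch, not the Twisted version; the cache middleware here is hypothetical):

```python
def run_download(process_request_methods, download_func, request):
    """If any middleware returns a non-None result, skip the real download."""
    for method in process_request_methods:
        result = method(request)
        if result is not None:
            return result
    return download_func(request)

# A hypothetical cache middleware short-circuits the network fetch:
cache = {'http://example.com': 'cached-page'}
from_cache = lambda req: cache.get(req)
real_download = lambda req: 'network-page'
```

This is exactly how middlewares such as HttpCacheMiddleware can answer a request without the downloader ever touching the network.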
Next, the Downloader's _enqueue_request method:
    def _enqueue_request(self, request, spider):
        key, slot = self._get_slot(request, spider)
        request.meta['download_slot'] = key

        def _deactivate(response):
            slot.active.remove(request)
            return response

        slot.active.add(request)
        deferred = defer.Deferred().addBoth(_deactivate)
        slot.queue.append((request, deferred))
        self._process_queue(spider, slot)
        return deferred

This method starts by calling the _get_slot method analyzed earlier to obtain the Slot object for the request (essentially keyed by domain). It then adds the request to that slot's active set, and appends the request together with its deferred to the slot's queue. Finally it calls _process_queue to drive the slot.
Next, the _process_queue method:
Its job is to pull requests off the slot's queue and download them.
    def _process_queue(self, spider, slot):
        if slot.latercall and slot.latercall.active():
            return  # a latercall is already pending; nothing more to do

        # Delay queue processing if a download_delay is configured
        now = time()
        delay = slot.download_delay()  # the slot's (possibly randomized) delay
        if delay:
            penalty = delay - now + slot.lastseen  # wait still owed since the last download
            if penalty > 0:
                slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
                return

        # Process enqueued requests if there are free slots to transfer for this slot
        while slot.queue and slot.free_transfer_slots() > 0:
            slot.lastseen = now
            request, deferred = slot.queue.popleft()
            dfd = self._download(slot, request, spider)
            dfd.chainDeferred(deferred)
            # prevent burst if inter-request delays were configured
            if delay:
                self._process_queue(spider, slot)
                break

The while loop at the bottom drains the queue: as long as the queue is non-empty and there are free transfer slots, requests keep being downloaded; when a delay is configured, _process_queue is re-invoked instead so the delay is honored.
While downloads are being processed, the slot's lastseen is continually updated to the current time; it records the slot's most recent download activity.
One subtle point: if there are no free transfer slots but the queue is non-empty, what happens to the unprocessed requests? (Answered below.)
If a delay is required, _process_queue is called again; otherwise requests keep being downloaded back to back.
On re-entry, the method checks whether the delay has elapsed since the last activity; if more waiting is needed, it schedules a latercall (via Twisted's reactor.callLater) that will process the slot's queue again later. This is why the entry check returns immediately when a latercall is already active.
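The timing decision reduces to one formula: penalty = delay - (now - lastseen), i.e. how much of the configured delay is still outstanding. A standalone sketch:

```python
def compute_penalty(delay, lastseen, now):
    """Remaining wait before this slot may download again.
    A value <= 0 means the slot can proceed immediately;
    a positive value is the callLater delay for the latercall."""
    return delay - now + lastseen
```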
In this way, download requests are processed continuously, with appropriate delays inserted as needed.
Next, the _download method:
    def _download(self, slot, request, spider):
        # The order is very important for the following deferreds. Do not change!

        # 1. Create the download deferred
        dfd = mustbe_deferred(self.handlers.download_request, request, spider)

        # 2. Notify response_downloaded listeners about the recent download
        # before querying queue for next request
        def _downloaded(response):
            self.signals.send_catch_log(signal=signals.response_downloaded,
                                        response=response, request=request, spider=spider)
            return response
        dfd.addCallback(_downloaded)

        # 3. After response arrives, remove the request from transferring
        # state to free up the transferring slot so it can be used by the
        # following requests (perhaps those which came from the downloader
        # middleware itself)
        slot.transferring.add(request)

        def finish_transferring(_):
            slot.transferring.remove(request)
            self._process_queue(spider, slot)
            return _

        return dfd.addBoth(finish_transferring)

Here the DownloadHandlers' download_request method is invoked, and the request is added to the transferring set of in-flight transfers.
A finish_transferring callback is also added to the returned Deferred.
finish_transferring answers the earlier question: every time a request finishes downloading, it is removed from the transferring set and _process_queue is triggered once more, which guarantees that no requests are left stranded in the queue.
Finally, the handler's download_request method:
scrapy/core/downloader/handlers/__init__.py:
    def download_request(self, request, spider):
        scheme = urlparse_cached(request).scheme
        handler = self._get_handler(scheme)
        if not handler:
            raise NotSupported("Unsupported URL scheme '%s': %s" %
                               (scheme, self._notconfigured[scheme]))
        return handler.download_request(request, spider)

The handler is selected by the URL's scheme, as discussed above: each protocol has its own handler.
That completes the walkthrough of the Downloader's workflow and core code. How pages are actually fetched over the network will be analyzed in detail later.