
Scrapy Source Code Analysis (11): The Downloader

Over the previous posts, four of Scrapy's five core components have been covered: the engine, the scheduler, the scraper, and the spider middleware (spidermw).

The last one is the downloader. It is responsible for how pages actually get downloaded and is arguably the most complex part, so this post works through its source code step by step.

Downloading starts in the engine's _next_request_from_scheduler, a method mentioned more than once already; only the relevant lines are shown here:

scrapy/core/engine.py:

def _next_request_from_scheduler(self, spider):
    slot = self.slot
    request = slot.scheduler.next_request()
    if not request:
        return
    d = self._download(request, spider)
The _download method is then called:
def _download(self, request, spider):
    slot = self.slot
    slot.add_request(request)
    def _on_success(response):
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            response.request = request # tie request to response received
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received,
                                        response=response, request=request, spider=spider)
        return response

    def _on_complete(_):
        slot.nextcall.schedule()
        return _

    dwld = self.downloader.fetch(request, spider)
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
    return dwld
The _download method first adds the request to the slot's inprogress set to record requests in flight, then calls the downloader's fetch method and attaches an '_on_success' callback to the returned deferred, so that when the download completes a log line is written and a response_received signal is sent to anyone listening.
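As a quick illustration, any component with access to the crawler can subscribe to that signal. Below is a minimal sketch of a hypothetical extension (the class and method names are invented here, and it would be enabled through the EXTENSIONS setting as usual):

from scrapy import signals


class ResponseReceivedLogger(object):
    """Hypothetical extension listening for the response_received signal
    sent by the engine's _on_success callback shown above."""

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        crawler.signals.connect(ext.response_received,
                                signal=signals.response_received)
        return ext

    def response_received(self, response, request, spider):
        spider.logger.info('Received %s for %s', response.status, request.url)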

Let's see what the default downloader is:

scrapy/settings/default_settings.py:

DOWNLOADER = 'scrapy.core.downloader.Downloader'
Let's look at its constructor first, and then at the implementation of fetch:

scrapy/core/downloader/__init__.py:

class Downloader(object):

    def __init__(self, crawler):
        self.settings = crawler.settings
        self.signals = crawler.signals
        self.slots = {}
        self.active = set()
        self.handlers = DownloadHandlers(crawler)
        self.total_concurrency = self.settings.getint('CONCURRENT_REQUESTS')
        self.domain_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN')
        self.ip_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_IP')
        self.randomize_delay = self.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY')
        self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)
        self._slot_gc_loop = task.LoopingCall(self._slot_gc)
        self._slot_gc_loop.start(60)
There are four key objects: slots, active, handlers (a DownloadHandlers instance) and middleware, plus a few configuration options. Let's analyze the role of each in turn:

1. slots

slots is a dict of Slot objects: the key is the domain a request belongs to, and the value is a Slot object.

A Slot object governs downloads for one class of requests, usually all requests to the same domain.

The Slot also controls the concurrency, download delay and random delay for that domain. In short, it defines the access policy for a single domain, which helps avoid getting the IP banned for sending too much traffic and being unable to keep crawling.

The code makes this concrete:

def _get_slot(self, request, spider):
    key = self._get_slot_key(request, spider)
    if key not in self.slots:
        conc = self.ip_concurrency if self.ip_concurrency else self.domain_concurrency
        conc, delay = _get_concurrency_delay(conc, spider, self.settings)
        self.slots[key] = Slot(conc, delay, self.randomize_delay)

    return key, self.slots[key]

As you can see, for a given request '_get_slot_key' is called first to compute the key for that request.

Looking at '_get_slot_key', you can see that adding a 'download_slot' entry to the request's meta lets you control the key directly, which adds flexibility. If no custom key is set, the key is derived from the domain the request targets.

Domain resolution for requests is also cached, via urlparse_cached and dnscache.

def _get_slot_key(self, request, spider):
    if 'download_slot' in request.meta:
        return request.meta['download_slot']

    key = urlparse_cached(request).hostname or ''
    if self.ip_concurrency:
        key = dnscache.get(key, key)

    return key
The slots dict itself also acts as a cache: the access policy for a domain is looked up from slots instead of being re-derived from the settings every time.
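For example, a spider can force requests to different hosts into one shared slot; the spider and slot name below are made up for illustration:

import scrapy


class SlotDemoSpider(scrapy.Spider):
    """Hypothetical spider: both API hosts share the 'api-pool' slot,
    so they are throttled together instead of per domain."""
    name = 'slot_demo'

    def start_requests(self):
        for url in ('https://api1.example.com/data',
                    'https://api2.example.com/data'):
            yield scrapy.Request(url, meta={'download_slot': 'api-pool'},
                                 callback=self.parse)

    def parse(self, response):
        pass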

The Slot object for that key is then fetched from slots; if it does not exist yet, a new one is constructed.

if key not in self.slots:
    conc = self.ip_concurrency if self.ip_concurrency else self.domain_concurrency
    conc, delay = _get_concurrency_delay(conc, spider, self.settings)
    self.slots[key] = Slot(conc, delay, self.randomize_delay)

The Slot constructor takes three arguments: the concurrency, the delay and the randomize-delay flag. Let's see how each is obtained:

a. Concurrency

The per-IP concurrency setting is tried first; if it is zero, the per-domain concurrency is used. The defaults are:

Per-IP concurrency:

CONCURRENT_REQUESTS_PER_IP = 0
Per-domain concurrency:
CONCURRENT_REQUESTS_PER_DOMAIN = 8
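A project would normally tune these in its settings.py; the values below are purely illustrative:

# settings.py (illustrative values only)
CONCURRENT_REQUESTS_PER_DOMAIN = 4   # per-slot limit used when the per-IP limit is 0
CONCURRENT_REQUESTS_PER_IP = 0       # when > 0, the slot key becomes the resolved IP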

b. Delay
def _get_concurrency_delay(concurrency, spider, settings):
    delay = settings.getfloat('DOWNLOAD_DELAY')
    if hasattr(spider, 'DOWNLOAD_DELAY'):
        warnings.warn("%s.DOWNLOAD_DELAY attribute is deprecated, use %s.download_delay instead" %
                      (type(spider).__name__, type(spider).__name__))
        delay = spider.DOWNLOAD_DELAY
    if hasattr(spider, 'download_delay'):
        delay = spider.download_delay

    if hasattr(spider, 'max_concurrent_requests'):
        concurrency = spider.max_concurrent_requests

    return concurrency, delay
First, 'DOWNLOAD_DELAY' is read from the settings:
DOWNLOAD_DELAY = 0
If the spider defines 'DOWNLOAD_DELAY', that value is used instead, but this uppercase attribute is deprecated; define the lowercase 'download_delay' if you need a per-spider delay.

Then the spider-defined 'max_concurrent_requests' is read, if present.

To sum up, the concurrency is taken from the spider's 'max_concurrent_requests' if defined; otherwise it falls back to the per-IP or per-domain concurrency from the settings.

For the delay, the spider's 'download_delay' takes precedence; if it is not defined, the value from the settings is used.
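Put together, a spider that wants its own throttling can simply define both attributes; the spider below is hypothetical:

import scrapy


class PoliteSpider(scrapy.Spider):
    """Hypothetical spider overriding the settings-level defaults."""
    name = 'polite'
    start_urls = ['https://example.com/']

    download_delay = 1.5          # overrides DOWNLOAD_DELAY from settings
    max_concurrent_requests = 2   # overrides the per-IP/per-domain concurrency

    def parse(self, response):
        pass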

c. Random delay

RANDOMIZE_DOWNLOAD_DELAY = True
This value is read from the settings and decides whether random download delays are enabled. When enabled, the delay obtained in (b) above is randomized instead of being applied as a fixed value.

With that, the purpose of the Slot object should be clear: it encapsulates the access policy for requests to a single domain.
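For reference, here is a simplified sketch of the Slot class (the real one sits next to Downloader in scrapy/core/downloader/__init__.py; details may vary slightly between Scrapy versions):

import random
from collections import deque


class Slot(object):
    """Downloader slot: per-domain (or per-custom-key) download state."""

    def __init__(self, concurrency, delay, randomize_delay):
        self.concurrency = concurrency
        self.delay = delay
        self.randomize_delay = randomize_delay
        self.active = set()        # requests owned by this slot
        self.queue = deque()       # (request, deferred) pairs waiting to be downloaded
        self.transferring = set()  # requests currently on the wire
        self.lastseen = 0          # time of the most recent download activity
        self.latercall = None      # pending reactor.callLater handle, if any

    def free_transfer_slots(self):
        return self.concurrency - len(self.transferring)

    def download_delay(self):
        if self.randomize_delay:
            # spread the configured delay over 0.5x .. 1.5x
            return random.uniform(0.5 * self.delay, 1.5 * self.delay)
        return self.delay

    def close(self):
        if self.latercall and self.latercall.active():
            self.latercall.cancel()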

When all requests for a domain have been crawled, how does its cached entry get removed from slots?

The constructor installs a 60-second heartbeat via task.LoopingCall, the _slot_gc function, which periodically reclaims stale entries in slots.

Garbage collection:
def _slot_gc(self, age=60):
    mintime = time() - age
    for key, slot in list(self.slots.items()):
        if not slot.active and slot.lastseen + slot.delay < mintime:
            self.slots.pop(key).close()
The collection policy is straightforward: a Slot is reclaimed if it has no active download requests and its last activity (plus its delay) is more than 60 seconds in the past.

2. active

active is simply a set recording the requests currently being downloaded.

3. handlers

handlers is a DownloadHandlers object. It manages a collection of handlers, one per download protocol.

The handlers supported by default are:

DOWNLOAD_HANDLERS_BASE = {
    'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler',
    'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}
When a page is downloaded later, the handler's download_request method is called; this is covered in detail below when we get to the fetch code.
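The project-level 'DOWNLOAD_HANDLERS' setting is merged on top of this base dict, so individual schemes can be replaced or disabled. For example (the sftp handler path below is made up):

# settings.py (illustrative)
DOWNLOAD_HANDLERS = {
    's3': None,  # setting a scheme to None disables its base handler
    # 'sftp': 'myproject.handlers.SFTPDownloadHandler',  # hypothetical custom handler
}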

4. middleware

This middleware manager has come up many times before; for the downloader, the manager in use is DownloaderMiddlewareManager.

As usual, the downloader middleware manager object is created by calling its from_crawler method:
self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)
As described earlier, a middleware manager implements '_get_mwlist_from_settings' to build its own middleware list, and may implement '_add_middleware' to register the hook methods specific to it. Here is what DownloaderMiddlewareManager does:

scrapy/core/downloader/middleware.py:

@classmethod
def _get_mwlist_from_settings(cls, settings):
    return build_component_list(
        settings.getwithbase('DOWNLOADER_MIDDLEWARES'))

def _add_middleware(self, mw):
    if hasattr(mw, 'process_request'):
        self.methods['process_request'].append(mw.process_request)
    if hasattr(mw, 'process_response'):
        self.methods['process_response'].insert(0, mw.process_response)
    if hasattr(mw, 'process_exception'):
        self.methods['process_exception'].insert(0, mw.process_exception)
As you can see, the middlewares are loaded from 'DOWNLOADER_MIDDLEWARES'; the defaults are:
DOWNLOADER_MIDDLEWARES_BASE = {
    # Engine side
    'scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware': 100,
    'scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware': 300,
    'scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware': 350,
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': 400,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': 500,
    'scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware': 550,
    'scrapy.downloadermiddlewares.ajaxcrawl.AjaxCrawlMiddleware': 560,
    'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware': 580,
    'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 590,
    'scrapy.downloadermiddlewares.redirect.RedirectMiddleware': 600,
    'scrapy.downloadermiddlewares.cookies.CookiesMiddleware': 700,
    'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware': 750,
    'scrapy.downloadermiddlewares.chunked.ChunkedTransferMiddleware': 830,
    'scrapy.downloadermiddlewares.stats.DownloaderStats': 850,
    'scrapy.downloadermiddlewares.httpcache.HttpCacheMiddleware': 900,
    # Downloader side
}
A downloader middleware can therefore implement 'process_request', 'process_response' and 'process_exception'.
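As a minimal sketch, a custom downloader middleware implementing all three hooks might look like the class below (the class name and timing logic are invented for illustration; it would be enabled through DOWNLOADER_MIDDLEWARES with a priority number):

import time


class TimingMiddleware(object):
    """Hypothetical downloader middleware that measures download time."""

    def process_request(self, request, spider):
        # Returning None lets the next middleware (and finally the download) run.
        request.meta['request_start'] = time.time()

    def process_response(self, request, response, spider):
        # Must return a Response, or a Request to reschedule.
        elapsed = time.time() - request.meta.get('request_start', time.time())
        spider.logger.debug('Downloaded %s in %.2fs', request.url, elapsed)
        return response

    def process_exception(self, request, exception, spider):
        # Returning None leaves the failure to other middlewares or the errback.
        return None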

With the four key objects covered, let's look at the fetch method to see how the downloader puts them to work:

def fetch(self, request, spider):
    def _deactivate(response):
        self.active.remove(request)
        return response

    self.active.add(request)
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    return dfd.addBoth(_deactivate)
It first calls the middleware manager's download method, passing in the downloader's own _enqueue_request method as the download function.

Here is the middleware manager's download method:

scrapy/core/downloader/middleware.py:

def download(self, download_func, request, spider):
    @defer.inlineCallbacks
    def process_request(request):
        for method in self.methods['process_request']:
            response = yield method(request=request, spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
            if response:
                defer.returnValue(response)
        defer.returnValue((yield download_func(request=request,spider=spider)))

    @defer.inlineCallbacks
    def process_response(response):
        assert response is not None, 'Received None in process_response'
        if isinstance(response, Request):
            defer.returnValue(response)

        for method in self.methods['process_response']:
            response = yield method(request=request, response=response,
                                    spider=spider)
            assert isinstance(response, (Response, Request)), \
                'Middleware %s.process_response must return Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if isinstance(response, Request):
                defer.returnValue(response)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def process_exception(_failure):
        exception = _failure.value
        for method in self.methods['process_exception']:
            response = yield method(request=request, exception=exception,
                                    spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if response:
                defer.returnValue(response)
        defer.returnValue(_failure)

    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred
This works much like the SpiderMiddlewareManager's scrape_response method from the previous post: the downloader middlewares' 'process_request' methods are called in turn on the request, then the Downloader's '_enqueue_request' method performs the download, and finally the middlewares' 'process_response' methods are called in turn on the response.

Next, the Downloader's _enqueue_request method:

def _enqueue_request(self, request, spider):
    key, slot = self._get_slot(request, spider)
    request.meta['download_slot'] = key

    def _deactivate(response):
        slot.active.remove(request)
        return response

    slot.active.add(request)
    deferred = defer.Deferred().addBoth(_deactivate)
    slot.queue.append((request, deferred))
    self._process_queue(spider, slot)
    return deferred
This method first calls '_get_slot' (analyzed above) to get the Slot for the request (essentially keyed by domain), then adds the request to that slot's active set and appends the request together with a new deferred to the slot's queue. Finally it calls '_process_queue' to drive the slot.

Now the '_process_queue' method:

It pulls requests off the slot's queue and downloads them.

def _process_queue(self, spider, slot):
    if slot.latercall and slot.latercall.active():
        return  # a latercall is already scheduled; it will process the queue

    # Delay queue processing if a download_delay is configured
    now = time()
    delay = slot.download_delay()  # the slot's (possibly randomized) delay
    if delay:
        penalty = delay - now + slot.lastseen  # time still to wait since the last download
        if penalty > 0:
            slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
            return

    # Process enqueued requests if there are free slots to transfer for this slot
    while slot.queue and slot.free_transfer_slots() > 0:
        # keep downloading while the queue is non-empty and transfer slots are free;
        # if a delay is configured, hand control back to _process_queue instead
        slot.lastseen = now
        request, deferred = slot.queue.popleft()
        dfd = self._download(slot, request, spider)
        dfd.chainDeferred(deferred)
        # prevent burst if inter-request delays were configured
        if delay:
            self._process_queue(spider, slot)
            break
The while loop at the bottom drains the queue, downloading a request only while there are free transfer slots.

While handling requests it keeps updating slot.lastseen to the current time, which records the slot's most recent download activity.

One thing to note: if the queue is non-empty but there are no free transfer slots, what happens to the remaining requests? (Answered below.)

If a delay is configured, '_process_queue' is re-invoked and the loop breaks; otherwise it keeps downloading requests back to back.

On the re-invocation, it checks whether enough time has passed since the last activity; if more delay is still needed, it schedules a latercall (via Twisted's reactor.callLater). That latercall will process the slot's queue again, which is why the entry check returns immediately when a latercall is already active.

In this way download requests are processed continuously, with delays applied where required.
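For example (illustrative numbers): with a 2-second slot delay and a slot whose lastseen was 0.5 seconds ago, penalty = delay - (now - lastseen) = 2 - 0.5 = 1.5, so reactor.callLater(1.5, self._process_queue, spider, slot) schedules the next pass over the queue 1.5 seconds later.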

Next, the '_download' method:

def _download(self, slot, request, spider):
    # The order is very important for the following deferreds. Do not change!
    # 1. Create the download deferred
    dfd = mustbe_deferred(self.handlers.download_request, request, spider)

    # 2. Notify response_downloaded listeners about the recent download
    # before querying queue for next request
    def _downloaded(response):
        self.signals.send_catch_log(signal=signals.response_downloaded,
                                    response=response,
                                    request=request,
                                    spider=spider)
        return response
    dfd.addCallback(_downloaded)

    # 3. After response arrives,  remove the request from transferring
    # state to free up the transferring slot so it can be used by the
    # following requests (perhaps those which came from the downloader
    # middleware itself)
    slot.transferring.add(request)

    def finish_transferring(_):
        slot.transferring.remove(request)
        self._process_queue(spider, slot)
        return _

    return dfd.addBoth(finish_transferring)
Here the DownloadHandlers' download_request method is invoked, and the request is added to the transferring set of requests currently in flight.

A finish_transferring callback is then added to the returned Deferred.

finish_transferring answers the earlier question: every time a request finishes downloading it is removed from the transferring set and another pass of _process_queue is triggered, which guarantees that requests never linger in the queue.

Finally, the handler's download_request method:

scrapy/core/downloader/handlers/__init__.py:

def download_request(self, request, spider):
    scheme = urlparse_cached(request).scheme
    handler = self._get_handler(scheme)
    if not handler:
        raise NotSupported("Unsupported URL scheme '%s': %s" %
                           (scheme, self._notconfigured[scheme]))
    return handler.download_request(request, spider)
The scheme of the URL selects the handler; the handlers themselves were described above, one per protocol.
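As a small illustration (the URL is made up), the dispatch is driven purely by the scheme of the request URL:

from scrapy.http import Request
from scrapy.utils.httpobj import urlparse_cached

request = Request('https://example.com/index.html')
print(urlparse_cached(request).scheme)
# 'https' -> HTTPDownloadHandler in the default DOWNLOAD_HANDLERS_BASE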

That completes the walkthrough of the Downloader's workflow and core code; how a page is actually fetched over the network will be analyzed in detail in a later post.