
Scheduling: engine._next_request_from_scheduler() pops a request and hands it to the handler. If the result is a Request, engine.crawl() runs; if it is a Response/Failure, the next stop is scraper.enqueue_scrape().


0. def _next_request_from_scheduler(self, spider):

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

    def _next_request_from_scheduler(self, spider):
        slot = self.slot
        request = slot.scheduler.next_request()  # first, pop one request from the priority queue
        if not request:
            return
        d = self._download(request, spider)  # hand the request to the downloader (and on to a handler)
        d.addBoth(self._handle_downloader_output, request, spider)  # callback on the download result
        d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                           exc_info=failure_to_exc_info(f), extra={'spider': spider}))
        d.addBoth(lambda _: slot.remove_request(request))
        d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                           exc_info=failure_to_exc_info(f), extra={'spider': spider}))
        d.addBoth(lambda _: slot.nextcall.schedule())
        d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                           exc_info=failure_to_exc_info(f), extra={'spider': spider}))
        return d
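To make the callback chain above easier to read, here is a small standalone Twisted sketch (not Scrapy code): each addBoth runs whether the previous step produced a value or a Failure, while the addErrback that follows it only fires if that preceding callback itself failed.

from twisted.internet import defer

def handle_output(result):
    print('handle output: %r' % (result,))
    return result

def log_failure(failure):
    # only reached if handle_output itself raised
    print('logged: %s' % failure)

def cleanup(_):
    # always runs, like slot.remove_request / slot.nextcall.schedule above
    print('remove request, schedule nextcall')
    return _

d = defer.Deferred()
d.addBoth(handle_output)
d.addErrback(log_failure)
d.addBoth(cleanup)
d.callback('a response')   # fire the chain with a successful result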

1. request = slot.scheduler.next_request()  # the scheduler pops one request from the priority queue (at the current priority) and returns it to the engine

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\scheduler.py

class Scheduler(object):

    def next_request(self):
        request = self.mqs.pop()  # mqs is the pqclass instance that manages multiple mqclass queues, i.e. self.queues = {0: mqclass(), ...}; if dqclass is enabled, the scheduler first loads requests previously saved in the configured folder, via PickleLifoDiskQueue
        if request:
            self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
        else:
            request = self._dqpop()
            if request:
                self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
        if request:
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
        return request
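For context, the queue classes mentioned in the comment come from settings; the Scrapy 1.x defaults are roughly the following (listed from memory, check default_settings.py in your installation):

# approximate Scrapy 1.x defaults for the scheduler's queue classes
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'   # dqclass, only used when JOBDIR is set
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'     # mqclass
SCHEDULER_PRIORITY_QUEUE = 'queuelib.PriorityQueue'           # pqclass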

C:\Program Files\Anaconda2\Lib\site-packages\queuelib\pqueue.py    # the enqueue (push) side is included below for reference

class PriorityQueue(object):
    def push(self, obj, priority=0):  # enqueue; the default priority is 0
        if priority not in self.queues:
            self.queues[priority] = self.qfactory(priority)
        q = self.queues[priority]
        q.push(obj) # this may fail (eg. serialization error)
        if self.curprio is None or priority < self.curprio:  # a numerically smaller (i.e. higher) priority arrived, so update curprio; this is what lets redirects be handled first
            self.curprio = priority

    def pop(self):
        if self.curprio is None:
            return
        q = self.queues[self.curprio]
        m = q.pop()  # pop one request at the current priority
        if len(q) == 0:  # if that priority's queue is now empty, drop it and recompute curprio
            del self.queues[self.curprio]
            q.close()
            prios = [p for p, q in self.queues.items() if len(q) > 0]
            self.curprio = min(prios) if prios else None  # the new current priority is the smallest remaining value
        return m 
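A minimal standalone usage sketch of this class (plain queuelib, not the Scrapy scheduler; FifoMemoryQueue stands in for the real mqclass):

from queuelib.pqueue import PriorityQueue
from queuelib.queue import FifoMemoryQueue

def qfactory(priority):
    # one sub-queue per priority level, created on demand by push()
    return FifoMemoryQueue()

pq = PriorityQueue(qfactory)
pq.push('ordinary request')                 # priority defaults to 0
pq.push('redirected request', priority=-1)  # lower number = popped first
print(pq.pop())  # 'redirected request'
print(pq.pop())  # 'ordinary request'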

2. d = self._download(request, spider)  # hand the request to the downloader/handler

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

    def _download(self, request, spider):
        slot = self.slot
        slot.add_request(request)        # engine Slot: self.inprogress = set(); add_request() does self.inprogress.add(request)
        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request # tie request to response received
                logkws = self.logformatter.crawled(request, response, spider)
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received,
                                            response=response, request=request, spider=spider)
            return response

        def _on_complete(_):
            slot.nextcall.schedule()
            return _

        dwld = self.downloader.fetch(request, spider)    # DOWNLOADER = 'scrapy.core.downloader.Downloader'
        dwld.addCallbacks(_on_success)   # examined in detail below
        dwld.addBoth(_on_complete)       # examined in detail below
        return dwld

###dwld = self.downloader.fetch(request, spider)

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\__init__.py

from .middleware import DownloaderMiddlewareManager
from .handlers import DownloadHandlers
class Downloader(object):

    def __init__(self, crawler):
        self.handlers = DownloadHandlers(crawler)
        self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)

    def fetch(self, request, spider):
        def _deactivate(response):
            self.active.remove(request)
            return response

        self.active.add(request)
        dfd = self.middleware.download(self._enqueue_request, request, spider)
        return dfd.addBoth(_deactivate) 

#### The request is first processed by the DownloaderMiddlewareManager

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\middleware.py

class DownloaderMiddlewareManager(MiddlewareManager):

    def download(self, download_func, request, spider):
        @defer.inlineCallbacks
        def process_request(request):
            for method in self.methods['process_request']:
                response = yield method(request=request, spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
                if response:
                    defer.returnValue(response)
            defer.returnValue((yield download_func(request=request, spider=spider)))  # normal path: after every middleware has processed the request, yield into the download_func passed in (_enqueue_request)

        deferred = mustbe_deferred(process_request, request)  # once the request path finishes, the result flows through process_response, or through process_exception on failure
        deferred.addErrback(process_exception)
        deferred.addCallback(process_response)
        return deferred
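As an illustration of the short-circuit path above, here is a hedged, minimal downloader middleware (the class name and cached dict are made up for the example): if process_request returns a Response, the loop hits defer.returnValue(response) and download_func is never called.

from scrapy.http import HtmlResponse

class CacheStubMiddleware(object):
    """Toy middleware: answer some URLs from a canned cache instead of downloading."""
    cached = {'http://example.com/': b'<html>cached copy</html>'}

    def process_request(self, request, spider):
        body = self.cached.get(request.url)
        if body is not None:
            # returning a Response here stops the remaining process_request
            # methods and skips the real download entirely
            return HtmlResponse(url=request.url, body=body)
        return None  # fall through to the next middleware / the real download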

#### The processed request is stored in the downloader's self.slots {hostname: slot}; requests are then popped from that slot's queue and handed to the handler until the domain's concurrency limit is filled

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\__init__.py

class Downloader(object):

    def __init__(self, crawler):
        self.slots = {}    # dict: self.slots[key] = Slot(conc, delay, self.randomize_delay)

    def _enqueue_request(self, request, spider):   # the request popped from the scheduler arrives here via downloader.fetch: middleware has processed it, now pick a slot from the downloader's slots dict
        key, slot = self._get_slot(request, spider)  # key defaults to the hostname and maps to one slot; if the dict has no slot for it yet, one is created
        request.meta['download_slot'] = key

        def _deactivate(response):
            slot.active.remove(request)
            return response

        slot.active.add(request)
        deferred = defer.Deferred().addBoth(_deactivate)  # placeholder deferred returned to the caller
        slot.queue.append((request, deferred))  # queue is unbounded; active and transferring are sets
        self._process_queue(spider, slot)  # if this hostname's slot has spare concurrency, pop a request and hand it to the handler; not necessarily the request just appended
        return deferred

    def _process_queue(self, spider, slot):
        if slot.latercall and slot.latercall.active():
            return

        # Delay queue processing if a download_delay is configured
        now = time()
        delay = slot.download_delay()  # DOWNLOAD_DELAY defaults to 0
        if delay:
            penalty = delay - now + slot.lastseen
            if penalty > 0:
                slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
                return

        # Process enqueued requests if there are free slots to transfer for this slot  # i.e. as long as this hostname's queue is non-empty and it still has spare concurrency
        while slot.queue and slot.free_transfer_slots() > 0:  # by default each hostname gets one slot and CONCURRENT_REQUESTS_PER_DOMAIN = 8; free_transfer_slots() is 8 minus len(self.transferring), which grows by one each time a request is handed to the handler (see _download below)
            slot.lastseen = now
            request, deferred = slot.queue.popleft()
            dfd = self._download(slot, request, spider)
            dfd.chainDeferred(deferred)
            # prevent burst if inter-request delays were configured
            if delay:
                self._process_queue(spider, slot)
                break
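Note how _enqueue_request returns a placeholder deferred immediately, while _process_queue later chains the real download deferred into it. A standalone sketch of that chainDeferred pattern (not Scrapy code):

from twisted.internet import defer

def caller_callback(result):
    print('caller sees: %r' % (result,))

placeholder = defer.Deferred()          # handed back to the caller right away
placeholder.addCallback(caller_callback)

real_download = defer.Deferred()        # created later, when the slot dequeues the request
real_download.chainDeferred(placeholder)

real_download.callback('response body') # firing the real deferred also fires the placeholder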

    def _download(self, slot, request, spider):
        # The order is very important for the following deferreds. Do not change!

        # 1. Create the download deferred
        dfd = mustbe_deferred(self.handlers.download_request, request, spider)  # hand off to the handler

        # 2. Notify response_downloaded listeners about the recent download
        # before querying queue for next request
        def _downloaded(response):
            self.signals.send_catch_log(signal=signals.response_downloaded,
                                        response=response,
                                        request=request,
                                        spider=spider)
            return response
        dfd.addCallback(_downloaded)  # when the handler returns a response, send the response_downloaded signal

        # 3. After response arrives,  remove the request from transferring
        # state to free up the transferring slot so it can be used by the
        # following requests (perhaps those which came from the downloader
        # middleware itself)
        slot.transferring.add(request)   # the request has been handed to the handler: transferring grows by one

        def finish_transferring(_):
            slot.transferring.remove(request)
            self._process_queue(spider, slot)
            return _

        return dfd.addBoth(finish_transferring)  # when the handler returns, transferring shrinks by one and the queue is processed again
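The slot bookkeeping above boils down to a bounded transferring set plus an unbounded queue. A toy standalone sketch (not Scrapy's Slot class) of that idea:

from collections import deque

class ToySlot(object):
    def __init__(self, concurrency=8):   # cf. CONCURRENT_REQUESTS_PER_DOMAIN = 8
        self.concurrency = concurrency
        self.queue = deque()             # unbounded backlog
        self.transferring = set()        # requests currently at the handler

    def free_transfer_slots(self):
        return self.concurrency - len(self.transferring)

slot = ToySlot(concurrency=2)
slot.queue.extend(['r1', 'r2', 'r3'])
while slot.queue and slot.free_transfer_slots() > 0:
    req = slot.queue.popleft()
    slot.transferring.add(req)           # handed to the handler
print(sorted(slot.transferring))         # ['r1', 'r2']; 'r3' waits until one finishes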

#### Until the slot's concurrency is filled: each request popped from the slot queue is dispatched to a handler by URL scheme; e.g. http selects HTTPDownloadHandler, which actually maps to HTTP11DownloadHandler in \core\downloader\handlers\http11.py

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\handlers\__init__.py

class DownloadHandlers(object):

    def __init__(self, crawler):
        self._schemes = {}  # stores acceptable schemes on instancing
        handlers = without_none_values(
            crawler.settings.getwithbase('DOWNLOAD_HANDLERS'))
        for scheme, clspath in six.iteritems(handlers):  # dict {scheme: clspath}
            self._schemes[scheme] = clspath

    def download_request(self, request, spider):
        scheme = urlparse_cached(request).scheme
        handler = self._get_handler(scheme)  # pick the handler for this scheme
        if not handler:
            raise NotSupported("Unsupported URL scheme '%s': %s" %
                               (scheme, self._notconfigured[scheme]))
        return handler.download_request(request, spider)
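For reference, the scheme-to-handler mapping merged by getwithbase('DOWNLOAD_HANDLERS') comes mostly from DOWNLOAD_HANDLERS_BASE; the Scrapy 1.x defaults are roughly the following (from memory, check default_settings.py):

DOWNLOAD_HANDLERS_BASE = {
    'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler',
    'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}
# scrapy.core.downloader.handlers.http re-exports HTTP11DownloadHandler
# as HTTPDownloadHandler, which is why http/https end up in http11.py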

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\handlers\http11.py

Tracing further eventually ends up in the code below,

where agent is from twisted.web.client import Agent:

L{Agent} is a very basic HTTP client. It supports I{HTTP} and I{HTTPS} scheme URIs

        d = agent.request(
            method, to_bytes(url, encoding='ascii'), headers, bodyproducer)
        # set download latency
        d.addCallback(self._cb_latency, request, start_time)
        # response body is ready to be consumed
        d.addCallback(self._cb_bodyready, request)
        d.addCallback(self._cb_bodydone, request, url)
        # check download timeout
        self._timeout_cl = reactor.callLater(timeout, d.cancel)
        d.addBoth(self._cb_timeout, request, url, timeout)
        return d

On the normal path, this eventually returns the response:

return respcls(url=url, status=status, headers=headers, body=body, flags=flags)

We stop tracing here.
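For comparison, a minimal standalone twisted.web.client.Agent request (not Scrapy's handler) follows the same request-then-deferred pattern that HTTP11DownloadHandler wraps with its latency, body and timeout callbacks:

from twisted.internet import reactor
from twisted.web.client import Agent, readBody

agent = Agent(reactor)
d = agent.request(b'GET', b'http://example.com/')

def got_response(response):
    print('status: %d' % response.code)
    return readBody(response)        # deferred that fires with the body bytes

def got_body(body):
    print('body length: %d' % len(body))

d.addCallback(got_response)
d.addCallback(got_body)
d.addBoth(lambda _: reactor.stop())  # shut down the reactor either way
reactor.run()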

### Back in fetch: the callbacks added after the handler returns a result, dwld.addCallbacks(_on_success) and dwld.addBoth(_on_complete)

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

class ExecutionEngine(object):

    def _download(self, request, spider):
        slot = self.slot
        slot.add_request(request)
        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request # tie request to response received
                logkws = self.logformatter.crawled(request, response, spider)
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received,
                                            response=response, request=request, spider=spider)
            return response   # if the result is a Response, the response_received signal has been sent

        def _on_complete(_):
            slot.nextcall.schedule()  # trigger the same call as the engine heartbeat
            return _

        dwld = self.downloader.fetch(request, spider)
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld  # return the deferred carrying the result
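The difference between the two hooks is worth spelling out: addCallbacks(_on_success) is skipped when the result is a Failure, while addBoth(_on_complete) always runs. A standalone sketch (not Scrapy code):

from twisted.internet import defer

def on_success(result):
    print('success path: %r' % (result,))
    return result

def on_complete(result):
    print('always runs, got: %r' % (result,))
    return result

ok = defer.Deferred()
ok.addCallbacks(on_success)   # no errback given, so failures would pass through
ok.addBoth(on_complete)
ok.callback('response')       # both hooks run

failed = defer.Deferred()
failed.addCallbacks(on_success)
failed.addBoth(on_complete)                       # runs with the Failure
failed.addErrback(lambda f: None)                 # swallow it so Twisted does not warn
failed.errback(RuntimeError('download failed'))   # on_success is skipped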

3. d.addBoth(self._handle_downloader_output, request, spider)  # the callback that handles the download result

### Callback d.addBoth(self._handle_downloader_output, request, spider)

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

class ExecutionEngine(object):

    def _handle_downloader_output(self, response, request, spider):
        assert isinstance(response, (Request, Response, Failure)), response    # the result can only be a Request, Response, or Failure
        # downloader middleware can return requests (for example, redirects)
        if isinstance(response, Request):   # the result is a Request
            self.crawl(response, spider)
            return
        # response is a Response or Failure
        d = self.scraper.enqueue_scrape(response, request, spider)  # the result is a Response or Failure, so hand it to the scraper
        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d

### Callback d.addBoth(lambda _: slot.remove_request(request))

### Callback d.addBoth(lambda _: slot.nextcall.schedule())

To recap: the scheduling loop engine._next_request_from_scheduler() pops a request and hands it to the handler; if the result is a Request it goes back through engine.crawl(), and if it is a Response/Failure it goes to scraper.enqueue_scrape(), which is what we look at next.