1. 程式人生 > >Scrapy Pipeline之處理CPU密集型或阻塞型操作

Scrapy Pipeline之處理CPU密集型或阻塞型操作

Twisted框架的reactor適合於處理短的、非阻塞的操作。但是如果要處理一些複雜的、或者包含阻塞的操作又該怎麼辦呢?Twisted提供了執行緒池來在其他的執行緒而不是主執行緒(Twisted的reactor執行緒)中執行慢的操作——使用reactor.callInThread() API。這就意味著reactor在執行計算時還能保持執行並對事件做出反應。一定要記住執行緒池中的處理不是執行緒安全的。這就意味著當你使用了全域性的狀態之後,還要面臨所有那些傳統的多執行緒程式設計的同步問題。下面是一個簡單的例子:

class UsingBlocking(object):
    @defer.inlineCallbacks
def process_item(self, item, spider): price = item["price"][0] out = defer.Deferred() reactor.callInThread(self._do_calculation, price, out) item["price"][0] = yield out defer.returnValue(item) def _do_calculation(self, price, out): new_price = price + 1
time.sleep(0.10) reactor.callFromThread(out.callback, new_price)

在上面的Pipeline中,對於每個Item,我們提取出它的price欄位,想要在_do_caculation()方法中對它進行處理。這個方法使用了time.sleep(),一個阻塞的操作。我們呼叫reactor.callInThread()方法使它執行在另一個執行緒中,該方法的第一個引數是想要呼叫的函式,後面的引數則會全部傳遞給被呼叫的函式作為引數。在這裡我們給被呼叫的函式傳遞了price,還有一個建立的Deferred物件out。當_do_caculation()

函式完成計算後,我們會使用out的回撥函式來返回這個值。接下來,yield這個 Deferred物件併為price設定一個新的值,最後返回Item

_do_caculation()函式中我們把price加一,然後休眠了100ms。其實這個時間是很長的,如果在reactor的執行緒中呼叫這個函式,那就意味著我們每秒只能處理不超過10個頁面。不過如果把它放在另一個執行緒中來呼叫就不會出現這種問題了。這些計算任務會線上程池中排隊,等待某個執行緒處於可用狀態,然後這個執行緒就會執行這個任務,休眠100ms。最後一步是啟用out的回撥函式。通常情況下,我們可以這樣來啟用:out.callback(new_price),但是既然現在我們處於另外一個執行緒中,這樣做就不安全了。如果我們執意這樣做了,這個Deferred物件的程式碼,也就是Scrapy的功能就會在別的執行緒中執行,這樣會導致資料被損壞。所以我們呼叫了reactor.callFromThread()函式,同樣的,它也是以一個函式作為引數,並把額外的引數直接傳遞給被呼叫的函式。這個函式會在主執行緒中排隊並等待被呼叫,它反過來解鎖了process_item()方法中的yield語句,並恢復Scrapy對這個Item的操作。

如果我們的pipeline中含有全域性狀態會怎麼樣呢?比如,計數器或者平均值等,我們需要在_do_caculation()函式中使用的。例如有以下兩個變數,beta和delta:

class UsingBlocking(object):
    def __init__(self):
        self.beta, self.delta = 0, 0
        ...

    def _do_calculation(self, price, out):
        self.beta += 1
        time.sleep(0.001)
        self.delta += 1
        new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01

        time.sleep(0.10)...

上面的程式碼有一些問題,並且在執行的時候會給出assertion錯誤。這是因為,如果一個執行緒在self.beta += 1self.delta += 1語句之間切換的話,另一個執行緒就會恢復執行並使用beta和delta的值來計算price,這裡執行緒會發現這兩個值處於不一致的狀態(beta比delta大),這樣,錯誤的產生了。中間短的sleep會讓執行緒切換更可能發生,不過即使沒有它,同樣也會出現競態條件。為了阻止競態條件的發生,我們必須使用鎖,例如Python的threading.RLock()鎖。使用了這個遞迴鎖,就能確保兩個執行緒不會同時執行鎖保護的臨界區的程式碼:

class UsingBlocking(object):
    def __init__(self):
        ...
        self.lock = threading.RLock()
        ...

    def _do_calculation(self, price, out):
        with self.lock:
            self.beta += 1
            ...
            new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01 ...

現在的程式碼就沒問題了,要注意的是,我們不需要保護整個程式碼,只需要能夠覆蓋全域性狀態的使用就行了。

ITEM_PIPELINES中加上:

ITEM_PIPELINES = { ...
    'properties.pipelines.computation.UsingBlocking': 500,
}

執行一下會發現,時延由於100ms的休眠的緣故變調了,不過吞吐量還是保持不變,大約每秒25個。