1. 程式人生 > >【Python3爬蟲】快就完事了--使用Celery加速你的爬蟲

【Python3爬蟲】快就完事了--使用Celery加速你的爬蟲

一、寫在前面

  在上一篇部落格中提到過對於網路爬蟲這種包含大量網路請求的任務,是可以用Celery來做到加速爬取的,那麼,這一篇部落格就要具體說一下怎麼用Celery來對我們的爬蟲進行一個加速!

 

二、知識補充

1.class celery.group

  group這個類表示建立一組要並行執行的任務,不過一組任務是懶惰的,所以你需要執行並對其進行評估。要了解這個類,可以檢視文件,或者在Pycharm中直接Ctrl+左鍵就能直接檢視原始碼了,如下圖:

  

  當然了,直接看原始碼還不夠,最好還是自己動下手。所以先建立一個test.py,其中程式碼如下:

 1 from celery import Celery
 2 
 3 
 4 app = Celery("test", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")
 5 
 6 
 7 @app.task
 8 def add(x, y):
 9     return x + y
10 
11 
12 if __name__ == '__main__':
13     app.start()

  然後執行Celery伺服器,再在test.py所在目錄下建立一個test_run.py用於測試,其中程式碼如下:

 1 from celery import group
 2 from .test import add
 3 
 4 
 5 lazy_group = group(add.s(2, 2), add.s(4, 4))
 6 print(type(lazy_group))
 7 result = lazy_group()
 8 print(result)
 9 print(type(result))
10 print(result.get())

  在Pycharm中執行test_run.py,得到的結果如下:

<class 'celery.canvas.group'>

fe54f453-eb9c-4b24-87e3-a26fab75967f

<class 'celery.result.GroupResult'>

[4, 8]

   通過檢視原始碼可以知道,是可以往group中傳入一個由任務組成的可迭代物件的,所以這就進行一下測試,對上面的程式碼進行一點修改:

1 from celery import group
2 from CelerySpider.test import add
3 
4 
5 lazy_group = group(add.s(x, y) for x, y in zip([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]))
6 result = lazy_group()
7 print(result)
8 print(result.get())

  執行之後得到了我們想要的結果:

f03387f1-af00-400b-b58a-37901563251d

[3, 7, 11, 15, 19]

2.celer.result.collect()

  在Celery中有一個類result,這個類包含了任務執行的結果和狀態等,而在這個類中就有一個collect()方法,使用該方法能在結果返回時收集結果。和之前一樣的步驟,先看看原始碼:

  

  這裡看原始碼也是看得一頭霧水,不如動手寫程式碼試試看。建立一個app.py,其中程式碼如下:

 1 from celery import Celery, group, result
 2 
 3 
 4 app = Celery("test", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")
 5 
 6 
 7 @app.task(trail=True)
 8 def A(how_many):
 9     return group(B.s(i) for i in range(how_many))()
10 
11 
12 @app.task(trail=True)
13 def B(i):
14     return pow2.delay(i)
15 
16 
17 @app.task(trail=True)
18 def pow2(i):
19     return i ** 2
20 
21 
22 if __name__ == '__main__':
23     app.start()

  可以看到在設定任務的時候都加了引數trail=True,這是為了儲存子任務列表執行後的結果,雖然是預設設定,但這裡明確啟用。在執行Celery伺服器之中,進入app.py同級目錄,輸入python,然後執行如下程式碼:

>>> from app import A
>>> res = A.delay(10)
>>> [i[1] for i in res.collect() if isinstance(i[1], int)]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

三、具體步驟

1.專案結構

  這個爬蟲專案的基本檔案如下:

  

  其中app.py用於建立Celery例項,celeryconfig.py是Celery需要使用的配置檔案,tasks.py裡面的則是具體的任務,crawl.py是爬蟲指令碼,在開啟Celery伺服器之後,執行此檔案即可。

2.主要程式碼

  首先是app.py,程式碼如下,其中config_from_object()方法用於配置Celery,傳入的引數是一個可被匯入的模組:

 1 from celery import Celery
 2 
 3 
 4 app = Celery("spiders", include=["CelerySpider.tasks"])
 5 # 匯入配置檔案
 6 app.config_from_object("CelerySpider.celeryconfig")
 7 
 8 
 9 if __name__ == '__main__':
10     app.start()

   下面是tasks.py中的程式碼,其中包含了傳送請求和解析網頁的程式碼:

 1 import requests
 2 from lxml import etree
 3 from celery import group
 4 from CelerySpider.app import app
 5 
 6 
 7 headers = {
 8     "Cookie": "__cfduid=d5d815918f19b7370d14f80fc93f1f27e1566719058; UM_distinctid=16cc7bba92f7b6-0aac860ea9b9a7-7373e61-144000-16cc7bba930727; CNZZDATA1256911977=1379501843-1566718872-https%253A%252F%252Fwww.baidu.com%252F%7C1566718872; XSRF-TOKEN=eyJpdiI6InJvNVdZM0krZ1wvXC9BQjg3YUk5aGM1Zz09IiwidmFsdWUiOiI5WkI4QU42a0VTQUxKU2ZZelVxK1dFdVFydlVxb3g0NVpicEdkSGtyN0Uya3VkXC9pUkhTd2plVUtUTE5FNWR1aCIsIm1hYyI6Ijg4NjViZTQzNGRhZDcxNTdhMDZlMWM5MzI4NmVkOGZhNmRlNTBlYWM0MzUyODIyOWQ4ZmFhOTUxYjBjMTRmNDMifQ%3D%3D; doutula_session=eyJpdiI6IjFoK25pTG50azEwOXlZbmpWZGtacnc9PSIsInZhbHVlIjoiVGY2MU5Ob2pocnJsNVBLZUNMTWw5OVpjT0J6REJmOGVpSkZwNFlUZVwvd0tsMnZsaiszWEpTbEdyZFZ6cW9UR1QiLCJtYWMiOiIxZGQzNTJlNzBmYWE0MmQzMzQ0YzUzYmYwYmMyOWY3YzkxZjJlZTllNDdiZTlkODA2YmQ3YWRjNGRmZDgzYzNmIn0%3D",
 9     "Referer": "https://www.doutula.com/article/list/?page=1",
10     "UserAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"
11 }
12 
13 
14 @app.task(trail=True)
15 def main(urls):
16     # 主函式
17     return group(call.s(url) for url in urls)()
18 
19 
20 @app.task(trail=True)
21 def call(url):
22     # 傳送請求
23     try:
24         res = requests.get(url, headers=headers)
25         parse.delay(res.text)
26     except Exception as e:
27         print(e)
28 
29 
30 @app.task(trail=True)
31 def parse(html):
32     # 解析網頁    
33     et = etree.HTML(html)
34     href_list = et.xpath('//*[@id="home"]/div/div[2]/a/@href')
35     result = []
36     for href in href_list:
37         href_res = requests.get(href, headers=headers)
38         href_et = etree.HTML(href_res.text)
39         src_list = href_et.xpath('//*[@class="artile_des"]/table/tbody/tr/td/a/img/@src') 
40         result.extend(src_list)
41     return result

  最後是crawl.py中的程式碼:

 1 import time
 2 from CelerySpider.tasks import main
 3 
 4 
 5 start_time = time.time()
 6 
 7 
 8 url_list = ["https://www.doutula.com/article/list/?page={}".format(i) for i in range(1, 31)]
 9 res = main.delay(url_list)
10 all_src = []
11 for i in res.collect():
12     if isinstance(i[1], list) and isinstance(i[1][0], str):
13         all_src.extend(i[1])
14 
15 print("Src count: ", len(all_src))
16 
17 
18 end_time = time.time()
19 print("Cost time: ", end_time - start_time)

   此次爬取的網站是一個表情包網站,url_list就表示要爬取的url,這裡我選擇爬取30頁來測試。all_src用於儲存表情包圖片的資源連結,通過collect()方法提取出要爬取的連結,然後將這些表情包下載下來,最後打印出下載的圖片數量和整個程式所耗費的時間。

 

 四、執行結果

  當執行Celery服務後,再執行crawl.py檔案,會看到如下資訊打印出來:

  

  當整個爬蟲執行完畢後,會打印出所耗費的時間:

  

 

  完整程式碼已上傳到GitHu