
Crawling cnblogs Articles with Tornado and Coroutines

Since Python 3.5, the Tornado documentation has recommended writing asynchronous code with async and await. I gave it a try: crawl cnblogs articles with Tornado coroutines and write the results to MySQL asynchronously with peewee_async.
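To make that recommendation concrete, here is a minimal sketch (my own illustration, not code from this crawler; the function names are hypothetical) contrasting the older decorator/yield style with the native coroutine style used throughout this post:

from tornado import gen, httpclient


@gen.coroutine
def fetch_old_style(url):
    # older generator-based coroutine: decorate the function and yield futures
    response = yield httpclient.AsyncHTTPClient().fetch(url)
    return response.code


async def fetch_new_style(url):
    # native coroutine (Python 3.5+): plain async def / await, no decorator needed
    response = await httpclient.AsyncHTTPClient().fetch(url)
    return response.code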

1. A scraping test on a cnblogs article:

Here I use one of my own articles as the test URL, https://www.cnblogs.com/FG123/p/9934244.html, and scrape the article title, content, and author information.

 

The article title, content, and author username can all be obtained from the detail page URL above, but the author profile has to be fetched from http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123, where FG123 is the username of this article's author. Below is the test code using Beautiful Soup, followed by the result:

import requests
from bs4 import BeautifulSoup

detail_article_html = requests.get("https://www.cnblogs.com/FG123/p/9934244.html").content
author_profile_html = requests.get("http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123").content

detail_soup = BeautifulSoup(detail_article_html, "html.parser")
title = detail_soup.find(id="cb_post_title_url").get_text()
info = detail_soup.find(id="cnblogs_post_body")

author_soup = BeautifulSoup(author_profile_html, "html.parser")
author = author_soup.select('div > a')
author_name = author[0].get_text()
blog_age = author[1].get_text()
fans_num = author[2].get_text()
follow_num = author[3].get_text()

print("Article title: {}".format(title))
print("Author nickname: {}".format(author_name))
print("Blog age: {}".format(blog_age))
print("Fans: {}".format(fans_num))
print("Following: {}".format(follow_num))
print("Article content: {}".format(info))

Result: the script prints the article title, the author's nickname, blog age, fan count, follow count, and the article body.

2. Asynchronous crawl logic with Tornado and coroutines:

The crawl logic follows the spider example in the official Tornado documentation: Tornado's Queue drives an asynchronous producer/consumer pattern, and a coroutine suspends whenever it waits on the queue (an empty queue on get, or a full queue on put when a maxsize is set), letting the other coroutines run. First, define a coroutine that fetches a URL, extracts every link on the page, and strips the useless fragment part:

 

from urllib.parse import urljoin, urldefrag

from bs4 import BeautifulSoup
from tornado import httpclient


async def get_links_from_url(url):
    """
    Fetch the url asynchronously with AsyncHTTPClient,
    then extract every link from the page with BeautifulSoup.
    :param url:
    :return:
    """
    response = await httpclient.AsyncHTTPClient().fetch(url)
    print('fetched %s' % url)

    html = response.body.decode("utf8", errors='ignore')
    soup = BeautifulSoup(html, "html.parser")
    return set([urljoin(url, remove_fragment(a.get("href")))
                for a in soup.find_all("a", href=True)])


def remove_fragment(url):
    """
    Strip the fragment part (#...) from a link.
    :param url:
    :return:
    """
    pure_url, frag = urldefrag(url)
    return pure_url
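Before moving on to the crawler itself, here is a minimal, self-contained sketch (my own illustration, not part of the crawler; all names are illustrative) of the queue behaviour described above: with a bounded tornado.queues.Queue, the producer suspends on put() once the queue is full and only resumes as the consumer drains items:

from tornado import gen, ioloop, queues


async def queue_demo():
    q = queues.Queue(maxsize=2)          # bounded queue: put() suspends when 2 items are waiting

    async def producer():
        for i in range(5):
            await q.put(i)               # suspends here while the queue is full
            print("produced", i)

    async def consumer():
        async for item in q:
            if item is None:             # sentinel: stop consuming
                q.task_done()
                return
            print("consumed", item)
            await gen.sleep(0.1)         # simulate work; yields control back to the producer
            q.task_done()

    ioloop.IOLoop.current().spawn_callback(consumer)
    await producer()
    await q.put(None)                    # same sentinel trick the crawler uses below
    await q.join()


if __name__ == "__main__":
    ioloop.IOLoop.current().run_sync(queue_demo)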

 

The current URL is handed to the coroutine above to collect the valid links it contains, and every link that is not external is put into Tornado's queue:

async def fetch_url(current_url):
    """
    fetching is the set of urls that have already been picked up.
    Call the coroutine get_links_from_url to collect every url contained in
    current_url, then put the non-external links into the queue.
    :param current_url:
    :return:
    """
    if current_url in fetching:
        return

    print('fetching %s' % current_url)
    fetching.add(current_url)
    urls = await get_links_from_url(current_url)
    fetched.add(current_url)

    for new_url in urls:
        # keep only non-external article links
        if new_url.startswith(base_url) and new_url.endswith(".html"):
            await q.put(new_url)

 

A worker coroutine uses async for to pull URLs out of the queue, calls the coroutine fetch_url to collect the links each page contains, and calls the coroutine get_info_data to scrape the page's detail data:

async def worker():
    """
    Pull urls out of the queue with async for,
    call the coroutine fetch_url to collect the urls each page contains,
    and call the coroutine get_info_data to scrape the page's detail data.
    :return:
    """
    async for url in q:
        if url is None:
            return
        try:
            await fetch_url(url)
            await get_info_data(url)
        except Exception as e:
            print('Exception: %s %s' % (e, url))
        finally:
            q.task_done()

 

Define the main coroutine: gen.multi starts concurrency worker coroutines at once and gathers them on the event loop. Once the queue has been fully drained (or the join times out), put the same number of None sentinels into the queue so that each worker's async for loop ends.

import time
from datetime import timedelta

from tornado import gen, queues


async def main():
    """
    Main coroutine: gen.multi starts concurrency workers at the same time and
    waits for them on the event loop, until the queue is drained or the join
    times out.
    :return:
    """
    # q, fetching and fetched are shared with the worker coroutines above
    # (the official Tornado example defines fetch_url/worker inside main instead)
    global q, fetching, fetched
    q = queues.Queue()
    start = time.time()
    fetching, fetched = set(), set()

    # put the initial url into the queue
    await q.put(base_url)

    workers = gen.multi([worker() for _ in range(concurrency)])
    await q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (
        time.time() - start, len(fetched)))

    # put concurrency None sentinels into the queue; each worker() returns
    # when it pulls a None, ending its loop
    for _ in range(concurrency):
        await q.put(None)

    await workers
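The post never shows the entry point, so the snippets above assume that base_url and concurrency are defined at module level and that main() is driven by the IOLoop. A minimal sketch of that missing glue, with illustrative values (the start page is my assumption; the post mentions running 10 coroutines):

from tornado.ioloop import IOLoop

base_url = "https://www.cnblogs.com/"   # assumed start page for the crawl
concurrency = 10                        # the post mentions running 10 coroutines

if __name__ == "__main__":
    IOLoop.current().run_sync(main)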

 

3. Writing the scraped data to MySQL asynchronously with peewee_async and aiomysql

Define the model with peewee and create the table:

 

# coding:utf-8
from peewee import *
import peewee_async

database = peewee_async.MySQLDatabase(
    'xxx', host="192.168.xx.xx",
    port=3306, user="root", password="xxxxxx"
)

objects = peewee_async.Manager(database)

database.set_allow_sync(True)


class Blogger(Model):
    article_id = CharField(max_length=50, verbose_name="article ID")
    title = CharField(max_length=150, verbose_name="title")
    content = TextField(null=True, verbose_name="content")
    author_name = CharField(max_length=50, verbose_name="author nickname")
    blog_age = CharField(max_length=50, verbose_name="blog age")
    fans_num = IntegerField(null=True, verbose_name="fan count")
    follow_num = IntegerField(null=True, verbose_name="follow count")

    class Meta:
        database = database
        table_name = "blogger"


def init_table():
    database.create_tables([Blogger])


if __name__ == "__main__":
    init_table()

 

Scrape the article's detail information and write it to MySQL asynchronously:

async def get_info_data(url):
    """
    Scrape the detail information and write it to MySQL asynchronously.
    (objects and Blogger come from the peewee model module above.)
    :param url:
    :return:
    """
    response = await httpclient.AsyncHTTPClient().fetch(url)
    html = response.body.decode("utf8")
    soup = BeautifulSoup(html, "html.parser")
    title = soup.find(id="cb_post_title_url").get_text()
    content = soup.find(id="cnblogs_post_body")
    name = url.split("/")[3]
    article_id = url.split("/")[-1].split(".")[0]
    author_url = "http://www.cnblogs.com/mvc/blog/news.aspx?blogApp={}".format(name)
    author_response = await httpclient.AsyncHTTPClient().fetch(author_url)
    author_html = author_response.body.decode("utf8")
    author_soup = BeautifulSoup(author_html, "html.parser")
    author = author_soup.select('div > a')
    author_name = author[0].get_text()
    blog_age = author[1].get_text()
    fans_num = author[2].get_text()
    follow_num = author[3].get_text()
    await objects.create(
        Blogger, title=title,
        article_id=article_id,
        content=content,
        author_name=author_name,
        blog_age=blog_age,
        fans_num=fans_num,
        follow_num=follow_num
    )
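To check what ended up in the table, the same peewee_async Manager can be used to read rows back asynchronously. A minimal sketch, assuming the model definitions above live in a module named models (my assumption):

import asyncio

from models import objects, Blogger   # assumed module name for the peewee model above


async def show_bloggers():
    # objects.execute runs the SELECT on the event loop without blocking
    rows = await objects.execute(Blogger.select())
    for row in rows:
        print(row.article_id, row.title, row.author_name)


if __name__ == "__main__":
    asyncio.run(show_bloggers())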

 

Crawl result: the scraped articles end up as rows in the blogger table.

This was a quick first experience with crawling cnblogs using Tornado and coroutines. I ran 10 coroutines here and it already felt very fast: switching between coroutines costs very little, and a single thread or process can run many coroutines. In my tests it was indeed faster than a multi-threaded crawler.