Crawling cnblogs Articles with Tornado and Coroutines
Since Python 3.5, Tornado has officially recommended writing asynchronous code with async and await, so I gave it a try: crawling cnblogs articles with Tornado and coroutines, and writing the results asynchronously to a MySQL database with peewee_async.
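As a minimal illustration of what this native-coroutine style looks like before diving into the crawler (this toy snippet is mine, not part of the crawler code):

from tornado import gen
from tornado.ioloop import IOLoop

async def say_after(delay, message):
    await gen.sleep(delay)        # yields to the event loop instead of blocking
    print(message)

async def demo():
    await say_after(1, "hello from a native coroutine")

IOLoop.current().run_sync(demo)   # drive a coroutine to completion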
1. A scraping test on a cnblogs article:
As a test URL I use the detail page of one of my own articles, https://www.cnblogs.com/FG123/p/9934244.html, and scrape the article title, the content, and the author information:
The article title, content, and author username can all be taken from the detail page above, but the author profile has to be fetched separately from http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123, where FG123 is the author's username for this article. Below is the Beautiful Soup test code, followed by the result:
import requests
from bs4 import BeautifulSoup

# Fetch the article detail page and the author profile endpoint
detail_article_html = requests.get("https://www.cnblogs.com/FG123/p/9934244.html").content
author_profile_html = requests.get("http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123").content

detail_soup = BeautifulSoup(detail_article_html, "html.parser")
title = detail_soup.find(id="cb_post_title_url").get_text()
info = detail_soup.find(id="cnblogs_post_body")

author_soup = BeautifulSoup(author_profile_html, "html.parser")
author = author_soup.select('div > a')
author_name = author[0].get_text()
blog_age = author[1].get_text()
fans_num = author[2].get_text()
follow_num = author[3].get_text()

print("Title: {}".format(title))
print("Author: {}".format(author_name))
print("Blog age: {}".format(blog_age))
print("Followers: {}".format(fans_num))
print("Following: {}".format(follow_num))
print("Content: {}".format(info))
Result: (the original post shows a screenshot of the printed output here)
2. The asynchronous crawl logic with Tornado and coroutines:
The crawl logic follows the crawler example in the official Tornado documentation: Tornado's Queue gives us an asynchronous producer/consumer pattern, where awaiting a full (or empty) queue suspends the current coroutine and lets others run. First, define a coroutine that fetches a URL, extracts all the links it contains, and normalizes them by stripping fragments:
from urllib.parse import urljoin, urldefrag

from bs4 import BeautifulSoup
from tornado import httpclient


async def get_links_from_url(url):
    """
    Fetch `url` asynchronously with AsyncHTTPClient and
    extract all URLs from the response body with BeautifulSoup.
    :param url:
    :return:
    """
    response = await httpclient.AsyncHTTPClient().fetch(url)
    print('fetched %s' % url)

    html = response.body.decode("utf8", errors='ignore')
    soup = BeautifulSoup(html, "html.parser")
    return set([urljoin(url, remove_fragment(a.get("href")))
                for a in soup.find_all("a", href=True)])


def remove_fragment(url):
    """
    Strip the #fragment part of a URL so that links to the
    same page are not crawled more than once.
    :param url:
    :return:
    """
    pure_url, frag = urldefrag(url)
    return pure_url
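Before wiring this into the crawler, the producer/consumer pattern itself is worth seeing in isolation. Here is a minimal, self-contained sketch along the lines of the tornado.queues documentation example (the toy_* names are illustrative only):

from tornado import gen, ioloop, queues

toy_q = queues.Queue(maxsize=2)   # with maxsize set, `put` suspends when the queue is full

async def toy_consumer():
    async for item in toy_q:
        try:
            print('consumed %s' % item)
            await gen.sleep(0.01)
        finally:
            toy_q.task_done()

async def toy_producer():
    for item in range(5):
        await toy_q.put(item)     # suspends here while the queue is full
        print('produced %s' % item)

async def toy_main():
    ioloop.IOLoop.current().spawn_callback(toy_consumer)
    await toy_producer()          # wait until every item has been put
    await toy_q.join()            # wait until every item has been consumed

ioloop.IOLoop.current().run_sync(toy_main)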
The current URL is handed to a coroutine that collects the valid URLs it contains and puts every non-external link into the Tornado queue:
async def fetch_url(current_url):
    """
    `fetching` is the set of URLs that have already been crawled.
    Call the coroutine get_links_from_url to collect all URLs
    on current_url, and put the non-external ones into the queue.
    :param current_url:
    :return:
    """
    if current_url in fetching:
        return

    print('fetching %s' % current_url)
    fetching.add(current_url)
    urls = await get_links_from_url(current_url)
    fetched.add(current_url)

    for new_url in urls:
        # only follow internal article links
        if new_url.startswith(base_url) and new_url.endswith(".html"):
            await q.put(new_url)
A worker pulls URLs out of the queue with async for, calls the coroutine fetch_url to collect the URLs each page contains, and calls the coroutine get_info_data to extract the page's detail data:
async def worker():
    """
    Pull URLs out of q with async for, call the coroutine
    fetch_url to collect the URLs each page contains, and call
    get_info_data to extract the page's detail data.
    :return:
    """
    async for url in q:
        if url is None:
            return
        try:
            await fetch_url(url)
            await get_info_data(url)
        except Exception as e:
            print('Exception: %s %s' % (e, url))
        finally:
            q.task_done()
The main coroutine uses Tornado's gen.multi to start `concurrency` workers at once and hands them to the event loop; once the queue has fully drained (or the timeout fires), it puts one None per worker into the queue, which makes each worker's loop return.
import time
from datetime import timedelta

from tornado import gen, queues


async def main():
    """
    Main coroutine: start `concurrency` worker coroutines at once
    with Tornado's gen.multi and wait for them on the event loop,
    until the queue is fully drained or the timeout fires.
    :return:
    """
    # worker() and fetch_url() refer to these as module-level names
    global q, fetching, fetched
    q = queues.Queue()
    start = time.time()
    fetching, fetched = set(), set()

    # seed the queue with the start URL
    await q.put(base_url)

    workers = gen.multi([worker() for _ in range(concurrency)])
    await q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (
        time.time() - start, len(fetched)))

    # put one None per worker into the queue; worker() returns when it sees None
    for _ in range(concurrency):
        await q.put(None)

    await workers
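The post never shows the module-level configuration or how main() is launched. Following the Tornado documentation example this code is based on, the missing glue is presumably something like the following (the base_url value is my assumption; the post does say 10 coroutines were used):

from tornado import ioloop

base_url = "https://www.cnblogs.com/"   # assumed crawl root; fetch_url() only follows links under it
concurrency = 10                        # the post mentions running 10 coroutines

if __name__ == "__main__":
    # main() creates q, fetching and fetched and declares them global,
    # so worker() and fetch_url() can reference them by name
    ioloop.IOLoop.current().run_sync(main)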
3. Writing the crawled data to MySQL asynchronously with peewee_async and aiomysql
Define the model with peewee and create the table:
# coding:utf-8
from peewee import *
import peewee_async

database = peewee_async.MySQLDatabase(
    'xxx', host="192.168.xx.xx",
    port=3306, user="root", password="xxxxxx"
)

objects = peewee_async.Manager(database)

# allow synchronous queries, e.g. for creating the table below
database.set_allow_sync(True)


class Blogger(Model):
    article_id = CharField(max_length=50, verbose_name="article ID")
    title = CharField(max_length=150, verbose_name="title")
    content = TextField(null=True, verbose_name="content")
    author_name = CharField(max_length=50, verbose_name="author nickname")
    blog_age = CharField(max_length=50, verbose_name="blog age")
    fans_num = IntegerField(null=True, verbose_name="follower count")
    follow_num = IntegerField(null=True, verbose_name="following count")

    class Meta:
        database = database
        table_name = "blogger"


def init_table():
    database.create_tables([Blogger])


if __name__ == "__main__":
    init_table()
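peewee_async is built on asyncio and uses aiomysql as its MySQL driver; it cooperates with the crawler because Tornado 5.0+ runs on the asyncio event loop. As a quick, hypothetical smoke test of the model above, independent of the crawler (assumes the database/objects setup from the model file):

import asyncio

async def smoke_test():
    # insert one row asynchronously, then read it back
    await objects.create(
        Blogger, article_id="test-1", title="hello",
        author_name="tester", blog_age="1 year"
    )
    row = await objects.get(Blogger, article_id="test-1")
    print(row.title)

asyncio.get_event_loop().run_until_complete(smoke_test())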
Fetch the detail information of a blog article and write it to the MySQL database asynchronously:
async def get_info_data(url):
    """
    Extract the detail information of an article page and
    write it to MySQL asynchronously.
    :param url:
    :return:
    """
    response = await httpclient.AsyncHTTPClient().fetch(url)
    html = response.body.decode("utf8")
    soup = BeautifulSoup(html, "html.parser")
    title = soup.find(id="cb_post_title_url").get_text()
    content = soup.find(id="cnblogs_post_body")
    # e.g. https://www.cnblogs.com/FG123/p/9934244.html
    # -> name "FG123", article_id "9934244"
    name = url.split("/")[3]
    article_id = url.split("/")[-1].split(".")[0]
    author_url = "http://www.cnblogs.com/mvc/blog/news.aspx?blogApp={}".format(name)
    author_response = await httpclient.AsyncHTTPClient().fetch(author_url)
    author_html = author_response.body.decode("utf8")
    author_soup = BeautifulSoup(author_html, "html.parser")
    author = author_soup.select('div > a')
    author_name = author[0].get_text()
    blog_age = author[1].get_text()
    fans_num = author[2].get_text()
    follow_num = author[3].get_text()
    await objects.create(
        Blogger, title=title,
        article_id=article_id,
        content=content,
        author_name=author_name,
        blog_age=blog_age,
        fans_num=fans_num,
        follow_num=follow_num
    )
Crawl result: (the original post shows a screenshot here)
This was a quick taste of crawling cnblogs with Tornado and coroutines. I ran 10 worker coroutines here and the crawl already felt fast: switching between coroutines is very cheap, and a single thread or process can host many of them. In my tests it was indeed noticeably faster than a multithreaded crawler.