Crawling cnblogs Articles with Tornado and Coroutines
Since Python 3.5, Tornado has officially recommended writing asynchronous programs with async and await. This post tries crawling cnblogs articles with Tornado and coroutines, and writing the results asynchronously to a MySQL database with peewee_async.
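Before diving in, here is a minimal sketch (the function name is mine, not from this post) of the async/await style Tornado recommends: `AsyncHTTPClient.fetch` returns a future, and `await` suspends the coroutine until the response arrives instead of blocking the thread.

```python
from tornado import httpclient, ioloop

async def fetch_demo():
    # await suspends this coroutine until the HTTP response arrives;
    # the event loop is free to run other coroutines in the meantime
    response = await httpclient.AsyncHTTPClient().fetch(
        "https://www.cnblogs.com/FG123/p/9934244.html")
    print(response.code, len(response.body))

ioloop.IOLoop.current().run_sync(fetch_demo)
```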
1. A scraping test on a cnblogs article:
Here I use the detail page of one of my own articles as the test URL, https://www.cnblogs.com/FG123/p/9934244.html, and scrape the article title, body, and author information.
The title, body, and author username can all be obtained from the detail-page URL above, but the author profile (blog age, fans, following) has to come from http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123, where FG123 is the username of the article's author. Below is the scraping test with Beautiful Soup, followed by the result:
```python
import requests
from bs4 import BeautifulSoup

detail_article_html = requests.get("https://www.cnblogs.com/FG123/p/9934244.html").content
author_profile_html = requests.get("http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123").content

detail_soup = BeautifulSoup(detail_article_html, "html.parser")
title = detail_soup.find(id="cb_post_title_url").get_text()
info = detail_soup.find(id="cnblogs_post_body")

author_soup = BeautifulSoup(author_profile_html, "html.parser")
author = author_soup.select('div > a')
author_name = author[0].get_text()
blog_age = author[1].get_text()
fans_num = author[2].get_text()
follow_num = author[3].get_text()

print("Article title: {}".format(title))
print("Author nickname: {}".format(author_name))
print("Blog age: {}".format(blog_age))
print("Fans: {}".format(fans_num))
print("Following: {}".format(follow_num))
print("Article body: {}".format(info))
```
Result:
2. Asynchronous crawling logic with Tornado and coroutines:
The crawl logic follows the spider example in the official Tornado documentation: Tornado's Queue provides an asynchronous producer/consumer pattern, and a coroutine that tries to put into a full Queue is suspended so another coroutine can run. First, define a coroutine that fetches a URL, extracts the links it contains, and strips their useless fragments (a standalone sketch of the Queue switching behaviour follows the snippet below):
```python
async def get_links_from_url(url):
    """
    Fetch the url asynchronously with AsyncHTTPClient and
    extract every link in the response body with BeautifulSoup.
    :param url:
    :return:
    """
    response = await httpclient.AsyncHTTPClient().fetch(url)
    print('fetched %s' % url)

    html = response.body.decode("utf8", errors='ignore')
    soup = BeautifulSoup(html, "html.parser")
    return set([urljoin(url, remove_fragment(a.get("href")))
                for a in soup.find_all("a", href=True)])


def remove_fragment(url):
    """
    Strip the fragment (#...) part of a link.
    :param url:
    :return:
    """
    pure_url, frag = urldefrag(url)
    return pure_url
```
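As an aside, the producer/consumer switching described above can be seen in isolation with a minimal sketch (the names and queue size here are mine, not from this post): with `maxsize=2`, the producer coroutine is suspended at `q_demo.put()` whenever the queue is full, and resumes once the consumer frees a slot.

```python
from tornado import ioloop, queues

q_demo = queues.Queue(maxsize=2)  # put() suspends when the queue is full

async def producer():
    for i in range(5):
        await q_demo.put(i)   # suspends here while the queue is full
        print("produced", i)

async def consumer():
    async for item in q_demo:
        print("consumed", item)
        q_demo.task_done()    # lets q_demo.join() know this item is done

async def demo():
    ioloop.IOLoop.current().spawn_callback(consumer)
    await producer()
    await q_demo.join()       # wait until every item has been task_done()

ioloop.IOLoop.current().run_sync(demo)
```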
The current URL is handed to this coroutine to collect the links it contains, and every link that is not external is put into the Tornado queue:
```python
async def fetch_url(current_url):
    """
    fetching is the set of urls already picked up for crawling.
    Call the coroutine get_links_from_url to collect every url
    inside current_url, and put the non-external ones into the queue.
    :param current_url:
    :return:
    """
    if current_url in fetching:
        return

    print('fetching %s' % current_url)
    fetching.add(current_url)
    urls = await get_links_from_url(current_url)
    fetched.add(current_url)

    for new_url in urls:
        # skip external links
        if new_url.startswith(base_url) and new_url.endswith(".html"):
            await q.put(new_url)
```
A worker coroutine takes URLs out of the queue with async for, calls the coroutine fetch_url to collect the links each page contains, and calls the coroutine get_info_data to scrape the page details; the q.task_done() call in the finally block is what eventually lets q.join() in main() return:
```python
async def worker():
    """
    Take urls out of q with async for,
    call the coroutine fetch_url to collect the urls each one contains,
    and call the coroutine get_info_data to scrape the page details.
    :return:
    """
    async for url in q:
        if url is None:
            return
        try:
            await fetch_url(url)
            await get_info_data(url)
        except Exception as e:
            print('Exception: %s %s' % (e, url))
        finally:
            q.task_done()
```
Define the main coroutine: it starts `concurrency` worker coroutines at once with Tornado's gen.multi and schedules them on the event loop, waits until the queue is fully drained or the timeout expires, then puts as many None values into the queue as there are workers so that each worker's loop ends.
```python
async def main():
    """
    Main coroutine: start `concurrency` worker coroutines at once with
    Tornado's gen.multi, schedule them on the event loop, and wait
    until the queue is fully drained or the timeout expires.
    :return:
    """
    # q, fetching and fetched are the module-level objects shared with
    # worker() and fetch_url(); re-creating them here would shadow them
    start = time.time()

    # seed the queue with the start url
    await q.put(base_url)

    workers = gen.multi([worker() for _ in range(concurrency)])
    await q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (
        time.time() - start, len(fetched)))

    # put `concurrency` Nones into the queue; each worker() that picks
    # one up returns, ending its async for loop
    for _ in range(concurrency):
        await q.put(None)

    await workers
```
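The post does not show the module-level scaffolding these coroutines rely on, so here is a sketch with assumed values: `base_url`, `concurrency`, the shared queue and the two sets, plus the entry point that drives `main()` with `run_sync`.

```python
# Hypothetical scaffolding; the original post omits it, so the
# imports and the base_url / concurrency values are assumptions.
import time
from datetime import timedelta
from urllib.parse import urljoin, urldefrag

from bs4 import BeautifulSoup
from tornado import gen, httpclient, ioloop, queues

base_url = "https://www.cnblogs.com/"   # assumed crawl root
concurrency = 10                        # number of worker coroutines

q = queues.Queue()
fetching, fetched = set(), set()        # shared by fetch_url() and worker()

if __name__ == "__main__":
    ioloop.IOLoop.current().run_sync(main)
```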
3. Writing the scraped data to MySQL asynchronously with peewee_async and aiomysql
Create the model and generate the table with peewee:
```python
# coding:utf-8
from peewee import *
import peewee_async

database = peewee_async.MySQLDatabase(
    'xxx', host="192.168.xx.xx",
    port=3306, user="root", password="xxxxxx"
)

objects = peewee_async.Manager(database)

database.set_allow_sync(True)


class Blogger(Model):
    article_id = CharField(max_length=50, verbose_name="article ID")
    title = CharField(max_length=150, verbose_name="title")
    content = TextField(null=True, verbose_name="body")
    author_name = CharField(max_length=50, verbose_name="author nickname")
    blog_age = CharField(max_length=50, verbose_name="blog age")
    fans_num = IntegerField(null=True, verbose_name="fans count")
    follow_num = IntegerField(null=True, verbose_name="follow count")

    class Meta:
        database = database
        table_name = "blogger"


def init_table():
    database.create_tables([Blogger])


if __name__ == "__main__":
    init_table()
```
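To confirm that rows written through the Manager are queryable, a quick read-back sketch (mine, not from the post) can go through the same `objects` Manager; `objects.execute()` runs a peewee query asynchronously.

```python
from tornado import ioloop

async def list_bloggers():
    # read back a few rows through the same peewee_async Manager
    articles = await objects.execute(Blogger.select().limit(5))
    for article in articles:
        print(article.article_id, article.title)

ioloop.IOLoop.current().run_sync(list_bloggers)
```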
Scrape the article details and write them to MySQL asynchronously:
```python
async def get_info_data(url):
    """
    Scrape the page details and write them to MySQL asynchronously.
    :param url:
    :return:
    """
    response = await httpclient.AsyncHTTPClient().fetch(url)
    html = response.body.decode("utf8")
    soup = BeautifulSoup(html, "html.parser")
    title = soup.find(id="cb_post_title_url").get_text()
    content = soup.find(id="cnblogs_post_body")
    name = url.split("/")[3]  # author username from the article url
    article_id = url.split("/")[-1].split(".")[0]
    author_url = "http://www.cnblogs.com/mvc/blog/news.aspx?blogApp={}".format(name)
    author_response = await httpclient.AsyncHTTPClient().fetch(author_url)
    author_html = author_response.body.decode("utf8")
    author_soup = BeautifulSoup(author_html, "html.parser")
    author = author_soup.select('div > a')
    author_name = author[0].get_text()
    blog_age = author[1].get_text()
    fans_num = author[2].get_text()
    follow_num = author[3].get_text()
    await objects.create(
        Blogger, title=title,
        article_id=article_id,
        content=str(content),  # content is a bs4 Tag; store its HTML text
        author_name=author_name,
        blog_age=blog_age,
        fans_num=fans_num,
        follow_num=follow_num
    )
```
Crawl results:
That was a quick taste of crawling cnblogs with Tornado and coroutines. With 10 worker coroutines the crawl already feels fast: switching between coroutines is very cheap, a single thread or process can host many of them, and in my tests it was indeed faster than a multi-threaded crawler.