Python3 爬蟲知識梳理(實戰篇)
這一節打算爬取貓眼電影的 top 100 的電影資訊,我們首先可以訪問一下我們需要爬取的網站,看一下我們需要的資訊所處的位置和結構如何
看完以後我們的思路應該就比較清晰了,我們首先使用 requests 庫請求單頁內容,然後我們使用正則對我們需要的資訊進行匹配,然後將我們需要的每一條資訊儲存成一個JSON 字串,並將其存入檔案當中,然後就是開啟迴圈遍歷十頁的內容或者採用 Python 多執行緒的方式提高爬取速度
2.程式碼實現
spider.py
import requests import json from requests.exceptions import RequestException import re from multiprocessing import Pool requests.packages.urllib3.disable_warnings() def get_one_page(url): try: headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', } res = requests.get(url,headers=headers,verify=False) if res.status_code == 200: return res.text return None except RequestException: return None def parse_one_page(html): pattern = re.compile('<dd>.*?board-index.*?(\d+)</i>.*?data-src="(.*?)".*?alt="(\w+)".*?"star">' '(.*?)</p>.*?"releasetime">(.*?)</p>.*?integer">(.*?)</i>.*?fraction">(\d)</i>',re.S) items = re.findall(pattern,html) for item in items: #這裡使用 yield 將該函式變成了一個可迭代物件並且每次能返回自己定義好格式的資料 yield { 'index': item[0], 'image': item[1], 'name': item[2], 'actor':item[3].strip()[3:], 'time': item[4].strip()[5:], 'score': item[5]+item[6] } def write_to_file(content): with open('result.txt','a',encoding='utf-8') as f: f.write(json.dumps(content,ensure_ascii=False) + '\n' ) def main(offset): url = "http://maoyan.com/board/4?offset=" + str(offset) html = get_one_page(url) for item in parse_one_page(html): write_to_file(item) if __name__ == '__main__': pool = Pool() pool.map(main,[i*10 for i in range(10)])
3.執行效果
0X02 模擬 Ajax 請求抓取今日頭條街拍美圖
1.分析網頁確定思路
首先我們開啟頭條街拍的頁面,我們發現我們看到的詳細頁連結直接在原始碼中並不能找到,於是我們就需要去檢視我們的 ajax 請求,看看是不是通過 ajax 載入的,我們可以開啟瀏覽器控制檯,我們過濾 XHR 請求有了一些發現,如下圖:
在 xhr 請求中 offset 為 0 的部分,頁面中的 data 為 0 的 資料部分清楚地地顯示了我們想要查詢的詳細頁的資料,然後隨著我們滾動條的下拉,頁面會不斷髮起 xhr 請求,offset 會隨之不斷的增大,每次增大的數目為 10 ,實際上是通過 ajax 去請求索引頁,每次返回的 json 結果中有10條詳細頁的資料,這樣我們就能不斷在頁面中獲取到街拍新聞的資訊。
有了街拍新聞,自然我們還要進入新聞中獲取街拍的美圖,我們看一下新聞內部的圖片是怎麼獲取的,如下圖所示:
很明顯,街拍真正的圖片的 URL 是通過網頁中的 js 變數的方式獲取的,我們考慮使用 正則 來獲取,另外,頁面第一個 title 標籤裡面有該詳細頁面的名稱,我們可以使用 BeautifulSoup 來提取出來
思路梳理:
(1)使用 requests 庫去去請求網站,並獲取索引網頁(ajax 請求的 url)返回的 json 程式碼
(2)從索引網頁中提取出詳細頁面的 URL,並進一步抓取詳細頁的資訊
(3)通過正則匹配詳細頁中的圖片連結,並將其下載到本地,並將頁面資訊和圖片的 URL 儲存到本地的 MongoDB
(4)對多個索引頁進行迴圈抓取,並開啟多執行緒的方式提高效率
2.程式碼實現
config.py
MONGO_URL = 'localhost' MONGO_DB = 'toutiao' MONGO_TABLE = 'toutiao' GROUP_STATR = 0 GROUP_END = 5 KEYWORD = '街拍' IMAGE_DIR = 'DOWNLOADED'
spider.py
import requests import re from bs4 import BeautifulSoup from urllib.parse import urlencode import json from requests.exceptions import RequestException from config import * import pymongo import os from hashlib import md5 from multiprocessing import Pool # 宣告 mongodb 資料庫物件 client = pymongo.MongoClient(MONGO_URL) db = client[MONGO_DB] def get_page_index(offset,keyword): data = { 'aid': 24, 'app_name': 'web_search', 'offset': offset, 'format': 'json', 'keyword': keyword, 'autoload': 'true', 'count': 20, 'en_qc': 1, 'cur_tab': 1, 'from': 'search_tab', 'pd': 'synthesis', 'timestamp': 1556970196243, } headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 'Cookie':'...' } url = 'https://www.toutiao.com/api/search/content/?' + urlencode(data) try: res = requests.get(url,headers=headers) res.encoding = 'utf-8' if res.status_code == 200: return res.text return None except RequestException: print('requests index page error') return None def parse_page_index(html): data = json.loads(html) if data and 'data' in data.keys(): for item in data.get('data'): yield item.get('article_url') def get_page_detail(url): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 'Cookie': '...' } try: res = requests.get(url, headers=headers) res.encoding = 'utf-8' if res.status_code == 200: return res.text return None except RequestException: #print('requests detail page error',url) return None def parse_page_detail(html,url): soup = BeautifulSoup(html,'html.parser') title = soup.select('title')[0].get_text() pattern = re.compile("articleInfo: {.*?content: '(.*?);',",re.S) images = re.search(pattern,html) if images: images_pattern = re.compile("<img src="(.*?)" img_width="") res = re.findall(images_pattern,images.group(1)) for image_url in res: dir_name = re.sub(r'[\\\\/:*?|"<> ]','',title) download_image(image_url,dir_name[:10]) return { 'title': title, 'url': url, 'images': res, } def save_to_mongo(result): if db[MONGO_TABLE].insert(result): print("成功儲存到 mongodb 資料庫",result) return True return False def download_image(url,dir_name): print('正在下載:',url) headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 'Cookie': '...' } try: res = requests.get(url, headers=headers) if res.status_code == 200: # 儲存二進位制資料的時候使用content save_image(dir_name,res.content) return None except RequestException: print('requests image error',url) return None def save_image(dir_name,content): if not os.path.exists(IMAGE_DIR + '/' + dir_name): os.makedirs(IMAGE_DIR + '/' + dir_name) file_path = '{0}\\{1}\\{2}\\{3}.{4}'.format(os.getcwd(),IMAGE_DIR,dir_name,md5(content).hexdigest(),'jpg') if not os.path.exists(file_path): with open(file_path,'wb') as f: f.write(content) def main(offset): html = get_page_index(offset,KEYWORD) #print(html) for url in parse_page_index(html): #print(url) html = get_page_detail(url) if html: result = parse_page_detail(html,url) if result: #print(result) save_to_mongo(result) if __name__ == '__main__': groups = [x*20 for x in range(GROUP_STATR,GROUP_END + 1)] pool = Pool() pool.map(main,groups)
3.執行效果
0X03 使用Selenium模擬瀏覽器抓取淘寶商品美食資訊
眾所周知,淘寶的網頁是非常複雜的,我們按照上面的模擬 Ajax 的請求去獲取 json 資料並且解析的方式已經不那麼好用了,於是我們要祭出我們的終極殺器—-Selenium ,這個庫可以呼叫瀏覽器驅動或者是 phantomjs 來模擬瀏覽器的請求,有了它我們就可以通過指令碼去驅動瀏覽器,這樣哪些動態載入的資料就不用我們自己去獲取了,非常方便。
1.分析網頁確定思路
開啟淘寶,輸入“美食”,回車
我們想要獲取網頁上載入的圖片,但是我們找到頁面的原始請求的頁面的結果,我們會發現當我們剛一翻就已經出現頁尾的程式碼了,實際上頁面的主體還不知道在哪,我嘗試翻找了一下 XHR 請求發現依然不是很明顯,這種情況下為了減輕我們的抓取負擔,我們可以使用 selenium 配合 Chromedriver 去獲取載入好的完整頁面,然後我們再使用正則去抓取圖片,這樣就非常輕鬆容易了。
思路梳理:
(1)利用 selenium 庫配合chromedriver 請求淘寶並輸入“美食”搜尋引數,獲取商品列表
(2)獲取頁碼,並模擬滑鼠點選操作獲取後面頁碼的商品資訊
(3)使用 PyQuery 分析原始碼,得到商品的詳細資訊
(4)將商品資訊儲存到 MongoDB 資料庫中
2.程式碼實現
config.py
MONGO_URL = 'localhost' MONGO_DB = 'taobao' MONGO_TABLE = 'product'
spider.py
from selenium import webdriver from selenium.common.exceptions import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import re from pyquery import PyQuery as pq from config import * import pymongo client = pymongo.MongoClient(MONGO_URL) db = client[MONGO_DB] browser = webdriver.Chrome() wait = WebDriverWait(browser, 100) def search(): try: browser.get('https://www.taobao.com/') # 判斷所需的元素是否載入成功(wait until 中會存在判斷條件,因此常常用作判斷) input = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#q")) ) submit = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, "#J_TSearchForm > div.search-button > button")) ) #輸入+點選 input.send_keys("美食") submit.click() #檢視頁數是否載入成功 total = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.total")) ) get_products() return total.text except TimeoutException: return search() def next_page(page_number): try: input = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > input")) ) submit = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > span.btn.J_Submit")) ) input.clear() input.send_keys(page_number) submit.click() wait.until( EC.text_to_be_present_in_element((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > ul > li.item.active > span"),str(page_number)) ) get_products() except TimeoutException: next_page(page_number) def get_products(): wait.until( # 這裡的 CSS 是手寫的,因為從控制檯複製的話只能得到一個 item EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-itemlist .items .item")) ) html = browser.page_source doc = pq(html) items = doc('#mainsrp-itemlist .items .item').items() for item in items: product = { 'title': item.find('.title').text(), 'image': item.find('.pic .img').attr('src'), 'price': item.find('.price').text(), 'deal': item.find('.deal-cnt').text()[:-3], 'shop': item.find('.shop').text(), 'location':item.find('.location').text(), } print(product) save_to_mongo(product) def save_to_mongo(result): try: if db[MONGO_TABLE].insert(result): print("儲存到 MongoDB 成功",result) except Exception: print("儲存到 MongoDB 失敗") def main(): try: total = int(re.compile('(\d+)').search(search()).group(1)) for i in range(2,total + 1): next_page(i) except Exception: print('出錯了') finally: browser.close() if __name__ == '__main__': main()
3.執行效果
4.存在問題
事實上這個指令碼並不能完全實現自動化,因為由我們 selenium + chromdriver 開啟的淘寶在搜尋的時候回彈出登入提示框,我們還需要手動去登入一下才能進行下面的爬取工作,聽起來似乎不是很要緊,現在登陸一下只要掃描以下二維碼就可以了,但是這樣我們就沒法使用 chrome headless 模式進行靜默訪問,很是不爽,於是我們還需要對這段程式碼進行改進。
5.嘗試解決
對於 headless 問題,我的解決思路是這樣的,因為我們想要用二維碼登入,那樣的話我們必須要求出現介面,但是這個介面的作用僅僅是一個登入,於是我考慮使用兩個 driver ,一個專門用來登入,然後將登入後的 cookie 儲存起來,儲存在檔案中,另一個負責爬取資料的 driver 使用 Headless 模式,然後迴圈讀取本地儲存好的 cookie 訪問網站,這樣就很優雅的解決了我們的問題,下面是我改進後的程式碼:
spiser.py
import json from selenium import webdriver from selenium.common.exceptions import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import re from pyquery import PyQuery as pq from config import * import pymongo from selenium.webdriver.chrome.options import Options # 資料庫配置資訊 client = pymongo.MongoClient(MONGO_URL) db = client[MONGO_DB] # 全域性設定 options = Options() options.add_argument("--headless") browser = webdriver.Chrome(options=options) wait = WebDriverWait(browser, 20) def get_cookie_to_save(): try: driver = webdriver.Chrome() driver.get('https://login.taobao.com/member/login.jhtml') # 判斷是否已經成功登陸 # 這裡需要重新獲取頁面,因為頁面跳轉了 driver 無法識別 source = driver.page_source doc = pq(source) if(doc('#J_SiteNavMytaobao > div.site-nav-menu-hd > a > span') == u'我的淘寶'): dictCookies = driver.get_cookies() jsonCookies = json.dumps(dictCookies) # 登入完成後,將cookies儲存到本地檔案 with open("cookies_tao.json","w") as f: f.write(jsonCookies) except Exception: print('error') finally: driver.close() def get_the_cookie(): browser.get('https://www.taobao.com/') # 刪除本地的所有cookie browser.delete_all_cookies() # 讀取登入時儲存到本地的cookie with open("cookies_tao.json", "r", encoding="utf8") as f: ListCookies = json.loads(f.read()) # 迴圈遍歷新增 cookie for cookie in ListCookies: #print(cookie) browser.add_cookie(cookie) def search(): try: browser.get('https://www.taobao.com/') # 判斷所需的元素是否載入成功(wait until 中會存在判斷條件,因此常常用作判斷) input = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#q")) ) submit = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, "#J_TSearchForm > div.search-button > button")) ) #輸入+點選 input.send_keys("美食") submit.click() #檢視頁數是否載入成功 total = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.total")) ) get_products() return total.text except TimeoutException: return search() def next_page(page_number): try: input = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > input")) ) submit = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > span.btn.J_Submit")) ) input.clear() input.send_keys(page_number) submit.click() wait.until( EC.text_to_be_present_in_element((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > ul > li.item.active > span"),str(page_number)) ) get_products() except TimeoutException: next_page(page_number) def get_products(): wait.until( # 這裡的 CSS 是手寫的,因為從控制檯複製的話只能得到一個 item EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-itemlist .items .item")) ) html = browser.page_source doc = pq(html) items = doc('#mainsrp-itemlist .items .item').items() for item in items: product = { 'title': item.find('.title').text(), 'image': item.find('.pic .img').attr('src'), 'price': item.find('.price').text(), 'deal': item.find('.deal-cnt').text()[:-3], 'shop': item.find('.shop').text(), 'location':item.find('.location').text(), } print(product) save_to_mongo(product) def save_to_mongo(result): try: if db[MONGO_TABLE].insert(result): print("儲存到 MongoDB 成功",result) except Exception: print("儲存到 MongoDB 失敗") def main(): try: get_cookie_to_save() get_the_cookie() total = int(re.compile('(\d+)').search(search()).group(1)) for i in range(2,total + 1): next_page(i) except Exception: print('出錯了') finally: browser.close() if __name__ == '__main__': main()
0X04 Flask + Redis 維護代理池
1.為什麼需要維護代理池
我們知道很多網站都是由反爬蟲的機制的,於是我們就需要對我們的 ip 進行偽裝,也是因為這個原因,網上也有很多的免費代理 IP 可以使用,但是這些 ip 質量參差不齊,於是我們就需要對其進行進一步的過濾,所以我們需要自己維護一個自己的好用的代理池,這就是我們這一節的目的,我們使用的 Redis 就是用來儲存我們的代理 ip 資訊的,flask 主要為我們提供一個方便的呼叫介面
2.代理池的基本要求
(1)多佔抓取,非同步檢測
(2)定時篩選持續更新
(3)提供介面,易於獲取
3.代理池的架構
4.程式碼實現
注:
這裡的程式碼實現來源於以下專案地址: https://github.com/Python3WebSpider/ProxyPool
(1)入口檔案 run.py
import... sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') def main(): try: # 這裡呼叫了排程器來執行起來整個代理池框架 s = Scheduler() s.run() except: main() if __name__ == '__main__': main()
(2)排程中心 scheduler.py
import... class Scheduler(): def schedule_tester(self, cycle=TESTER_CYCLE): """ 定時測試代理 """ tester = Tester() while True: print('測試器開始執行') tester.run() time.sleep(cycle) def schedule_getter(self, cycle=GETTER_CYCLE): """ 定時獲取代理 """ getter = Getter() while True: print('開始抓取代理') getter.run() time.sleep(cycle) def schedule_api(self): """ 開啟API """ app.run(API_HOST, API_PORT) def run(self): print('代理池開始執行') #使用多程序對三個重要函式進行呼叫 if TESTER_ENABLED: #呼叫tester 測試 ip 的可用性 tester_process = Process(target=self.schedule_tester) tester_process.start() if GETTER_ENABLED: #呼叫 getter 函式從網站中爬取代理 ip getter_process = Process(target=self.schedule_getter) getter_process.start() if API_ENABLED: #呼叫 api 函式,提供對外的介面並開啟對資料庫的介面 api_process = Process(target=self.schedule_api) api_process.start()
(3)代理ip獲取
getter.py
import... class Getter(): def __init__(self): self.redis = RedisClient() self.crawler = Crawler() def is_over_threshold(self): """ 判斷是否達到了代理池限制 """ if self.redis.count() >= POOL_UPPER_THRESHOLD: return True else: return False def run(self): print('獲取器開始執行') if not self.is_over_threshold(): #通過我們元類設定的屬性(方法列表和方法個數)迴圈呼叫不同的方法獲取代理 ip for callback_label in range(self.crawler.__CrawlFuncCount__): callback = self.crawler.__CrawlFunc__[callback_label] # 獲取代理 proxies = self.crawler.get_proxies(callback) sys.stdout.flush() for proxy in proxies: self.redis.add(proxy)
crawler.py
#定義一個元類來攔截類的建立,給類添加了一個__CrawlFunc__屬性記錄所有的爬蟲方法名 #__CrawlFuncCount__屬性記錄已經設定好的爬蟲方法 class ProxyMetaclass(type): def __new__(cls, name, bases, attrs): count = 0 attrs['__CrawlFunc__'] = [] for k, v in attrs.items(): if 'crawl_' in k: attrs['__CrawlFunc__'].append(k) count += 1 attrs['__CrawlFuncCount__'] = count return type.__new__(cls, name, bases, attrs) class Crawler(object, metaclass=ProxyMetaclass): # get_proxy 根據傳入的方法名稱,再通eval() 去執行從而對外統一了呼叫的介面 def get_proxies(self, callback): proxies = [] for proxy in eval("self.{}()".format(callback)): print('成功獲取到代理', proxy) proxies.append(proxy) return proxies def crawl_daili66(self, page_count=4): """ 獲取代理66 :param page_count: 頁碼 :return: 代理 """ start_url = 'http://www.66ip.cn/{}.html' urls = [start_url.format(page) for page in range(1, page_count + 1)] for url in urls: print('Crawling', url) html = get_page(url) if html: doc = pq(html) trs = doc('.containerbox table tr:gt(0)').items() for tr in trs: ip = tr.find('td:nth-child(1)').text() port = tr.find('td:nth-child(2)').text() yield ':'.join([ip, port]) def crawl_ip3366(self): ... yield result.replace(' ', '') def crawl_kuaidaili(self): ...
關鍵技術解釋:
雖然我在註釋中大概把關鍵的點都說了一下,但是這個技術非常重要,於是我還想再寫一下
(1)解決很多爬蟲配合執行的問題
因為我們的獲取代理 ip 的網站有很多,這樣我們就需要些很多的爬蟲,那麼這些爬蟲應該怎樣被我們排程就成了一個比較重要的問題,我們最好的想法就是每次呼叫一個網站,每次從這個網站中返回一個代理 ip 存入資料庫,那我們第一個想到的應該就是 用 yield 作為每個爬蟲的返回值的形式,這樣不僅能實現按照我們自定義的統一格式返回的目的,而且還能完美實現我們每次返回一個然後下一次還能接著繼續返回的目的
除此之外,想要配合執行我們還需要一個統一的函式呼叫介面,這個的實現方法是使用的 callback 回撥函式作為我們函式呼叫的引數,然後傳入我們的函式名,並通過 eval() 去執行我們的函式
(2)解決動態獲取方法名和方法個數問題
這個問題就比較神奇了,也是我們需要學習的重點,這裡使用的是 元類 來劫持類的構建並且為其新增對應的屬性的方法來解決這個問題,Python 中一切皆物件,元類簡單的說就是建立類的物件,我們還是重點再看一下程式碼
class ProxyMetaclass(type): def __new__(cls, name, bases, attrs): count = 0 attrs['__CrawlFunc__'] = [] for k, v in attrs.items(): if 'crawl_' in k: attrs['__CrawlFunc__'].append(k) count += 1 attrs['__CrawlFuncCount__'] = count return type.__new__(cls, name, bases, attrs)
解釋
__new__
是在 __init__
之前被呼叫的特殊方法,它用來建立物件並返回建立後的物件,各個引數說明如下:
# cls: 當前準備建立的類 # name: 類的名字 # bases: 類的父類集合 # attrs: 類的屬性和方法,是一個字典。
attrs 可以獲取到類的所有屬性和方法,於是我們只要給我們想要的方法一個統一的命名規範就可以了,在這裡的命名規範是方法名前都有 crawl_ 這個字串,這樣我們就能快速對其進行收集並且計數
(4)測試模組 test.py
import... class Tester(object): def __init__(self): self.redis = RedisClient() #async 表示使用協程的方式執行該函式 async def test_single_proxy(self, proxy): """ 測試單個代理 :param proxy: :return: """ #定義聯結器並取消ssl安全驗證 conn = aiohttp.TCPConnector(verify_ssl=False) #首先我們建立一個session物件 async with aiohttp.ClientSession(connector=conn) as session: try: if isinstance(proxy, bytes): proxy = proxy.decode('utf-8') real_proxy = 'http://' + proxy print('正在測試', proxy) #使用建立的 session 物件請求具體的網站 async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response: if response.status in VALID_STATUS_CODES: self.redis.max(proxy) print('代理可用', proxy) else: self.redis.decrease(proxy) print('請求響應碼不合法 ', response.status, 'IP', proxy) except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError): self.redis.decrease(proxy) print('代理請求失敗', proxy) def run(self): """ 測試主函式 :return: """ print('測試器開始執行') try: count = self.redis.count() print('當前剩餘', count, '個代理') for i in range(0, count, BATCH_TEST_SIZE): start = i stop = min(i + BATCH_TEST_SIZE, count) print('正在測試第', start + 1, '-', stop, '個代理') #批量獲取代理 test_proxies = self.redis.batch(start, stop) #asyncio.get_event_loop方法可以建立一個事件迴圈 #我們可以在事件迴圈中註冊協程物件(async 修飾的函式) loop = asyncio.get_event_loop() #將多個任務封裝到一起併發執行 tasks = [self.test_single_proxy(proxy) for proxy in test_proxies] #run_until_complete將協程註冊到事件迴圈,並啟動事件迴圈。 loop.run_until_complete(asyncio.wait(tasks)) sys.stdout.flush() time.sleep(5) except Exception as e: print('測試器發生錯誤', e.args)
解釋:
這裡用到的比較關鍵的技術是非同步網路請求,因為我們的 requests 庫是同步的,請求一個必須等到結果返回才能請求另一個,這不是我們想要的,於是非同步網路請求模組 aiohttp 就出現了,這是在 python3.5 以後新新增的內建功能(本質使用的是 Python 的協程)
對於類似爬蟲這種延時的IO操作,協程是個大利器,優點很多,他可以在一個阻塞發生時,掛起當前程式,跑去執行其他程式,把事件註冊到迴圈中,實現多程式併發,據說超越了10k限制,不過我沒有試驗過極限。
現在講一講協程的簡單的用法,當你爬一個網站,有100個網頁,正常是請求一次,回來一次,這樣效率很低,但協程可以一次發起100個請求(其實也是一個一個發),不同的是協程不會死等返回,而是發一個請求,掛起,再發一個再掛起,發起100個,掛起100個,然後同時等待100個返回,效率提升了100倍。可以理解為同時做100件事,相對於多執行緒,做到了由自己排程而不是交給CPU,程式流程可控,節約資源,效率極大提升。
具體的使用方法,我在上面程式碼中的註釋部分已經寫了,下面對關鍵步驟再簡單梳理一下:
1.定義聯結器並取消ssl安全驗證
conn = aiohttp.TCPConnector(verify_ssl=False)
2.建立一個session物件
async with aiohttp.ClientSession(connector=conn) as session:
3.使用建立的 session 物件請求具體的網站
async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
4.asyncio.get_event_loop方法建立一個事件迴圈
loop = asyncio.get_event_loop()
5.將多個任務封裝到一起
tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
6.run_until_complete將協程註冊到事件迴圈,並啟動事件迴圈,多工併發執行
loop.run_until_complete(asyncio.wait(tasks))
(5)對外介面 api.py
import... __all__ = ['app'] app = Flask(__name__) def get_conn(): if not hasattr(g, 'redis'): g.redis = RedisClient() return g.redis @app.route('/') def index(): return '<h2>Welcome to Proxy Pool System</h2>' #對外介面直接呼叫資料庫返回隨機值 @app.route('/random') def get_proxy(): """ Get a proxy :return: 隨機代理 """ conn = get_conn() return conn.random() #對外介面呼叫資料庫返回代理個數 @app.route('/count') def get_counts(): """ Get the count of proxies :return: 代理池總量 """ conn = get_conn() return str(conn.count()) if __name__ == '__main__': app.run()
5.代理池使用
import... dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, dir) #先用 requests 庫請求一下api 獲取代理ip def get_proxy(): r = requests.get('http://127.0.0.1:5000/get') proxy = BeautifulSoup(r.text, "lxml").get_text() return proxy def crawl(url, proxy): proxies = {'http': proxy} r = requests.get(url, proxies=proxies) return r.text def main(): proxy = get_proxy() html = crawl('http://docs.jinkan.org/docs/flask/', proxy) print(html) if __name__ == '__main__': main()
0X05 使用代理處理反爬抓取微信文章
1.分析網頁確定思路
我們這次準備爬取搜狗的微信搜尋頁面的結果,以風景為例:
可以看到這和我們之前爬取過的案例幾乎類似,沒什麼新意,但是這裡有一個比較神奇的地方就是10頁以後的內容需要掃碼登入微信才能檢視
另外,在請求次數過多的時候還會出現封禁 ip 的情況,對應我們頁面的狀態碼就是 出現 302 跳轉
思路梳理:
(1)requests 請求目標站點,得到索引頁的原始碼,返回結果
(2)如果遇到 302 則說明 ip 被封,切換代理後重試
(3)請求詳情頁,分析得到文章標題和內容
(4)將結構化資料儲存到 MongoDB 資料庫
注意點:
我們直接看瀏覽器的位址列我們能看到很多的引數,但是實際上很大一部分是不需要的,那麼為了我們的寫程式碼的方便,我們儘量對引數進行簡化,只留下最核心的引數
2.程式碼實現
config.py
# 資料庫配置 MONGO_URL = 'localhost' MONGO_DB = 'weixin' MONGO_TABLE = 'articles' #引數設定 KEYWORD = '風景' MAX_COUNT = 5 BASE_URL = 'https://weixin.sogou.com/weixin?' #代理設定 APP_KEY = "" IP_PORT = 'transfer.mogumiao.com:9001' PROXIES = {"http": "http://" + IP_PORT, "https": "https://" + IP_PORT} HEADERS = { 'Cookie':'', 'Host':'weixin.sogou.com', 'Upgrade-Insecure-Requests':'1', 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 'Proxy-Authorization': 'Basic '+ APP_KEY, 'Referer':'https://weixin.sogou.com/weixin' }
spider.py
from urllib.parse import urlencode import requests from pyquery import PyQuery as pq import re import pymongo from config import * #資料庫連線物件 client = pymongo.MongoClient(MONGO_URL) db = client[MONGO_DB] def get_html(url,count=1): global MAX_COUNT if count >= MAX_COUNT: print('Tried too many counts') return None try: res = requests.get(url,allow_redirects=False,headers=HEADERS,verify=False,proxies=PROXIES,timeout = 30) print(res.status_code) if res.status_code == 200: return res.text if res.status_code == 302: return get_html(url) except ConnectionError as e: print('Error Occurred',e.args) count += 1 return get_html(url,count) def get_index(keyword,page): data = { 'query':keyword, 'type':2, 'page':page, } queries = urlencode(data) url = BASE_URL + queries html = get_html(url) return html def parse_index(html): doc = pq(html) items = doc('.news-box .news-list li .txt-box h3 a').items() for item in items: yield item.attr('data-share') def get_detail(url): try: res = requests.get(url) if res.status_code == 200: return res.text return None except ConnectionError: return None def parse_detail(html): try: #print(html) doc = pq(html) title = doc('.rich_media_title').text() #date 是使用 js 變數動態載入的,我們需要使用正則匹配 js 變數 date = re.search('var publish_time = "(.*?)"',html) if date: date = date.group(1) date = None nickname = doc('#js_name').text() wechat = doc('#js_profile_qrcode > div > p:nth-child(3) > span').text() return { 'title':title, 'date':date, 'nickname ':nickname , 'wechat':wechat, } except ConnectionError: return None def save_to_mongo(data): #這裡使用更新的方法,如果標題重複就不在重新插入直接更新 if db[MONGO_TABLE].update({'title':data['title']},{'$set':data},True): print('Save to MongoDB',data['title']) else: print('Save to MongoDB Failed',data['title']) def main(): for page in range(1,101): html = get_index(KEYWORD,page) if html: urls = parse_index(html) for url in urls: html = get_detail(url) if html: article_data = parse_detail(html) save_to_mongo(article_data) if __name__ == '__main__': main()