1. 程式人生 > >python網路爬蟲(10)分散式爬蟲爬取靜態資料

python網路爬蟲(10)分散式爬蟲爬取靜態資料

目的意義

爬蟲應該能夠快速高效的完成資料爬取和分析任務。使用多個程序協同完成一個任務,提高了資料爬取的效率。

以百度百科的一條為起點,抓取百度百科2000左右詞條資料。

說明

參閱模仿了:https://book.douban.com/subject/27061630/。

作者說是簡單的分散式爬蟲(hh),在書中有詳細的說明和註解。

這裡只是補漏和梳理。

因為程序傳遞引數的問題,搞了幾天還是放棄了在WIndows上跑,換用了Linux。

又因為各種各樣的問題,棄用CentOS(它確實是安全可靠的,但是...我不會裝QQ,輸入法等),換用了軟體容易安裝的Ubuntu。然後才裝了Eclipse等各種軟體後,才開始多程序的除錯。

構造

主節點和從節點的方案實現資訊爬取。結構應該讓各個節點高效工作。

從節點:

爬蟲爬取速度受到網路延時的影響和網頁資訊解析的影響比較嚴重,所以使用多個從節點用來專門負責下載網頁資訊,解析網頁資訊。

則分為三個檔案,爬取檔案,下載網頁檔案,解析網頁檔案。

爬取檔案接收來自主節點發送來的網頁地址。然後呼叫下載網頁檔案並完成解析,將處理好的資料傳送給主節點。

主節點:

主節點負責傳送給從節點網頁地址,並接收來自從節點的解析後的網頁資訊,將網頁資訊儲存下來。

主節點任務分為分發網址,接收從節點的資訊,儲存網頁三部分。在程式碼裡,他建立了三個程序,來分別實現。

主節點任務中,儲存資訊,定義一套儲存資訊的方法。分發網址,定義一套分發網址過程中可能用到的方法。主檔案中,設立三個函式,建立三個程序。

主節點設計

主節點的三個任務,分成三個程序,三個程序(分發網址,資料接收,資料儲存),做一個類。

資料接收與分發網址,需要分散式程序。分散式程序需要使用佇列Queue。這裡一定是multiprocessing中的匯入的佇列。網址分發、資料接收分別使用一個佇列。

註冊,設定地址,祕鑰,完成初始化過程,將url_q,result_q分別註冊到網路中。

然後設立分發任務,傳遞佇列給分發任務函式。分發任務使用url_q佇列完成資料的傳送。使用conn_q接收了新的網址,並進行儲存,再次分發到url_q上。

資料接收任務,完成了資料的接收過程,接收以後需要及時將資料儲存,在這裡使用了兩個佇列conn_q,放置接收資料中的地址資訊,store_q,放置接收資料中的網頁資訊。

資料儲存任務,接收資料接收任務中的store_q佇列資訊,及時寫入到磁碟中。

所有涉及到的檔案如下:

NodeManager.py

import time
#import sys
#sys.path.append('/home')#if needed ,add path as package
from UrlManager import UrlManager
from multiprocessing import Process,Queue
from multiprocessing.managers import BaseManager
from DataOutput import DataOutput

class NodeManager():
    def start_manager(self,url_q,result_q):
        BaseManager.register('get_task_queue', callable=lambda:url_q)
        BaseManager.register('get_result_queue',callable=lambda:result_q)
        manager=BaseManager(address=('127.0.0.1',8001),authkey='baike'.encode('utf-8'))
        return manager
    
    def url_manager_proc(self,url_q,conn_q,root_url):
        #send url to queue and receive new urls for storing to object
        url_manager=UrlManager()
        url_manager.add_new_url(root_url)
        while True:
            while(url_manager.has_new_url()):
                new_url=url_manager.get_new_url()
                url_q.put(new_url)
                print('old url size:'+str(url_manager.old_url_size()))
                if(url_manager.old_url_size()>2000):
                    url_q.put('end')
                    url_manager.save_process('new_urls.txt',url_manager.new_urls)
                    url_manager.save_process('old_urls.txt',url_manager.old_urls)
                    print('finish url_manager_proc')
                    return
            try:
                urls=conn_q.get()
                url_manager.add_new_urls(urls)
                print('get:'+urls)
            except Exception:
                time.sleep(0.1)
        
    
    def result_solve_proc(self,result_q,conn_q,store_q):
        while True:
            if not result_q.empty():
                content=result_q.get(True)
                if content['new_urls']=='end':
                    print('finish result_solve_proc')
                    store_q.put('end')
                    return
                conn_q.put(content["new_urls"])
                store_q.put(content["data"])
            else:
                time.sleep(0.1)
    
    def store_proc(self,store_q):
        output=DataOutput()
        while True:
            if not store_q.empty():
                data=store_q.get()
                if data =='end':
                    print('finish store_proc')
                    output.output_end(output.path)
                    return
                output.store_data(data)



if __name__=='__main__':
    url_q=Queue()#send url to workers
    result_q=Queue()#receive url's analytical data from works
    store_q=Queue()#analytical data which is fresh is used for storing to disk for further extract
    conn_q=Queue()#urls which is fresh are used for storing to object for further extract
    nodeObject=NodeManager()
    manager=nodeObject.start_manager(url_q,result_q)
    
    root_url='https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711?fr=aladdin'
    url_manager=Process(target=nodeObject.url_manager_proc,args=(url_q,conn_q,root_url,))
    result_solve=Process(target=nodeObject.result_solve_proc,args=(result_q,conn_q,store_q,))
    store=Process(target=nodeObject.store_proc,args=(store_q,))
    url_manager.start()
    result_solve.start()
    store.start()
    manager.get_server().serve_forever()

 UrlManager.py

import hashlib
import pickle
class UrlManager():
    def __init__(self):
        self.old_urls=self.load_process('new_urls.txt')
        self.new_urls=self.load_process('old_urls.txt')
        pass
    
    def has_new_url(self):
        return self.new_url_size()!=0
    
    def new_url_size(self):
        return len(self.new_urls)
    
    def old_url_size(self):
        return len(self.old_urls)
    
    def get_new_url(self):
        new_url=self.new_urls.pop()
        m=hashlib.md5()
        m.update(new_url.encode("utf8"))
        self.old_urls.add(m.hexdigest()[8:-8])
        return new_url
    
    def add_new_url(self,url):
        if url is None:
            return
        m=hashlib.md5()
        m.update(url.encode('utf-8'))       
        url_md5=m.hexdigest()[8:-8]
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)
    
    def add_new_urls(self,urls):
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)
        pass
    
    def save_process(self,path,data):
        with open(path,'wb') as f:
            pickle.dump(data,f)
    
    def load_process(self,path):
        print('loading..')
        try:
            with open(path,'rb') as f:
                tmp=pickle.load(f)
                return tmp
        except:
            print('loading error maybe loading file not exist and will create it:'+path)
        newSet=set()
        self.save_process(path, newSet)
        return newSet

 DataOutput.py

import codecs
from os.path import os
class DataOutput(object):
    def __init__(self):
        self.path='baike.html'
        self.output_head(self.path)
        self.datas=[]
    
    def store_data(self,data):
        if data is None:
            return
        self.datas.append(data)
        self.output_html(self.path,data)
    
    def output_head(self,path):
        if os.path.exists(path):
            return
        fout=codecs.open('baike.html', 'w', encoding='utf-8')
        fout.write("<html>")
        fout.write("<head><meta charset='urf-8'></head>")
        fout.write("<body>")
        fout.write("<table border='1' width=1800  style='word-break:break-all;word-wrap:break-word;'>")
        fout.write("<tr>")
        fout.write("<td width='20'>序號</td>")
        fout.write("<td width='300'>URL</td>")
        fout.write("<td width='100'>標題</td>")
        fout.write("<td width='1200'>釋義</td>")
        fout.write("</tr>")   
        fout.close()
        
    def output_end(self,path):
        fout=codecs.open(path, 'a', encoding='utf-8')
        fout.write("</table>")  
        fout.write("</body>")      
        fout.write("</html>")
        fout.close()       
        
    def output_html(self,path,data):
        fout=codecs.open(path, 'a', encoding='utf-8')    
        fout.write("<tr>")
        fout.write("<td>%s</td>"%str(len(self.datas)))
        fout.write("<td><a href=%s>%s</a></td>"%(data['url'],data['url']))
        fout.write("<td>%s</td>"%data['title'])
        fout.write("<td>%s</td>"%data['summary'])
        fout.write("</tr>")
        fout.close()

從節點設計

從節點首先是連線到指定地址並驗證祕鑰。連線後獲取url_q、result_q。

從url_q中獲取發來的地址,呼叫HTML下載器下載資料,調動HTML解析器解析資料,然後把結果放到result_q佇列上。

程式碼如下

SpiderWork.py

from multiprocessing.managers import BaseManager
from HtmlDownloader import HtmlDownloader
from HtmlParser import HtmlParser
class SpiderWork():
    def __init__(self):
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        server_addr='127.0.0.1'
        print('connect'+server_addr)
        self.m=BaseManager(address=(server_addr,8001),authkey='baike'.encode('utf-8'))
        self.m.connect()
        self.task=self.m.get_task_queue()
        self.result=self.m.get_result_queue()
        print(self.task)
        self.downloader=HtmlDownloader()
        self.parser=HtmlParser()
        print('initial finish')
    
    def crawl(self):
        while (True):
            try:
                if not self.task.empty():
                    url=self.task.get()
                    if url == 'end':
                        print('stop spider1')
                        self.result.put({'new_urls':'end','data':'end'})
                        return
                    print('working:'+url)#url
                    content=self.downloader.download(url)
                    new_urls,data=self.parser.parser(url,content)
                    self.result.put({"new_urls":new_urls,"data":data})
            except Exception as e:
                print(e,url)
                
if __name__=="__main__":
    spider=SpiderWork()
    spider.crawl()

 HtmlDownloader.py

import requests
import chardet
class HtmlDownloader(object):
    def download(self,url):
        if url is None:
            return None
        user_agent='Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0'
        headers={'User-Agent':user_agent}
        r=requests.get(url,headers=headers)
        if r.status_code is 200:
            r.encoding=chardet.detect(r.content)['encoding']
            return r.text
        return None

 HtmlParser.py

import re
from urllib import parse
from bs4 import BeautifulSoup
class HtmlParser(object):
    def parser(self,page_url,html_cont):
        if page_url is None or html_cont is None:
            return
        
        soup=BeautifulSoup(html_cont,'lxml')
        
        new_urls=self.getNewUrls(page_url,soup)
        
        new_data=self.getNewData(page_url,soup)
        return new_urls,new_data
    
    def getNewUrls(self,page_url,soup):
        new_urls=set()
        links=soup.find_all('a',href=re.compile(r'/item/.*'))
        for link in links:
            new_url=link['href']
            new_full_url=parse.urljoin(page_url,new_url)
            new_urls.add(new_full_url)
        return new_urls

    def getNewData(self,page_url,soup):
        data={}
        data['url']=page_url
        title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
        data['title']=title.get_text()
        summary = soup.find('div',class_='lemma-summary')
        #獲取到tag中包含的所有文版內容包括子孫tag中的內容,並將結果作為Unicode字串返回
        data['summary']=summary.get_text()
        return data

結果

建立.sh檔案如下:

#!/bin/bash
rm -rf log/*
rm -rf baike.html
rm -rf new_urls.txt
rm -rf old_urls.txt
python3 control/NodeManager.py &> log/control.log & for ((i=1;i<=10;i++)) do python3 spider/SpiderWork.py &>log/spider$i.log & done

啟動主節點,然後啟動10個從節點。將它們所產生的日誌資訊記錄到log/下,並都是在後臺執行的程序。

兩分鐘左右,完成約1900條的資料獲取。

 

可能用到的命令:

kill -9 $(ps aux | grep python | awk '{print $2}')

!kill

可能用到的軟體:

Eclipse的pydev程序除錯。

最後

這程式碼裡面真的有好多的細節檔案,序列化操作與儲存,md5的壓縮方案等,都是值得思考的。

&n