1. 程式人生 > >實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記

實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記

實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記

 

要抓微博資料,第一步便是模擬登陸,因為很多資訊(比如使用者資訊,使用者主頁微博資料翻頁等各種翻頁)都需要在登入狀態下才能檢視

這裡我簡單說一下,做爬蟲的同學不要老想著用什麼機器學習的方法去識別複雜驗證碼,真的難度非常大,這應該也不是一個爬蟲工程師的工作重點,當然這只是我的個人建議。工程化的專案,我還是建議大家通過打碼平臺來解決驗證碼的問題。

實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記

 

策略我們都清楚了。就該是分析和編碼了。

我們先來分析如何構造使用者資訊的URL。這裡我以微博名為一起神吐槽的博主為例進行分析。做爬蟲的話,一個很重要的意識就是爬蟲能抓的資料都是人能看到的資料,反過來,人能在瀏覽器上看到的資料,爬蟲幾乎都能抓。這裡用的是幾乎,因為有的資料抓取難度特別。我們首先需要以正常人的流程看看怎麼獲取到使用者的資訊。我們先進入該博主的主頁,如下圖

實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記

 

進群:548377875    即可獲取大量的PDF以及教學視訊哦!希望你能通過Python拿到高薪!

實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記

 

種子博主具體資訊

這裡我們就看到了他的具體資訊了。然後,我們看該頁面的url構造

weibo.com/p/100505175…

我直接copy的位址列的url。這樣做有啥不好的呢?對於老鳥來說,一下就看出來了,這樣做的話,可能會導致資訊不全,因為可能有些資訊是動態載入的。所以,我們需要通過抓包來判斷到底微博會通過該url返回所有資訊,還是需要請求一些ajax 連結才會返回一些關鍵資訊。這裡我就重複一下我的觀點:抓包很重要,抓包很重要,抓包很重要!重要的事情說三遍。

我們抓完包,發現並沒有ajax請求。那麼可以肯定請求前面的url,會返回所有資訊。我們通過點選滑鼠右鍵,檢視網頁原始碼,然後ctrl+a、ctrl+c將所有的頁面原始碼儲存到本地,這裡我命名為personinfo.html。我們用瀏覽器開啟該檔案,發現我們需要的所有資訊都在這段原始碼中,這個工作和抓包判斷資料是否全面有些重複,但是在我看來是必不可少的,因為我們解析頁面資料的時候還可以用到這個html檔案,如果我們每次都通過網路請求去解析內容的話,那麼可能賬號沒一會兒就會被封了(因為頻繁訪問微博資訊),所以我們需要把要解析的檔案儲存到本地。

從上面分析中我們可以得知

weibo.com/p/100505175…

這個url就是獲取使用者資料的url。那麼我們在只知道使用者id的時候怎麼構造它呢?我們可以多拿幾個使用者id來做測試,看構造是否有規律,比如我這裡以使用者名稱為網易雲音樂的使用者做分析,發現它的使用者資訊頁面構造如下

weibo.com/1721030997/…

這個就和上面那個不同了。但是我們仔細觀察,可以發現上面那個是個人使用者,下面是企業微博使用者。我們嘗試一下把它們url格式都統一為第一種或者第二種的格式

weibo.com/1751195602/…

這樣會出現404,那麼統一成上面那種呢?

weibo.com/p/100505172…

這樣子的話,它會被重定向到使用者主頁,而不是使用者詳細資料頁。所以也就不對了。那麼該以什麼依據判斷何時用第一種url格式,何時用第二種url格式呢?我們多翻幾個使用者,會發現除了100505之外,還有100305、100206等字首,那麼我猜想這個應該可以區分不同使用者。這個字首在哪裡可以得到呢?我們開啟我們剛儲存的頁面原始碼,搜尋100505,可以發現

實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記

 

domain

微博應該是根據這個來區分不同使用者型別的。這裡大家可以自己也可以試試,看不同使用者的domain是否不同。為了資料能全面,我也是做了大量測試,發現個人使用者的domain是1005051,作家是100305,其他基本都是認證的企業號。前兩個個人資訊的url構造就是

weibo.com/p/domain+ui…

後者的是

weibo.com/uid/about

弄清楚了個人資訊url的構造方式,但是還有一個問題。我們已知只有uid啊,沒有domain啊。如果是企業號,我們通過domain=100505會被重定向到主頁,如果是作家等(domain=100305或者100306),也會被重定向主頁。我們在主頁把domain提取出來,再請求一次,不就能拿到使用者詳細資訊了嗎?

關於如何構造獲取使用者資訊的url的相關分析就到這裡了。因為我們是在登入的情況下進行資料抓取的,可能在抓取的時候,某個賬號突然就被封了,或者由於網路原因,某次請求失敗了,該如何處理?對於前者,我們需要判斷每次請求返回的內容是否符合預期,也就是看response url是否正常,看response content是否是404或者讓你驗證手機號等,對於後者,我們可以做一個簡單的重試策略,大概程式碼如下

@timeout_decorator

def get_page(url, user_verify=True, need_login=True):

"""

:param url: 待抓取url

:param user_verify: 是否為可能出現驗證碼的頁面(ajax連線不會出現驗證碼,如果是請求微博或者使用者資訊可能出現驗證碼),否為抓取轉發的ajax連線

:param need_login: 抓取頁面是否需要登入,這樣做可以減小一些賬號的壓力

:return: 返回請求的資料,如果出現404或者403,或者是別的異常,都返回空字串

"""

crawler.info('本次抓取的url為{url}'.format(url=url))

count = 0

while count < max_retries:
 if need_login:
 # 每次重試的時候都換cookies,並且和上次不同,如果只有一個賬號,那麼就允許相同
 name_cookies = Cookies.fetch_cookies()
 if name_cookies is None:
 crawler.warning('cookie池中不存在cookie,正在檢查是否有可用賬號')
 rs = get_login_info()
 # 選擇狀態正常的賬號進行登入,賬號都不可用就停掉celery worker
 if len(rs) == 0:
 crawler.error('賬號均不可用,請檢查賬號健康狀況')
 # 殺死所有關於celery的程序
 if 'win32' in sys.platform:
 os.popen('taskkill /F /IM "celery*"')
 else:
 os.popen('pkill -f "celery"')
 else:
 crawler.info('重新獲取cookie中...')
 login.excute_login_task()
 time.sleep(10)
 try:
 if need_login:
 resp = requests.get(url, headers=headers, cookies=name_cookies[1], timeout=time_out, verify=False)
 if "$CONFIG['islogin'] = '0'" in resp.text:
 crawler.warning('賬號{}出現異常'.format(name_cookies[0]))
 freeze_account(name_cookies[0], 0)
 Cookies.delete_cookies(name_cookies[0])
 continue
 else:
 resp = requests.get(url, headers=headers, timeout=time_out, verify=False)
 page = resp.text
 if page:
 page = page.encode('utf-8', 'ignore').decode('utf-8')
 else:
 continue
 # 每次抓取過後程式sleep的時間,降低封號危險
 time.sleep(interal)
 if user_verify:
 if 'unfreeze' in resp.url or 'accessdeny' in resp.url or 'userblock' in resp.url or is_403(page):
 crawler.warning('賬號{}已經被凍結'.format(name_cookies[0]))
 freeze_account(name_cookies[0], 0)
 Cookies.delete_cookies(name_cookies[0])
 count += 1
 continue
 if 'verifybmobile' in resp.url:
 crawler.warning('賬號{}功能被鎖定,需要手機解鎖'.format(name_cookies[0]))
 freeze_account(name_cookies[0], -1)
 Cookies.delete_cookies(name_cookies[0])
 continue
 if not is_complete(page):
 count += 1
 continue
 if is_404(page):
 crawler.warning('url為{url}的連線不存在'.format(url=url))
 return ''
 except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError, AttributeError) as e:
 crawler.warning('抓取{}出現異常,具體資訊是{}'.format(url, e))
 count += 1
 time.sleep(excp_interal)
 else:
 Urls.store_crawl_url(url, 1)
 return page
crawler.warning('抓取{}已達到最大重試次數,請在redis的失敗佇列中檢視該url並檢查原因'.format(url))
Urls.store_crawl_url(url, 0)
return ''

這裡大家把上述程式碼當一段虛擬碼讀就行了,主要看看如何處理抓取時候的異常。因為如果貼整個使用者抓取的程式碼,不是很現實,程式碼量有點大。

下面講頁面解析的分析。有一些做PC端微博資訊抓取的同學,可能曾經遇到過這麼個問題:儲存到本地的html檔案開啟都能看到所有資訊啊,為啥在頁面原始碼中找不到呢?因為PC端微博頁面的關鍵資訊都是像下圖這樣,被FM.view()包裹起來的,裡面的資料可能被json encode過。

實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記

 

標籤

那麼這麼多的FM.view(),我們怎麼知道該提取哪個呢?這裡有一個小技巧,由於只有中文會被編碼,英文還是原來的樣子,所以我們可以看哪段script中包含了渲染後的頁面中的字元,那麼那段應該就可能包含所有頁面資訊。我們這裡以頂部的頭像為例,如圖

實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記

 

我們在頁面原始碼中搜索,只發現一個script中有該字串,那麼就是那段script是頁面相關資訊。我們可以通過正則表示式把該script提取出來,然後把其中的html也提取出來,再儲存到本地,看看資訊是否全面。這裡我就不截圖了。感覺還有很多要寫的,不然篇幅太長了。

另外,對於具體頁面的解析,我也不做太多的介紹了。太細的東西還是建議讀讀原始碼。我只講一下,我覺得的一種處理異常的比較優雅的方式。微博爬蟲的話,主要是頁面樣式太多,如果你打算包含所有不同的使用者的模版,那麼我覺得幾乎不可能,不同使用者模版,用到的解析規則就不一樣。那麼出現解析異常如何處理?尤其是你沒有catch到的異常。很可能因為這個問題,程式就崩掉。其實對於Python這門語言來說,我們可以通過 裝飾器 來捕捉我們沒有考慮到的異常,比如我這個裝飾器

def parse_decorator(return_type):

"""

:param return_type: 用於捕捉頁面解析的異常, 0表示返回數字0, 1表示返回空字串, 2表示返回[],3表示返回False, 4表示返回{}, 5返回None

:return: 0,'',[],False,{},None

"""

def page_parse(func): br/>@wraps(func)

def handle_error( keys):

try:

keys)

except Exception as e:

parser.error(e)

if return_type == 5:
 return None
 elif return_type == 4:
 return {}
 elif return_type == 3:
 return False
 elif return_type == 2:
 return []
 elif return_type == 1:
 return ''
 else:
 return 0
 return handle_error
return page_parse

上面的程式碼就是處理解析頁面發生異常的情況,我們只能在資料的準確性、全面性和程式的健壯性之間做一些取捨。用裝飾器的話,程式中不用寫太多的 try語句,程式碼重複率也會減少很多。

頁面的解析由於篇幅所限,我就講到這裡了。沒有涉及太具體的解析,其中一個還有一個比較難的點,就是資料的全面性,讀者可以去多觀察幾個微博使用者的個人資訊,就會發現有的個人資訊,有的使用者有填寫,有的並沒有。解析的時候要考慮完的話,建議從自己的微博的個人資訊入手,看到底有哪些可以填。這樣可以保證幾乎不會漏掉一些重要的資訊。

最後,我再切合本文的標題,講如何搭建一個分散式的微博爬蟲。開發過程中,我們可以先就做單機單執行緒的爬蟲,然後再改成使用celery的方式。這裡這樣做是為了方便開發和測試,因為你單機搭起來並且跑得通了,那麼分散式的話,就很容易改了,因為celery的API使用本來就很簡潔。

我們抓取的是使用者資訊和他的關注和粉絲uid。使用者資訊的話,我們一個請求大概能抓取一個使用者的資訊,而粉絲和關注我們一個請求可以抓取18個左右(因為這個抓的是列表),顯然可以發現使用者資訊應該多佔一些請求的資源。這時候就該介紹理論篇沒有介紹的關於celery的一個高階特性了,它叫做任務路由。直白點說,它可以規定哪個分散式節點能做哪些任務,不能做哪些任務。它的存在可以讓資源分配更加合理, 分散式微博爬蟲專案初期,就沒有使用任務路由,然後抓了十多萬條關注和分析,結果發現使用者資訊抓幾萬條,這就是資源分配得不合理。那麼如何進行任務路由呢?

coding:utf-8

import os

from datetime import timedelta

from celery import Celery

from kombu import Exchange, Queue

from config.conf import get_broker_or_backend

from celery import platforms

允許celery以root身份啟動

platforms.C_FORCE_ROOT = True

worker_log_path = os.path.join(os.path.dirname(os.path.dirname( file ))+'/logs', 'celery.log')

beat_log_path = os.path.join(os.path.dirname(os.path.dirname( file ))+'/logs', 'beat.log')

tasks = ['tasks.login', 'tasks.user']

include的作用就是註冊服務化函式

app = Celery('weibo_task', include=tasks, broker=get_broker_or_backend(1), backend=get_broker_or_backend(2))

app.conf.update(

CELERY_TIMEZONE='Asia/Shanghai',

CELERY_ENABLE_UTC=True,

CELERYD_LOG_FILE=worker_log_path,

CELERYBEAT_LOG_FILE=beat_log_path,

CELERY_ACCEPT_CONTENT=['json'],

CELERY_TASK_SERIALIZER='json',

CELERY_RESULT_SERIALIZER='json',

CELERY_QUEUES=(

Queue('login_queue', exchange=Exchange('login', type='direct'), routing_key='for_login'),

Queue('user_crawler', exchange=Exchange('user_info', type='direct'), routing_key='for_user_info'),

Queue('fans_followers', exchange=Exchange('fans_followers', type='direct'), routing_key='for_fans_followers'),

)

上述程式碼我指定了有login_queue、user_crawler、fans_followers三個任務佇列。它們分別的作用是登入、使用者資訊抓取、粉絲和關注抓取。現在假設我有三臺爬蟲伺服器A、B和C。我想讓我所有的賬號登入任務分散到三臺伺服器、讓使用者抓取在A和B上執行,讓粉絲和關注抓取在C上執行,那麼啟動A、B、C三個伺服器的celery worker的命令就分別是

celery -A tasks.workers -Q login_queue,user_crawler worker -l info -c 1 # A伺服器和B伺服器啟動worker的命令,它們只會執行登入和使用者資訊抓取任務

celery -A tasks.workers -Q login_queue,fans_followers worker -l info -c 1 # C伺服器啟動worker的命令,它只會執行登入、粉絲和關注抓取任務

然後我們通過命令列或者程式碼(如下)就能傳送所有任務給各個節點執行了

coding:utf-8

from tasks.workers import app

from page_get import user as user_get

from db.seed_ids import get_seed_ids, get_seed_by_id, insert_seeds, set_seed_other_crawled

@app.task(ignore_result=True)

def crawl_follower_fans(uid):

seed = get_seed_by_id(uid)

if seed.other_crawled == 0:

rs = user_get.get_fans_or_followers_ids(uid, 1)

rs.extend(user_get.get_fans_or_followers_ids(uid, 2))

datas = set(rs)

重複資料跳過插入

if datas:
 insert_seeds(datas)
 set_seed_other_crawled(uid)

@app.task(ignore_result=True)

def crawl_person_infos(uid):

"""

根據使用者id來爬取使用者相關資料和使用者的關注數和粉絲數(由於微博服務端限制,預設爬取前五頁,企業號的關注和粉絲也不能檢視)

:param uid: 使用者id

:return:

"""

if not uid:

return

# 由於與別的任務共享資料表,所以需要先判斷資料庫是否有該使用者資訊,再進行抓取
user = user_get.get_profile(uid)
# 不抓取企業號
if user.verify_type == 2:
 set_seed_other_crawled(uid)
 return
app.send_task('tasks.user.crawl_follower_fans', args=(uid,), queue='fans_followers',
 routing_key='for_fans_followers')

@app.task(ignore_result=True)

def excute_user_task():

seeds = get_seed_ids()

if seeds:

for seed in seeds:

在send_task的時候指定任務佇列

app.send_task('tasks.user.crawl_person_infos', args=(seed.uid,), queue='user_crawler',
 routing_key='for_user_info')

這裡我們是通過 queue='user_crawler',routing_key='for_user_info'來將任務和worker進行關聯的。

關於celery任務路由的更詳細的資料請閱讀官方文件。

到這裡,基本把微博資訊抓取的過程和分散式進行抓取的過程都講完了,具體實現分散式的方法,可以讀讀基礎篇。由於程式碼量比較大,我並沒有貼上完整的程式碼,只講了要點。分析過程是講的抓取過程的分析和頁面解析的分析,並在最後,結合分散式,講了一下使用任務佇列來讓分散式爬蟲更加靈活和可擴充套件。

如果有同學想跟著做一遍,可能需要參考分散式微博爬蟲的原始碼,自己動手實現一下,或者跑一下,印象可能會更加深刻。