1. 程式人生 > >pandas資料處理(一)pymongo資料庫量大插入時去重速度慢

pandas資料處理(一)pymongo資料庫量大插入時去重速度慢

  之前寫指令碼爬鬥魚主播資訊時用了一個pymongo的去重語句

db['host_info'].update({'主播': data['主播'], '時間': data['時間']}, {'$set': data}, True):

  這句話以主播和時間為索引判斷資料庫中如果沒有同一主播同一時間的資料就更新到資料庫。一開始還是很好用的,爬取速度還可以,但是我的計劃是每天晚上爬取黃金時間整點段的資料,幾個小時過後資料量就達到了十幾萬條,然後速度越來越慢,mongodb程序佔用cpu率很高,可以看到資料是一條條地存進去。畢竟以十幾萬條資料為基準去重工作量很大,隨著資料量的增長會更加慢,直到我的電腦爆掉。

 

  仔細分析了一下,我用主播和時間作為索引,每一個整點爬取一次,所以每一次爬取的時間肯定不一樣,也就是每一次爬的過程中可能會有重複資料,次與次之間不會存在重複資料。於是就把資料先放入一個空的臨時資料表中,仍然用上面的去重方法做去重,當去重成立後再把資料存入主資料表。這樣一來也就相當於只對單次爬取的資料做去重,效率提升極大。

#-*- coding:utf-8 -*-
#_author:John
#date:2018/12/29 20:11
import time
from functools import partial
import requests
import json
from multiprocessing import Pool import pymongo import datetime client = pymongo.MongoClient('localhost') db = client['douyu'] def single_page_info(page, cur_time): # 防止網路無響應,重試3次 for i in range(3): try: respones = requests.get('https://www.douyu.com/gapi/rkc/directory/0_0/{}'.format(page))
break except Exception as E: print(E) respones = None time.sleep(10) if respones: datas = json.loads(respones.text) items = datas['data']['rl'] for item in items: data = { '標題': item['rn'], '主播': item['nn'], '人氣': item['ol'], '類別': item['c2name'], '房間號': item['rid'], '時間': cur_time } # 用臨時資料表完成此次爬蟲的去重工作,在程式結束前把臨時資料表刪除 if db['host_info_draft'].update({'主播': data['主播'], '時間': data['時間']}, {'$set': data}, True): # 經過去重的資料存入主資料表 db['host_info'].insert(data) print('Save to Mongo, {}'.format(data)) else: print('Save to Mong fail, {}'.format(data)) print('已經完成第{}頁'.format(page)) # 刪除臨時資料表 def drop_draft(): db.drop_collection('host_info_draft') if __name__ == '__main__': pool = Pool() print('start') # 多執行緒抓200頁 while True: # 判斷當前時間是否為整點,如果是則開始爬蟲程式 minute = datetime.datetime.now().strftime('%M') if int(minute) < 2: # 把初始時間傳入作為此次爬蟲統一時間,python3用partial可以在map中傳入的函式傳遞引數 cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M') pool.map(partial(single_page_info, cur_time=cur_time), [page for page in range(1, 201)]) print('End with this cycle') else: time.sleep(58) # cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M') # pool.map(partial(single_page_info, cur_time=cur_time), [page for page in range(1, 201)]) # drop_draft()