[Python] [爬蟲] 7.批量政府網站的招投標、中標資訊爬取和推送的自動化爬蟲——資料處理器
阿新 • • 發佈:2018-11-09
目錄
1.Intro
檔名:dataDisposer.py
模組名:資料處理器
引用庫:
pymongo | datetime |
time | sys |
自定義引用檔案:configManager
功能:可以對爬取的資料進行儲存、刪除、更新、清洗等操作,可以返回資料庫物件。
2.Source
#!/usr/bin/env Python # -*- coding: utf-8 -*- ''' # Author : YSW # Time : 2018/6/6 14:04 # File : dataDisposer.py # Version : 1.0 # Describe: 資料處理器 # Update : 1.資料更新 2.資料清洗 ''' import pymongo import datetime import time import configManager import sys # 設定預設編碼,防止出現中文字元亂碼 defaultencoding = 'utf-8' if sys.getdefaultencoding() != defaultencoding: reload(sys) sys.setdefaultencoding(defaultencoding) # 獲取資料庫配置檔案 __dbParam = configManager.dataBaseParams # 連線 mongodb __client = pymongo.MongoClient(__dbParam["userName"], __dbParam["port"]) # 建立資料庫 tenderDB = __client[__dbParam["dataBaseName"]] # 建立資料庫 logDB = __client[__dbParam["logBaseName"]] def current_time(): ''' 獲取當前時間 :return: 返回格式化後的時間資料 ''' print("[*] 正在獲取當前時間") currentTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) date = datetime.datetime.strptime(currentTime, '%Y-%m-%d %H:%M:%S') print("[+] 獲取完成") return date class DataOperate(object): ''' 單獨的資料庫類,返回資料表庫物件 ''' @staticmethod def dataOperate(): return tenderDB @staticmethod def logOperate(): return logDB class DataStore(object): ''' 資料庫操作類 ''' def __init__(self, tenderDBName): print("[*] 正在初始化資料庫操作模組") # 建立資料表,用於儲存招標資訊 self.__tenderTable = tenderDB[tenderDBName] def tender_table(self): ''' 資料表 :return: 返回資料表 ''' return self.__tenderTable def insert_data(self, dict_data): ''' 資料插入 :param dict_data: 資料字典 ''' self.__tenderTable.insert_one(dict_data) def delete_data(self, dict_data): ''' 資料刪除 :param dict_data: 資料字典 ''' self.__tenderTable.delete_one(dict_data) def delete_none_data(self): ''' 刪除連結或特定字元為 None 的資料 ''' self.__tenderTable.delete_many({"連結": None}) def clean_data(self, list_index): ''' 資料清洗 :param 需要進行重複性檢查的欄位列表 通過 Mongo Shell 命令對資料庫資料進行清洗,去除重複資料 命令: db.test.aggregate([ { $group: { _id: {公告標題: '$公告標題',專案編號: '$專案編號'}, count: {$sum: 1}, dups: {$addToSet: '$_id'}} }, { $match: {count: {$gt: 1}} } ]).forEach(function(doc){ doc.dups.shift(); db.test.remove({_id: {$in: doc.dups}}); }) ''' print("[+] 開始進行資料清洗") _id = {} for index in list_index: _id[str(index)] = '$' + str(index) pipeline = [ { '$group': { '_id': _id, 'count': {'$sum': 1}, 'dups': {'$addToSet': '$_id'} }, }, { '$match': {'count': {'$gt': 1}} } ] try: map_id = map(lambda doc: doc['dups'][1:], self.__tenderTable.aggregate(pipeline=pipeline)) list_id = [item for sublist in map_id for item in sublist] list(map(lambda _id: self.delete_data({'_id': _id}), list_id)) print("[+] 資料清洗完成") except Exception, e: print("[-] 資料清洗失敗") print("ERROR: " + str(e.message)) def update_date_by_time(self, update_filed): ''' 資料更新 當前時間小於或者等於截止時間時 狀態欄位就更新為已截止 :param: 需要進行更新的欄位 :return: ''' print("[+] 開始進行資料更新") currentDateTime = current_time() try: self.__tenderTable.update( {update_filed: {"$lte": currentDateTime}}, {'$set': {'狀態': '已截止'}}, multi=True, upsert=True ) print("[+] 資料更新完成") except Exception, e: print("[-] 資料更新失敗") print("ERRRO: " + str(e.message))