1. 程式人生 > >[Python] [爬蟲] 7.批量政府網站的招投標、中標資訊爬取和推送的自動化爬蟲——資料處理器

[Python] [爬蟲] 7.批量政府網站的招投標、中標資訊爬取和推送的自動化爬蟲——資料處理器

目錄

1.Intro

2.Source


1.Intro

檔名:dataDisposer.py

模組名:資料處理器

引用庫:

pymongo datetime
time sys

自定義引用檔案:configManager

功能:可以對爬取的資料進行儲存、刪除、更新、清洗等操作,可以返回資料庫物件。

 

2.Source

#!/usr/bin/env Python
# -*- coding: utf-8 -*-
'''
# Author  : YSW
# Time    : 2018/6/6 14:04
# File    : dataDisposer.py
# Version : 1.0
# Describe: 資料處理器
# Update  :
        1.資料更新
        2.資料清洗
'''

import pymongo
import datetime
import time
import configManager
import sys
# 設定預設編碼,防止出現中文字元亂碼
defaultencoding = 'utf-8'
if sys.getdefaultencoding() != defaultencoding:
    reload(sys)
    sys.setdefaultencoding(defaultencoding)

# 獲取資料庫配置檔案
__dbParam = configManager.dataBaseParams
# 連線 mongodb
__client = pymongo.MongoClient(__dbParam["userName"], __dbParam["port"])
# 建立資料庫
tenderDB = __client[__dbParam["dataBaseName"]]
# 建立資料庫
logDB = __client[__dbParam["logBaseName"]]

def current_time():
    '''
    獲取當前時間
    :return: 返回格式化後的時間資料
    '''
    print("[*] 正在獲取當前時間")
    currentTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    date = datetime.datetime.strptime(currentTime, '%Y-%m-%d %H:%M:%S')
    print("[+] 獲取完成")
    return date

class DataOperate(object):
    '''
    單獨的資料庫類,返回資料表庫物件
    '''
    @staticmethod
    def dataOperate():
        return tenderDB

    @staticmethod
    def logOperate():
        return logDB
		
class DataStore(object):
    '''
    資料庫操作類
    '''
    def __init__(self, tenderDBName):
        print("[*] 正在初始化資料庫操作模組")
        # 建立資料表,用於儲存招標資訊
        self.__tenderTable = tenderDB[tenderDBName]

    def tender_table(self):
        '''
        資料表
        :return: 返回資料表
        '''
        return self.__tenderTable

    def insert_data(self, dict_data):
        '''
        資料插入
        :param dict_data: 資料字典
        '''
        self.__tenderTable.insert_one(dict_data)

    def delete_data(self, dict_data):
        '''
        資料刪除
        :param dict_data: 資料字典
        '''
        self.__tenderTable.delete_one(dict_data)

    def delete_none_data(self):
        '''
        刪除連結或特定字元為 None 的資料
        '''
        self.__tenderTable.delete_many({"連結": None})

    def clean_data(self, list_index):
        '''
        資料清洗
        :param 需要進行重複性檢查的欄位列表
        通過 Mongo Shell 命令對資料庫資料進行清洗,去除重複資料

        命令:
        db.test.aggregate([
            {
                $group: { _id: {公告標題: '$公告標題',專案編號: '$專案編號'},
                count: {$sum: 1},
                dups: {$addToSet: '$_id'}}
            },
            {
                $match: {count: {$gt: 1}}
            }
        ]).forEach(function(doc){
            doc.dups.shift();
            db.test.remove({_id: {$in: doc.dups}});
        })
        '''
        print("[+] 開始進行資料清洗")
        _id = {}
        for index in list_index:
            _id[str(index)] = '$' + str(index)

        pipeline = [
            {
                '$group': {
                    '_id': _id,
                    'count': {'$sum': 1},
                    'dups': {'$addToSet': '$_id'}
                },
            },
            {
                '$match': {'count': {'$gt': 1}}
            }
        ]

        try:
            map_id = map(lambda doc: doc['dups'][1:], self.__tenderTable.aggregate(pipeline=pipeline))
            list_id = [item for sublist in map_id for item in sublist]
            list(map(lambda _id: self.delete_data({'_id': _id}), list_id))
            print("[+] 資料清洗完成")
        except Exception, e:
            print("[-] 資料清洗失敗")
            print("ERROR: " + str(e.message))

    def update_date_by_time(self, update_filed):
        '''
        資料更新
        當前時間小於或者等於截止時間時
        狀態欄位就更新為已截止
        :param: 需要進行更新的欄位
        :return:
        '''
        print("[+] 開始進行資料更新")
        currentDateTime = current_time()
        try:
            self.__tenderTable.update(
                {update_filed: {"$lte": currentDateTime}},
                {'$set': {'狀態': '已截止'}},
                multi=True,
                upsert=True
            )
            print("[+] 資料更新完成")
        except Exception, e:
            print("[-] 資料更新失敗")
            print("ERRRO: " + str(e.message))