1. 程式人生 > >[Python] [爬蟲] 8.批量政府網站的招投標、中標資訊爬取和推送的自動化爬蟲——資料推送模組

[Python] [爬蟲] 8.批量政府網站的招投標、中標資訊爬取和推送的自動化爬蟲——資料推送模組

目錄

1.Intro

2.Source

(1)dataPusher

(2)dataPusher_HTML


1.Intro

檔名:dataPusher.py、dataPusher_HTML.py

模組名:資料推送模組

引用庫:

smtplib email pyExcelerator
sys time datetime

自定義引用檔案:dataDisposer、Console_Color、configManager

功能:從資料庫中獲取資料生成HTML檔案,更新推送標識,格式化郵件地址,傳送郵件。

 

2.Source

(1) dataPusher

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# Author  : YSW
# Time    : 2018/6/6 14:05
# File    : dataPusher.py
# Version : 1.0
# Describe: 資料推送模組(舊版本推送方式)
# Update  :
'''

'''
    smtplib模組主要負責傳送郵件:
        是一個傳送郵件的動作,連線郵箱伺服器,登入郵箱,傳送郵件(有發件人,收信人,郵件內容)。
    
    email模組主要負責構造郵件:
        指的是郵箱頁面顯示的一些構造,如發件人,收件人,主題,正文,附件等。
    
    xlwt模組:
        操作excel
    
    pyExcelerator模組:
        操作excel,寫入excel較為方便
    
'''
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email import encoders
from email.mime.base import MIMEBase
from email.utils import parseaddr, formataddr
import time
from pyExcelerator import *

class DataWrite(object):
    '''
    Write crawled records into Excel (.xls) workbooks.

    Two workbooks are created in ``__init__`` and kept for the lifetime of
    the instance: ``excel_Workbook`` is used by ``excel_write`` and
    ``excel_Workbook_parse`` by ``excel_write_parse``.  Every call adds one
    more sheet to its workbook and then saves the whole workbook under a
    freshly time-stamped file name.
    '''

    # Maps logic_file_type to the suffix placed between the timestamp and
    # the ".xls" extension.  Any other value yields an empty file name.
    _FILE_SUFFIX = {
        0: "",
        1: "[keyword]",
        2: "_ZB",
        3: "_ZB[keyword]",
    }

    def __init__(self):
        print("[*] 正在初始化資料寫入模組")
        self.excel_Workbook = Workbook()
        self.excel_Workbook_parse = Workbook()

    def excel_name(self, logic_file_type):
        '''
        Build a time-stamped .xls path below ``.\\history_file``.

        The timestamp has the form YYYYmmdd_HHMMSS, e.g. 20180619_161819.

        :param logic_file_type: 0 plain, 1 "[keyword]", 2 "_ZB",
                                3 "_ZB[keyword]"
        :return: the file path, or "" when logic_file_type is not one of
                 the four known values
        '''
        print("[+] 正在建立檔名稱")
        stamp = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        suffix = self._FILE_SUFFIX.get(logic_file_type)
        if suffix is None:
            file_name = ""
        else:
            file_name = r".\history_file\{0}{1}.xls".format(stamp, suffix)
        print("[+] 建立成功")
        return file_name

    def excel_header(self, row, excel_sheet, excel_head_data, excel_sheet_name):
        '''
        Write the column titles into one row of a sheet.

        :param row: index of the row that receives the titles
        :param excel_sheet: sheet object exposing ``write(row, col, value)``
        :param excel_head_data: iterable of column titles
        :param excel_sheet_name: sheet name, used only for the log line
        :return: True when every title was written, False when writing
                 raised (the error is printed, not re-raised)
        '''
        print("[*] 正在寫入標題,表名:{0}".format(excel_sheet_name))
        try:
            for column, title in enumerate(excel_head_data):
                excel_sheet.write(row, column, title)
        except Exception as e:
            print("[-] 寫入標題失敗")
            print("ERROR: " + str(e))
            return False
        print("[+] 寫入標題成功")
        return True

    def _write_sheet(self, workbook, excel_sheet_name, excel_head_data,
                     excel_data, logic_file_type):
        '''
        Add one sheet to ``workbook``, fill it and save the workbook.

        Shared implementation of ``excel_write`` and ``excel_write_parse``.
        Errors are printed and swallowed; the generated file name is
        returned in every case, exactly as the public methods always did.
        '''
        excel_name = self.excel_name(logic_file_type)
        try:
            print("[*] 正在寫入檔案")
            excel_sheet = workbook.add_sheet(excel_sheet_name)
            if self.excel_header(0, excel_sheet, excel_head_data, excel_sheet_name):
                # Row 0 holds the titles, so the records start at row 1.
                for row, record in enumerate(excel_data, 1):
                    for column, field in enumerate(excel_head_data):
                        excel_sheet.write(row, column, record[field])
                workbook.save(excel_name)
                # Report success only when the workbook was really saved.
                print("[+] 寫入檔案成功")
        except Exception as e:
            print("[-] 寫入檔案失敗")
            print("ERROR: " + str(e))
        return excel_name

    def excel_write(self, excel_sheet_name, excel_head_data, excel_data, logic_file_type):
        '''
        Write records into a new sheet of ``self.excel_Workbook``.

        :param excel_sheet_name: name of the sheet to add
        :param excel_head_data: column titles; also the keys read from
                                every record
        :param excel_data: iterable of mappings, one per row
        :param logic_file_type: file-name variant, see ``excel_name``
        :return: path of the generated file (returned even on failure)
        '''
        return self._write_sheet(self.excel_Workbook, excel_sheet_name,
                                 excel_head_data, excel_data, logic_file_type)

    def excel_write_parse(self, excel_sheet_name, excel_head_data, excel_data, logic_file_type):
        '''
        Write filtered records into a new sheet of
        ``self.excel_Workbook_parse``.

        :param excel_sheet_name: name of the sheet to add
        :param excel_head_data: column titles; also the keys read from
                                every record
        :param excel_data: iterable of mappings, one per row
        :param logic_file_type: file-name variant, see ``excel_name``
        :return: path of the generated file (returned even on failure)
        '''
        return self._write_sheet(self.excel_Workbook_parse, excel_sheet_name,
                                 excel_head_data, excel_data, logic_file_type)

class DataSend(object):
    '''
    Build the notification e-mail and deliver it through SMTP over SSL.

    Server, account, password and recipient are literals inside
    ``send_mail``; the shipped values are placeholders that have to be
    replaced before the method can succeed.
    '''

    def __init__(self):
        print("[*] 正在初始化資料推送模組")

    def format_address(self, address):
        '''
        Re-format an address so that its display name is header-safe.

        :param address: address such as ``"Name <user@example.com>"``
        :return: the address with the display name encoded as a UTF-8
                 header value
        '''
        print("[+] 正在格式化郵件地址")
        name, addr = parseaddr(address)
        print("[+] 格式化完成")
        return formataddr((Header(name, 'utf-8').encode(), addr))

    def _build_attachment(self, file_path):
        '''
        Read one file and wrap it as a base64-encoded attachment part.

        :param file_path: path of the file to attach
        :return: the MIMEBase part, ready to be attached to a message
        '''
        path_text = str(file_path)
        # Keep only the last path component; accept both separators so a
        # Windows-style path works regardless of the host platform.
        cut = max(path_text.rfind('\\'), path_text.rfind('/')) + 1
        file_name = path_text[cut:]

        # Generic binary type; the receiving client picks the viewer from
        # the file name given in Content-Disposition.
        part = MIMEBase('application', 'octet-stream', name=file_name)
        part.add_header('Content-Disposition', 'attachment', filename=file_name)
        with open(file_path, 'rb') as handle:
            part.set_payload(handle.read())
        encoders.encode_base64(part)
        return part

    def send_mail(self, body, attachment):
        '''
        Send the e-mail with an optional list of attachments.

        :param body: plain-text body of the mail
        :param attachment: iterable of file paths to attach; may be empty
                           or None
        :return: True when the mail was handed to the server, False when
                 authorisation or sending failed
        :raises: errors raised while connecting to the server or while
                 reading an attachment are not caught here
        '''
        print("[+] 開始傳送郵件...")
        # SMTP server to use
        smtp_server = 'smtp.qq.com'
        # Sender account and its SMTP service password
        from_mail = '傳送方郵箱地址'
        mail_pass = '郵箱SMTP服務密碼'
        # Recipient
        to_mail = '接收方郵箱地址'

        msg = MIMEMultipart()
        # format_address already returns a header-ready string.
        msg['From'] = self.format_address('爬蟲機器人 <%s>' % from_mail)
        msg['To'] = to_mail
        msg['Subject'] = Header('今日份的招投標資訊', 'utf-8').encode()
        msg.attach(MIMEText(body, 'plain', 'utf-8'))
        for file_path in attachment or []:
            msg.attach(self._build_attachment(file_path))

        print("[+] 正在連線 SMTP 伺服器")
        server = smtplib.SMTP_SSL(smtp_server, 465)
        print("[+] 連線成功")
        try:
            print("[+] 正在授權 SMTP 服務")
            try:
                login_code = server.login(from_mail, mail_pass)
            except smtplib.SMTPAuthenticationError as e:
                # login() reports rejected credentials by raising.
                print("[-] 授權失敗")
                print("ERROR: " + str(e))
                return False
            if login_code[0] != 235:
                print("[-] 授權失敗")
                return False
            print("[+] 授權成功")

            try:
                print("[+] 正在傳送郵件")
                server.sendmail(from_mail, to_mail, msg.as_string())
            except Exception as e:
                print("[-] 傳送失敗")
                print("ERROR: " + str(e))
                return False
            print("[+] 傳送成功")
            return True
        finally:
            # Always release the connection; a failure while closing must
            # not replace the result decided above.
            try:
                server.quit()
            except Exception:
                pass

(2) dataPusher_HTML

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# Author  : YSW
# Time    : 2018/8/14 14:05
# File    : dataPusher_HTML.py
# Version : 1.0
# Describe: 資料推送模組(HTML版)
# Update  :
'''

import sys
import time
from Lib import Console_Color
import configManager
import dataDisposer
import datetime
# Python 2 only: reload(sys) makes setdefaultencoding available again so the
# process-wide default encoding can be switched to UTF-8.
# NOTE(review): the str()/format() calls on database values further down
# presumably rely on this default - confirm before removing.
reload(sys)
sys.setdefaultencoding('utf-8')

# Keyword cache: html_write_keywords appends the raw content of the keyword
# file here and splits the first entry into the individual keywords.
KEY_WORD = []
# Indexed by table name; gives the record field that is searched for keywords.
TABLE_TITLE = configManager.table_title
# Indexed by table name in delete_data, which calls remove() on the result.
TENDER = dataDisposer.tenderDB

# Database handle; indexed by table name in get_data, which calls find() and
# update() on the result.
TENDER_TABLE = dataDisposer.DataOperate.dataOperate()

# Time: DATE supplies year/month/day and TODAY_TIME is midnight of that day,
# the lower bound used by get_data when it queries the publish time.
# Both are evaluated once at import, so a process that stays alive past
# midnight keeps using the day on which the module was imported.
DATE = dataDisposer.current_time()
TODAY_TIME = datetime.datetime(DATE.year, DATE.month, DATE.day, 0, 0, 0)


class HTML_Content(object):
    '''
    Turn today's database records into an HTML report file.

    ``html_write`` produces a report with every record of the day and
    ``html_write_keywords`` one restricted to records whose title contains
    a keyword from ``.\\keyword_file\\keyword.txt``.  Both delete records
    without a link first and mark unpushed records as pushed.
    '''

    # Maps logic_file_type to the suffix placed between the timestamp and
    # the ".html" extension.  Any other value yields an empty file name.
    _FILE_SUFFIX = {
        0: "",
        1: "[keyword]",
        2: "_ZB",
        3: "_ZB[keyword]",
    }

    # Fields that may hold a record's headline, in lookup order.
    _TITLE_FIELDS = (u"工程名稱", u"公告標題", u"公告名稱")

    def __init__(self):
        Console_Color.print_color("[*] 正在初始化HTML資料寫入模組")

    def get_data(self, table_name):
        '''
        Fetch today's records of a table and flag unpushed records as pushed.

        The query selects on the publish time only, not on the push flag,
        so a second call on the same day returns the same records again.

        :param table_name: name of the table to read
        :return: list of the records published since TODAY_TIME
        '''
        tenderTable = TENDER_TABLE[table_name]
        list_data = list(tenderTable.find(
            {
                '釋出時間': {"$gte": TODAY_TIME},
            })
        )
        # No upsert here: with upsert enabled an update that matches nothing
        # inserts a new document consisting of the push flag only.
        tenderTable.update(
            {'推送': False},
            {'$set': {'推送': True}},
            multi=True
        )
        return list_data

    def delete_data(self, table_name):
        '''
        Remove the records of a table whose link is empty.

        :param table_name: name of the table to clean
        '''
        sheet = TENDER[table_name]
        sheet.remove({"連結": None})

    def current_time(self):
        '''
        :return: the local time formatted as "YYYY-mm-dd HH:MM:SS"
        '''
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

    def html_name(self, logic_file_type):
        '''
        Build a time-stamped .html path below ``.\\history_file``.

        The timestamp has the form YYYYmmdd_HHMMSS, e.g. 20180619_161819.

        :param logic_file_type: 0 plain, 1 "[keyword]", 2 "_ZB",
                                3 "_ZB[keyword]"
        :return: the file path, or "" when logic_file_type is not one of
                 the four known values
        '''
        Console_Color.print_color("[+] 正在建立檔名稱")
        stamp = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        suffix = self._FILE_SUFFIX.get(logic_file_type)
        if suffix is None:
            file_name = ""
        else:
            file_name = r".\history_file\{0}{1}.html".format(stamp, suffix)
        Console_Color.print_color("[+] 建立成功")
        return file_name

    @staticmethod
    def _escape(value, is_attribute=False):
        '''
        Return ``value`` as text that is safe to embed in HTML.

        :param value: any value; it is converted with ``str`` first
        :param is_attribute: also escape double quotes, for use inside a
                             double-quoted attribute value
        :return: the text with &, < and > (and optionally ") replaced by
                 entities
        '''
        from xml.sax.saxutils import escape
        entities = {'"': '&quot;'} if is_attribute else {}
        return escape(str(value), entities)

    @staticmethod
    def _parse_keywords(raw_text):
        '''
        Split the content of the keyword file into keywords, one per line.

        Surrounding whitespace is stripped and blank lines are dropped: an
        empty keyword is contained in every title and would therefore
        switch the filter off.

        :param raw_text: complete content of the keyword file
        :return: list of non-empty keywords
        '''
        stripped = (line.strip() for line in str(raw_text).splitlines())
        return [keyword for keyword in stripped if keyword]

    def _load_keywords(self):
        '''
        Read the keyword file and return its keywords.

        The raw content is also recorded in the module-level KEY_WORD list
        (once per distinct content), as before.
        '''
        with open(r".\keyword_file\keyword.txt", 'r') as f:
            raw_text = f.read()
        if raw_text not in KEY_WORD:
            KEY_WORD.append(raw_text)
        return self._parse_keywords(raw_text)

    def _pop_title(self, data):
        '''
        Remove and return the headline of a record as a string.

        :raises KeyError: when the record has none of the title fields
        '''
        for field in self._TITLE_FIELDS:
            if field in data:
                return str(data.pop(field))
        raise KeyError(self._TITLE_FIELDS[-1])

    def __html_1(self, title, name):
        '''
        First part of the page: document head, heading and push time.

        :param title: page title, e.g. "招投標資訊"
        :param name: heading shown on the page, e.g. "今日份的招投標檔案"
        :return: the opening markup, up to the start of the body content
        '''
        desc = "推送時間:{0}".format(self.current_time())
        html1 = """
        <html>
        <head>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
        <title>{0}</title>
        </head>
        <body bgcolor="white">
        <Center><H2>{1}</h2></Center>
        <p align="center">{2}</p>
        <Hr width="100%">
        <BR>
        """.format(title, name, desc)
        return html1

    def __html_content_header(self, current_website_name):
        '''
        Heading that separates the records of one website from the next.

        :param current_website_name: text of the heading
        :return: the heading markup
        '''
        Console_Color.print_color("[+] 建立網站標題頭")
        html_header = """
        <hr width="100%" style="margin-top:-5px;border:3px solid blue;"/>
        <h3>{0}</h3>
        """.format(current_website_name)
        return html_header

    def __html_a(self, url, time_parse, name, dict_data):
        '''
        Markup for one record: a linked headline plus one line per field.

        Every value is HTML-escaped because it originates from crawled
        pages.

        :param url: link target of the headline
        :param time_parse: publish time shown in front of the headline
        :param name: headline text
        :param dict_data: remaining fields of the record
        :return: the markup of the record
        '''
        Console_Color.print_color("[+] 寫入主要內容: {0}".format(name))
        esc = self._escape
        html_a = """
        <Hr width="100%">
        ├─<a>[{1}] #### </a><a href="{0}" target="_blank">{2}</a><br>
        """.format(esc(url, True), esc(time_parse), esc(name))
        rows = []
        for key, value in dict_data.items():
            rows.append("""
            ├───────<a>{0}</a><br>
            """.format("{0}: {1}".format(esc(key), esc(value))))
        return html_a + "".join(rows) + "<Hr width='100%'>"

    # Fixed
    def __html2(self):
        '''
        Second part of the page: the closing tags.

        :return: the closing markup
        '''
        html2 = """
        </body>
        </html>
        """
        return html2

    def html_content_func(self, list_data, current_website_name):
        '''
        Build the markup for all records of one website.

        The link, headline, publish time, ``_id`` and push flag are removed
        from every record dict in ``list_data``; the fields that remain are
        listed below the headline.

        :param list_data: list of record dicts (modified in place)
        :param current_website_name: heading for this group of records
        :return: the markup for the group
        :raises KeyError: when a record lacks the link, the publish time or
                          every one of the title fields
        '''
        print("[*] 正在寫入網頁資料")
        parts = [self.__html_content_header(current_website_name)]
        for data in list_data:
            url = str(data.pop(u"連結"))
            project_name = self._pop_title(data)
            time_parse = str(data.pop(u"釋出時間"))
            # Bookkeeping fields are not shown; tolerate their absence.
            data.pop(u"_id", None)
            data.pop(u"推送", None)
            parts.append(self.__html_a(url, time_parse, project_name, data) + '\n')
        Console_Color.print_color("[+] 寫入完成")
        return "".join(parts)

    def html_engine(self, title, name, html_content):
        '''
        Assemble the complete page.

        :param title: page title, e.g. "招投標資訊"
        :param name: heading shown on the page, e.g. "今日份的招投標檔案"
        :param html_content: markup of all website groups
        :return: the complete HTML document
        '''
        Console_Color.print_color("[*] 正在生成HTML頁面")
        html = "\n".join((self.__html_1(title, name), html_content, self.__html2()))
        Console_Color.print_color("[+] 生成成功")
        return html

    def _write_report(self, title, name, dict_html_data_name, logic_file_type,
                      keywords=None):
        '''
        Shared implementation of ``html_write`` and ``html_write_keywords``.

        :param keywords: when non-empty, only records whose title field
                         contains at least one keyword are kept; None or an
                         empty list keeps every record
        :return: path of the written file, or '' when there was nothing to
                 write
        '''
        html_file_name = self.html_name(logic_file_type)
        sections = []
        for table_name, current_website_name in dict_html_data_name.items():
            self.delete_data(table_name)
            list_data = self.get_data(table_name)
            if keywords and list_data:
                title_field = TABLE_TITLE[table_name]
                list_data = [
                    data for data in list_data
                    if any(keyword in data[title_field] for keyword in keywords)
                ]
            if not list_data:
                continue
            sections.append(self.html_content_func(list_data, current_website_name))
        html_con = "".join(sections)
        if not html_con:
            return ''
        html = self.html_engine(title, name, html_con)
        with open(html_file_name, "w") as f:
            f.write(html)
        return html_file_name

    def html_write(self, title, name, dict_html_data_name, logic_file_type):
        '''
        Write today's records of all given tables into one HTML file.

        :param title: page title
        :param name: heading shown on the page
        :param dict_html_data_name: dict mapping table name to website name
        :param logic_file_type: file-name variant, see ``html_name``
        :return: path of the HTML file, or '' when no table had records
        '''
        return self._write_report(title, name, dict_html_data_name, logic_file_type)

    def html_write_keywords(self, title, name, dict_html_data_name, logic_file_type):
        '''
        Like ``html_write``, restricted to records matching a keyword.

        The keywords are read from ``.\\keyword_file\\keyword.txt``, one
        per line, once per call.  A file without any keyword leaves the
        records unfiltered.

        :param title: page title
        :param name: heading shown on the page
        :param dict_html_data_name: dict mapping table name to website name
        :param logic_file_type: file-name variant, see ``html_name``
        :return: path of the HTML file, or '' when nothing matched
        '''
        # The file is only needed when there is at least one table.
        keywords = self._load_keywords() if dict_html_data_name else []
        return self._write_report(title, name, dict_html_data_name,
                                  logic_file_type, keywords)