1. 程式人生 > >python操作MySQL資料庫心得(numpy陣列寫入資料庫)

python操作MySQL資料庫心得(numpy陣列寫入資料庫)

    最近突發奇想,想把以前用Matlab實現的對高頻彩的開獎資料進行抓取並儲存到本地的專案重新用python做一遍。加上前段時間學習的MySQL,想將讀取回來的開獎資料存放到資料庫裡試試看。

    廢話不多說,實操看看。

網頁下載器

這部分就比較簡單了,不過值得一提的是,response需要encoding一下才能正常識別出內容,不然就是一大堆的亂碼。

import requests
import re
class HtmlDownloader(object):
    def download(self, url):
        '''
        實現對輸入網址的資料請求以及處理響應內容並輸出
        :param url: 輸入網址
        :return: 響應的網頁內容
        '''
        if url is None:
            return None
        user_agent = "Chrome/67.0.3396.99"
        headers = {"User-Agent": user_agent}
        signal = False
        index = 0
        # 這裡我並沒有設定timeout,因為採集資料過程不希望被超時打斷,而是設定了while迴圈,以跳出訊號或是連線次數判斷跳出
        while not signal:
            try:
                response = requests.get(url, headers=headers)
                response.encoding = "utf-8"
                signal = True
            except:
                print("連線失敗,正在重新連線...")
                index += 1
            # 設定了5次重新連線的機會,5次都沒有響應,就認倒黴吧,繼續下一個頁面好了
            if index >= 5:
                return None
        return response.text
    
    def dateChange(self, date):
        '''
        實現對日期格式的轉換,輸出格式為XXXXXXXX
        :param date: 輸入未處理格式的日期
        :return: 輸出所需格式的日期
        '''
        reg = re.compile(r"(\d{4})\D?(\d{2})\D?(\d{2})")
        if reg.match(date) is not None:
            result = reg.findall(date)[0]
            return result[0] + result[1] + result[2]
        else:
            return None

    因為要爬取的網頁http://caipiao.163.com/award/cqssc/XXXXXXXX.html,其中XXXXXXXX就是日期,例如20180501,所以網頁下載器htmldownloader類的其中一個小方法就是日期轉換datechange()。

網頁解析器

在網頁點開F12,點選選取元素髮現,在這樣的class="start" 的td標籤裡面包含著兩個我們想要的資訊,開獎期號以及開獎號碼。並且期數是以40為倍數3期3期第走的,得到的資料排列順序是001,041,081,002,042,082....,所以網頁解析器就可以這麼寫:

這裡使用了bs4庫中的BeautifulSoup模組,原本soup物件我是直接soup = BeautifulSoup(html_text)的,但是會有寫法錯誤警告,雖然警告並不影響程式正常執行,但是處於強迫症,研究了一番,在後面加一個引數"lxml"就好了。

from bs4 import BeautifulSoup
import numpy as np
class HtmlParser(object):
    def dataGet(self, html_text):
        '''
        解析並獲取html_text中所需的內容
        :param html_text: 網頁內容
        :return: data:numpy陣列格式的資料,每個維度分別是開獎期號以及5個開獎號碼
        '''
        if html_text is None:
            return None
        soup = BeautifulSoup(html_text, "lxml")
        periods = soup.find_all('td', class_="start")
        # per和data列表分別儲存期號和開獎號
        data = []
        per = []
        for period in periods:
            per.append(period.get("data-period"))
            data.append(period.get("data-win-number"))
        # 如果data的長度為0,那就沒有繼續往下處理的必要了,直接return None
        if len(data) == 0:
            return None
        # 把per和data交由專門對資料進行處理的私有方法去完成,將輸出的內容return
        return self._dataHandle(per, data)

    def _dataHandle(self, periods, datas):
        '''
        對資料進行處理,該方法繫結dataget方法使用
        :param periods: 期數
        :param datas: 開獎號
        :return: 處理後的numpy陣列
        '''
        # 網頁中的資料共120條,分3列,每列40個,讀取到的開獎期號排列為001,041,081,002,042,082...以此類推
        # 還沒開,跳開,漏開的那一期對應的資料,會是一個空字串或者是None,這些我們要跳過它們
        jump = 3
        tmp = []
        for index in range(3):
            for i in range(40):
                if datas[i * jump + index] is None or datas[i * jump + index] == "":
                    continue
                # 資料是一個以空格分隔的5個數字,所以要用split方法把5個數字切出來放到列表中
                data = datas[i * jump + index].split(" ")
                tmp.append([int(periods[i * jump + index]), int(data[0]), int(data[1]), int(data[2]), int(data[3]),
                            int(data[4])])
        return np.array(tmp)

資料儲存器

    這裡使用了pymysql庫

import pymysql
class DataMemorizer(object):
    def __init__(self):
        self.database = pymysql.connect("localhost", "root", "root", "cqssc")
        self.cur = self.database.cursor()

    def createTable(self, year):
        '''
        在資料庫中建立新表
        :param year: 年份,表的命名格式為“year+年份”,如year2018
        :return: 
        '''
        table_name = "year%s" % year
        table = self._toChar(table_name)
        if self._hasThisTable(table_name):
            # 如果這個表已存在,直接返回
            return table_name
        # 主鍵欄位dates,為日期,存放格式為XXXX的月日4位字串
        # reward存放當天一整天的開獎資料,是一個120*6的陣列,資料型別設定為longblob,其實我覺得blob也可以
        sql = "create table " + table + "(dates char(4) not null primary key, reward longblob not null) ENGINE=myisam DEFAULT CHARSET=utf8;"
        self.cur.execute(sql)
        self.database.commit()
        print("建立新表:%s..." % table_name)
        return table_name

    def insertData(self, table_name, dateID, data):
        '''
        向資料庫插入資料
        :param table_name:表名 
        :param dateID: 主鍵值
        :param data: 資料
        :return: 
        '''
        table = self._toChar(table_name)
        date = self._toStr(dateID)
        if self._hasThisId(table_name, dateID):
            # 如果這個ID已經存在,跳轉到修改資料方法
            self.updateData(table_name, dateID, data)
        else:
            # 先要將numpy陣列轉換成二進位制流,才能存到資料庫中
            b_data = data.tostring()
            sql = "insert into " + table + " values(" + date + ", %s);"
            self.cur.execute(sql, (b_data,))
            self.database.commit()
            print("已插入資料:%s." % dateID)

    def updateData(self, table_name, dateID, data):
        '''
        更新資料庫資料
        :param table_name:表名 
        :param dateID: 主鍵值
        :param data: 需要修改成的資料
        :return: 
        '''
        table = self._toChar(table_name)
        date = self._toStr(dateID)
        if not self._hasThisId(table_name, dateID):
            # 如果沒有這個主鍵值,那還改個屁,直接返回
            return
        # 同樣也是將data這個numpy陣列轉換一下成二進位制流資料
        b_data = data.tostring()
        sql = "update " + table + " set reward = %s where dates = %s;"
        self.cur.execute(sql, (b_data, date))
        self.database.commit()
        print("已更新資料:%s..." % dateID)

    def _hasThisTable(self, table_name):
        '''
        判斷是否存在此表
        :param table_name:表名 
        :return: True  or  False
        '''
        sql = "show tables;"
        self.cur.execute(sql)
        results = self.cur.fetchall()
        for r in results:
            if r[0] == table_name:
                return True
        else:
            return False

    def _hasThisId(self, table_name, dateID):
        '''
        判斷在此表中是否已經有此主鍵
        :param table_name: 表名
        :param dateID: 主鍵值
        :return: True  or  False
        '''
        sql = "select dates from " + table_name + ";"
        self.cur.execute(sql)
        ids = self.cur.fetchall()
        for i in ids:
            if i[0] == dateID:
                return True
        else:
            return False

    def _toChar(self, string):
        '''
        為輸入的字串新增一對反引號,用於表名、欄位名等對關鍵字的規避
        :param string: 
        :return: 
        '''
        return "`%s`" % string

    def _toStr(self, string):
        '''
        為輸入的字串新增一對單引號,用於數值處理,規避字串拼接後原字串暴露問題
        :param string: 
        :return: 
        '''
        return "'%s'" % string

    def __del__(self):
        '''
        臨走之前記得關燈關電關空調,還有關閉資料庫資源
        :return: 
        '''
        self.database.close()

這裡就有個地方磨了我很久,關於python對MySQL資料庫寫入二進位制資料,存在以下這麼些問題。在這裡我建立了一個year2019的表用於演示。

首先,我要寫入的是numpy陣列,自然要先將numpy轉換為二進位制資料,那簡單,用  .tostring()方法就行

# 1.匯入模組
import pymysql
import numpy as np
# 2.連線資料庫
db = pymysql.connect('localhost', 'root', 'root', 'cqssc')
# 3.建立遊標
csr = db.cursor()
# 4.執行操作
arr = np.array([1, 2, 3, 4, 5])
b_arr = arr.tostring()
csr.execute("insert into year2019 values('1234', %s);", (b_arr))
db.commit()
# 5.列印結果
print(b_arr, type(b_arr))
# 6.關閉連線
db.close()

測試執行結果

沒有報錯,到資料庫裡看看

 

這難道是西行取經的加上白龍馬之後的師徒五人嗎?沒道理,那就再讀出來看看吧

csr.execute("select reward from year2019 where dates='1234';")

c_arr = csr.fetchone()[0]

print(c_arr, type(c_arr))

既然可以讀出來,那就這樣吧哈哈哈,我一開始做的時候mysql可沒有這麼聽話,各種報語法錯誤,算了,做出來就行。

資料獲取器

    這個資料獲取器在這裡就只貼上來其中一個功能,就是獲取資料庫中已有資料的最後一天的日期。其他的方法當然包含將資料庫的資料取出來的還有其他的,但這裡重點講獲取最後日期的方法。

import pymysql
import numpy as np
class DataGetter(object):
    def __init__(self):
        # 連線資料庫,建立遊標
        self.database = pymysql.connect("localhost", "root", "root", "cqssc")
        self.cur = self.database.cursor()

    def getEndDate(self):
        '''
        獲取目前已有資料的最後一天的日期
        防止一段時間不爬取存在資料缺失
        :return: 存在資料的最後一天
        '''
        # 先看一下所有表
        sql = "show tables;"
        self.cur.execute(sql)
        tables = self.cur.fetchall()
        length = len(tables)
        if length == 0:
            return None
        # 通過表名後四位的數字,得到最大年份
        for i in range(length):
            if i == 0:
                maxyear = int(tables[i][0][4:])
                continue
            if int(tables[i][0][4:]) > maxyear:
                maxyear = int(tables[i][0][4:])
        while True:
            # 這裡try是為了防止下面maxyear -= 1減到表名不存在而報錯,表名不存在當然就是返回None
            try:
                table_name = "year" + str(maxyear)
                sql = "select dates from " + table_name + ";"
                self.cur.execute(sql)
            except:
                return None
            values = self.cur.fetchall()
            length = len(values)
            if length > 0:
                # 設定while True是為了防止第一次讀取,有表但是資料為空,所以當有資料,就break
                break
            else:
                maxyear -= 1

        for i in range(length):
            if i == 0:
                maxday = int(values[i][0])
                continue
            if int(values[i][0]) > maxday:
                maxday = int(values[i][0])
        return str(maxyear) + self._dayToStr(maxday)
    def _dayToStr(self,day):
        day = str(day)
        if len(day) == 4:
            return day
        return "0" + day

    處理過程比較冗雜,具體思路是先show tables,檢視資料庫中現有表名,去掉命名格式前面的year後int一下比較大小,取出最大的年份,然後讀取那一年的表中的資料,如果沒有資料,年數減一,如果在年數遞減的過程中,出現不存在該表的情況,直接返回None,讀取到資料的話就在讀取到的資料中繼續找出最大的一天,從而獲得擁有資料的最後一天的日期並return返回。目的是為了在一段時間之後再次爬取,不會重複爬取過多的內容。

爬蟲排程器

就是排程各單位工作的副總經理啦,責任巨大

from dataMemorizer.dataMemorizer import DataMemorizer
from htmlDownloader.htmlDownloader import HtmlDownloader
from htmlParser.htmlParser import HtmlParser
from dataGetter.dataGetter import DataGetter
import datetime

class SpiderMan(object):
    def __init__(self):
        self.memorizer = DataMemorizer()
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        self.getter = DataGetter()
    def crawlS(self):
        endday = self.getter.getEndDate()
        self._longCatch(endday)

    def _longCatch(self, enddate=None):
        '''
        120期全資料抓取
        :param enddate: 存在資料的最後一天的日期
        :return:
        '''
        if enddate is not None:
            enddate = self.downloader.dateChange(enddate)
        date = datetime.datetime.now()
        datenow = ""
        # 因為這裡datenow還不是正確的格式,所以就算日期相同,也可以往while迴圈裡面進去一次,多更新一天的資料總是好的
        index = 0
        while not datenow == enddate:
            datenow = date - datetime.timedelta(days=index)
            datenow = datenow.strftime('%Y%m%d')
            print("正在處理日期:%s......" % datenow)
            url = r"http://caipiao.163.com/award/cqssc/%s.html" % datenow
            html = self.downloader.download(url)
            arr = self.parser.dataGet(html)
            index += 1
            # 每年中都會存在一段時間都不開的情況,如新年,貌似每年有那麼8天是不開獎的
            # 所以,為了防止誤判,專門判斷在無資料的情況下,年份如果小於2000年,那也沒必要再往下爬資料了。
            if len(arr) == 0:
                if int(datenow[0:4]) <= 2000:
                    print("當日無資料且年份小於2000,停止執行")
                    break
                else:
                    print("當日無資料,日期為%s,請查證,正在跳過此日資料..." % datenow)
                    continue
            table_name = self.memorizer.createTable(datenow[0:4])
            self.memorizer.insertData(table_name, datenow[4:], arr)
    def __del__(self):
        '''
        總是要關燈關電關空調的,通過這個來間接性關閉資料庫資源
        :return:
        '''
        del self.memorizer
if __name__ == "__main__":
    spider = SpiderMan()
    signal = spider.crawl()
    del spider

    大概就是這樣了,中間就資料庫儲存二進位制資料的時候,因為各種語法上的錯誤卡了我好長時間。寫個部落格也寫了一晚上,真失敗。希望以後寫的程式會越寫越有乾貨。

【如有語法錯誤,歡迎在評論區糾正】