python操作MySQL資料庫心得(numpy陣列寫入資料庫)
最近突發奇想,想把以前用Matlab實現的對高頻彩的開獎資料進行抓取並儲存到本地的專案重新用python做一遍。加上前段時間學習的MySQL,想將讀取回來的開獎資料存放到資料庫裡試試看。
廢話不多說,實操看看。
網頁下載器
這部分就比較簡單了,不過值得一提的是,response需要encoding一下才能正常識別出內容,不然就是一大堆的亂碼。
import requests import re class HtmlDownloader(object): def download(self, url): ''' 實現對輸入網址的資料請求以及處理響應內容並輸出 :param url: 輸入網址 :return: 響應的網頁內容 ''' if url is None: return None user_agent = "Chrome/67.0.3396.99" headers = {"User-Agent": user_agent} signal = False index = 0 # 這裡我並沒有設定timeout,因為採集資料過程不希望被超時打斷,而是設定了while迴圈,以跳出訊號或是連線次數判斷跳出 while not signal: try: response = requests.get(url, headers=headers) response.encoding = "utf-8" signal = True except: print("連線失敗,正在重新連線...") index += 1 # 設定了5次重新連線的機會,5次都沒有響應,就認倒黴吧,繼續下一個頁面好了 if index >= 5: return None return response.text def dateChange(self, date): ''' 實現對日期格式的轉換,輸出格式為XXXXXXXX :param date: 輸入未處理格式的日期 :return: 輸出所需格式的日期 ''' reg = re.compile(r"(\d{4})\D?(\d{2})\D?(\d{2})") if reg.match(date) is not None: result = reg.findall(date)[0] return result[0] + result[1] + result[2] else: return None
因為要爬取的網頁http://caipiao.163.com/award/cqssc/XXXXXXXX.html,其中XXXXXXXX就是日期,例如20180501,所以網頁下載器htmldownloader類的其中一個小方法就是日期轉換datechange()。
網頁解析器
在網頁點開F12,點選選取元素髮現,在這樣的class="start" 的td標籤裡面包含著兩個我們想要的資訊,開獎期號以及開獎號碼。並且期數是以40為倍數3期3期第走的,得到的資料排列順序是001,041,081,002,042,082....,所以網頁解析器就可以這麼寫:
這裡使用了bs4庫中的BeautifulSoup模組,原本soup物件我是直接soup = BeautifulSoup(html_text)的,但是會有寫法錯誤警告,雖然警告並不影響程式正常執行,但是處於強迫症,研究了一番,在後面加一個引數"lxml"就好了。
from bs4 import BeautifulSoup import numpy as np class HtmlParser(object): def dataGet(self, html_text): ''' 解析並獲取html_text中所需的內容 :param html_text: 網頁內容 :return: data:numpy陣列格式的資料,每個維度分別是開獎期號以及5個開獎號碼 ''' if html_text is None: return None soup = BeautifulSoup(html_text, "lxml") periods = soup.find_all('td', class_="start") # per和data列表分別儲存期號和開獎號 data = [] per = [] for period in periods: per.append(period.get("data-period")) data.append(period.get("data-win-number")) # 如果data的長度為0,那就沒有繼續往下處理的必要了,直接return None if len(data) == 0: return None # 把per和data交由專門對資料進行處理的私有方法去完成,將輸出的內容return return self._dataHandle(per, data) def _dataHandle(self, periods, datas): ''' 對資料進行處理,該方法繫結dataget方法使用 :param periods: 期數 :param datas: 開獎號 :return: 處理後的numpy陣列 ''' # 網頁中的資料共120條,分3列,每列40個,讀取到的開獎期號排列為001,041,081,002,042,082...以此類推 # 還沒開,跳開,漏開的那一期對應的資料,會是一個空字串或者是None,這些我們要跳過它們 jump = 3 tmp = [] for index in range(3): for i in range(40): if datas[i * jump + index] is None or datas[i * jump + index] == "": continue # 資料是一個以空格分隔的5個數字,所以要用split方法把5個數字切出來放到列表中 data = datas[i * jump + index].split(" ") tmp.append([int(periods[i * jump + index]), int(data[0]), int(data[1]), int(data[2]), int(data[3]), int(data[4])]) return np.array(tmp)
資料儲存器
這裡使用了pymysql庫
import pymysql
class DataMemorizer(object):
def __init__(self):
self.database = pymysql.connect("localhost", "root", "root", "cqssc")
self.cur = self.database.cursor()
def createTable(self, year):
'''
在資料庫中建立新表
:param year: 年份,表的命名格式為“year+年份”,如year2018
:return:
'''
table_name = "year%s" % year
table = self._toChar(table_name)
if self._hasThisTable(table_name):
# 如果這個表已存在,直接返回
return table_name
# 主鍵欄位dates,為日期,存放格式為XXXX的月日4位字串
# reward存放當天一整天的開獎資料,是一個120*6的陣列,資料型別設定為longblob,其實我覺得blob也可以
sql = "create table " + table + "(dates char(4) not null primary key, reward longblob not null) ENGINE=myisam DEFAULT CHARSET=utf8;"
self.cur.execute(sql)
self.database.commit()
print("建立新表:%s..." % table_name)
return table_name
def insertData(self, table_name, dateID, data):
'''
向資料庫插入資料
:param table_name:表名
:param dateID: 主鍵值
:param data: 資料
:return:
'''
table = self._toChar(table_name)
date = self._toStr(dateID)
if self._hasThisId(table_name, dateID):
# 如果這個ID已經存在,跳轉到修改資料方法
self.updateData(table_name, dateID, data)
else:
# 先要將numpy陣列轉換成二進位制流,才能存到資料庫中
b_data = data.tostring()
sql = "insert into " + table + " values(" + date + ", %s);"
self.cur.execute(sql, (b_data,))
self.database.commit()
print("已插入資料:%s." % dateID)
def updateData(self, table_name, dateID, data):
'''
更新資料庫資料
:param table_name:表名
:param dateID: 主鍵值
:param data: 需要修改成的資料
:return:
'''
table = self._toChar(table_name)
date = self._toStr(dateID)
if not self._hasThisId(table_name, dateID):
# 如果沒有這個主鍵值,那還改個屁,直接返回
return
# 同樣也是將data這個numpy陣列轉換一下成二進位制流資料
b_data = data.tostring()
sql = "update " + table + " set reward = %s where dates = %s;"
self.cur.execute(sql, (b_data, date))
self.database.commit()
print("已更新資料:%s..." % dateID)
def _hasThisTable(self, table_name):
'''
判斷是否存在此表
:param table_name:表名
:return: True or False
'''
sql = "show tables;"
self.cur.execute(sql)
results = self.cur.fetchall()
for r in results:
if r[0] == table_name:
return True
else:
return False
def _hasThisId(self, table_name, dateID):
'''
判斷在此表中是否已經有此主鍵
:param table_name: 表名
:param dateID: 主鍵值
:return: True or False
'''
sql = "select dates from " + table_name + ";"
self.cur.execute(sql)
ids = self.cur.fetchall()
for i in ids:
if i[0] == dateID:
return True
else:
return False
def _toChar(self, string):
'''
為輸入的字串新增一對反引號,用於表名、欄位名等對關鍵字的規避
:param string:
:return:
'''
return "`%s`" % string
def _toStr(self, string):
'''
為輸入的字串新增一對單引號,用於數值處理,規避字串拼接後原字串暴露問題
:param string:
:return:
'''
return "'%s'" % string
def __del__(self):
'''
臨走之前記得關燈關電關空調,還有關閉資料庫資源
:return:
'''
self.database.close()
這裡就有個地方磨了我很久,關於python對MySQL資料庫寫入二進位制資料,存在以下這麼些問題。在這裡我建立了一個year2019的表用於演示。
首先,我要寫入的是numpy陣列,自然要先將numpy轉換為二進位制資料,那簡單,用 .tostring()方法就行
# 1.匯入模組
import pymysql
import numpy as np
# 2.連線資料庫
db = pymysql.connect('localhost', 'root', 'root', 'cqssc')
# 3.建立遊標
csr = db.cursor()
# 4.執行操作
arr = np.array([1, 2, 3, 4, 5])
b_arr = arr.tostring()
csr.execute("insert into year2019 values('1234', %s);", (b_arr))
db.commit()
# 5.列印結果
print(b_arr, type(b_arr))
# 6.關閉連線
db.close()
測試執行結果
沒有報錯,到資料庫裡看看
這難道是西行取經的加上白龍馬之後的師徒五人嗎?沒道理,那就再讀出來看看吧
csr.execute("select reward from year2019 where dates='1234';")
c_arr = csr.fetchone()[0]
print(c_arr, type(c_arr))
既然可以讀出來,那就這樣吧哈哈哈,我一開始做的時候mysql可沒有這麼聽話,各種報語法錯誤,算了,做出來就行。
資料獲取器
這個資料獲取器在這裡就只貼上來其中一個功能,就是獲取資料庫中已有資料的最後一天的日期。其他的方法當然包含將資料庫的資料取出來的還有其他的,但這裡重點講獲取最後日期的方法。
import pymysql
import numpy as np
class DataGetter(object):
def __init__(self):
# 連線資料庫,建立遊標
self.database = pymysql.connect("localhost", "root", "root", "cqssc")
self.cur = self.database.cursor()
def getEndDate(self):
'''
獲取目前已有資料的最後一天的日期
防止一段時間不爬取存在資料缺失
:return: 存在資料的最後一天
'''
# 先看一下所有表
sql = "show tables;"
self.cur.execute(sql)
tables = self.cur.fetchall()
length = len(tables)
if length == 0:
return None
# 通過表名後四位的數字,得到最大年份
for i in range(length):
if i == 0:
maxyear = int(tables[i][0][4:])
continue
if int(tables[i][0][4:]) > maxyear:
maxyear = int(tables[i][0][4:])
while True:
# 這裡try是為了防止下面maxyear -= 1減到表名不存在而報錯,表名不存在當然就是返回None
try:
table_name = "year" + str(maxyear)
sql = "select dates from " + table_name + ";"
self.cur.execute(sql)
except:
return None
values = self.cur.fetchall()
length = len(values)
if length > 0:
# 設定while True是為了防止第一次讀取,有表但是資料為空,所以當有資料,就break
break
else:
maxyear -= 1
for i in range(length):
if i == 0:
maxday = int(values[i][0])
continue
if int(values[i][0]) > maxday:
maxday = int(values[i][0])
return str(maxyear) + self._dayToStr(maxday)
def _dayToStr(self,day):
day = str(day)
if len(day) == 4:
return day
return "0" + day
處理過程比較冗雜,具體思路是先show tables,檢視資料庫中現有表名,去掉命名格式前面的year後int一下比較大小,取出最大的年份,然後讀取那一年的表中的資料,如果沒有資料,年數減一,如果在年數遞減的過程中,出現不存在該表的情況,直接返回None,讀取到資料的話就在讀取到的資料中繼續找出最大的一天,從而獲得擁有資料的最後一天的日期並return返回。目的是為了在一段時間之後再次爬取,不會重複爬取過多的內容。
爬蟲排程器
就是排程各單位工作的副總經理啦,責任巨大
from dataMemorizer.dataMemorizer import DataMemorizer
from htmlDownloader.htmlDownloader import HtmlDownloader
from htmlParser.htmlParser import HtmlParser
from dataGetter.dataGetter import DataGetter
import datetime
class SpiderMan(object):
def __init__(self):
self.memorizer = DataMemorizer()
self.downloader = HtmlDownloader()
self.parser = HtmlParser()
self.getter = DataGetter()
def crawlS(self):
endday = self.getter.getEndDate()
self._longCatch(endday)
def _longCatch(self, enddate=None):
'''
120期全資料抓取
:param enddate: 存在資料的最後一天的日期
:return:
'''
if enddate is not None:
enddate = self.downloader.dateChange(enddate)
date = datetime.datetime.now()
datenow = ""
# 因為這裡datenow還不是正確的格式,所以就算日期相同,也可以往while迴圈裡面進去一次,多更新一天的資料總是好的
index = 0
while not datenow == enddate:
datenow = date - datetime.timedelta(days=index)
datenow = datenow.strftime('%Y%m%d')
print("正在處理日期:%s......" % datenow)
url = r"http://caipiao.163.com/award/cqssc/%s.html" % datenow
html = self.downloader.download(url)
arr = self.parser.dataGet(html)
index += 1
# 每年中都會存在一段時間都不開的情況,如新年,貌似每年有那麼8天是不開獎的
# 所以,為了防止誤判,專門判斷在無資料的情況下,年份如果小於2000年,那也沒必要再往下爬資料了。
if len(arr) == 0:
if int(datenow[0:4]) <= 2000:
print("當日無資料且年份小於2000,停止執行")
break
else:
print("當日無資料,日期為%s,請查證,正在跳過此日資料..." % datenow)
continue
table_name = self.memorizer.createTable(datenow[0:4])
self.memorizer.insertData(table_name, datenow[4:], arr)
def __del__(self):
'''
總是要關燈關電關空調的,通過這個來間接性關閉資料庫資源
:return:
'''
del self.memorizer
if __name__ == "__main__":
spider = SpiderMan()
signal = spider.crawl()
del spider
大概就是這樣了,中間就資料庫儲存二進位制資料的時候,因為各種語法上的錯誤卡了我好長時間。寫個部落格也寫了一晚上,真失敗。希望以後寫的程式會越寫越有乾貨。
【如有語法錯誤,歡迎在評論區糾正】