1. 程式人生 > Python3:sqlalchemy對mysql數據庫操作,非sql語句

Python3:sqlalchemy對mysql數據庫操作,非sql語句

quest 用戶 prim bind afa make -c 細節 strip()

Python3:sqlalchemy對mysql數據庫操作,非sql語句

# -*- coding: utf-8 -*-
# python3
# author lizm
# datetime 2018-02-01 10:00:00
"""
Fetch a weekly report page over HTTP, parse its table with BeautifulSoup and
store the rows in MySQL through the SQLAlchemy ORM (no hand-written SQL).

    Data start date: 2015-05-08
    Database: mysql

NOTE(review): this file was reconstructed from a copy whose line breaks and
single-quote string delimiters had been stripped.  Block nesting and string
boundaries were inferred from the statements themselves; verify them against
a pristine copy if one is available.
"""
import configparser
import datetime
import json
import logging
import math
import os
import re
import sys
import time

import pymysql
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from sqlalchemy import Column, Integer, String, DateTime, create_engine
from sqlalchemy import and_, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

logger = logging.getLogger()
# Log file handler: <script dir>/py_zgjs_log<YYYYMMDD>.log
file = logging.FileHandler(
    os.path.join(sys.path[0], "py_zgjs_log" + time.strftime("%Y%m%d") + ".log"))
logger.addHandler(file)
# Record format
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
file.setFormatter(formatter)
# Log level
logger.setLevel(logging.NOTSET)

# Base class for the ORM models
Base = declarative_base()


class Yztzzqktjb(Base):
    """ORM model for one row of table ``py_zgjs_yztzzqktjb``."""

    # Table name
    __tablename__ = 'py_zgjs_yztzzqktjb'

    # Table structure
    id = Column(Integer, primary_key=True, autoincrement=True)
    mc = Column(String(200), nullable=False)
    begindate = Column(String(45), nullable=False)
    enddate = Column(String(45), nullable=False)
    sjmc = Column(String(200))
    ssjmc = Column(String(200))
    sl = Column(String(45))
    create_time = Column(DateTime, nullable=False)
    update_time = Column(DateTime, nullable=False)

    def __init__(self, mc, begindate, enddate, sjmc, ssjmc, sl,
                 create_time, update_time):
        self.mc = mc
        self.begindate = begindate
        self.enddate = enddate
        self.sjmc = sjmc
        self.ssjmc = ssjmc
        self.sl = sl
        self.create_time = create_time
        self.update_time = update_time


class ZgjsEntry(object):
    """Plain record for one parsed table row.

    ``getData`` fills the fields and ``savrData`` maps them onto
    ``Yztzzqktjb``: v1 -> mc, v2 -> begindate, v3 -> enddate, v4 -> sjmc,
    v5 -> ssjmc, v6 -> sl.

    The former ``__get__``/``__set__``/``__delete__`` methods were removed:
    they read ``self.name``, which is never assigned, so they could only
    raise ``AttributeError``.
    """

    def __init__(self, v1, v2, v3, v4, v5, v6):
        self.v1 = v1
        self.v2 = v2
        self.v3 = v3
        self.v4 = v4
        self.v5 = v5
        self.v6 = v6


def dbconfig():
    """Read ``dbconfig.ini`` from the script directory.

    Returns:
        tuple: ``(ip, port, user, password, dbname, endtime, initdate,
        interval)``, all strings.  The first five come from section
        ``[dbserver]``; ``endtime``, ``mzkbinitdate`` and ``interval`` come
        from section ``[dbtime]``.

    Raises:
        configparser.Error: if a section or option is missing.  A missing
            file surfaces this way too, because ``ConfigParser.read``
            silently skips unreadable files.
    """
    cfg = configparser.ConfigParser()
    cfg.read(os.path.join(sys.path[0], "dbconfig.ini"))
    ip = cfg.get("dbserver", "ip")
    port = cfg.get("dbserver", "port")
    user = cfg.get("dbserver", "user")
    password = cfg.get("dbserver", "password")
    dbname = cfg.get("dbserver", "dbname")
    endtime = cfg.get("dbtime", "endtime")
    initdate = cfg.get("dbtime", "mzkbinitdate")
    interval = cfg.get("dbtime", "interval")
    return (ip, port, user, password, dbname, endtime, initdate, interval)


# Engine shared by every session; created on first use.
_engine = None


def _new_session():
    """Return a new ORM session bound to the shared, lazily created engine."""
    global _engine
    if _engine is None:
        ip, port, user, password, dbname = dbconfig()[:5]
        # URL layout: dialect+driver://user:password@host:port/database
        _engine = create_engine(
            'mysql+mysqlconnector://' + user + ':' + password
            + '@' + ip + ':' + port + '/' + dbname)
    return sessionmaker(bind=_engine)()


def savrData(tableName, zgjsList):
    """Insert or update the parsed rows in the database.

    A row counts as already stored when mc, begindate, enddate, sjmc and
    ssjmc all match; then only ``sl`` and ``update_time`` are refreshed.
    Otherwise a new row is inserted.  Only ``tableName == 'Yztzzqktjb'``
    writes anything.

    Args:
        tableName: logical table name.
        zgjsList: list of ``ZgjsEntry``.

    Returns:
        int: 0 on success, 1 on any failure (the error is logged and
        printed, never raised).
    """
    msgcode = 0
    message = '數據保存成功'
    try:
        session = _new_session()
    except Exception as e:
        msgcode = 1
        message = '數據庫連接失敗' + str(e)
    else:
        try:
            new_rows = []
            if tableName == 'Yztzzqktjb':
                for entry in zgjsList:
                    # DateTime columns take datetime objects.
                    now = datetime.datetime.now().replace(microsecond=0)
                    same_row = session.query(Yztzzqktjb).filter(and_(
                        Yztzzqktjb.mc == entry.v1,
                        Yztzzqktjb.begindate == entry.v2,
                        Yztzzqktjb.enddate == entry.v3,
                        Yztzzqktjb.sjmc == entry.v4,
                        Yztzzqktjb.ssjmc == entry.v5))
                    if same_row.first() is not None:
                        same_row.update(
                            {Yztzzqktjb.sl: entry.v6,
                             Yztzzqktjb.update_time: now},
                            synchronize_session=False)
                    else:
                        new_rows.append(Yztzzqktjb(
                            mc=entry.v1, begindate=entry.v2,
                            enddate=entry.v3, sjmc=entry.v4,
                            ssjmc=entry.v5, sl=entry.v6,
                            create_time=now, update_time=now))
            # Inserts are queued until every entry has been looked up.
            session.add_all(new_rows)
            session.commit()
        except Exception as e:
            msgcode = 1
            message = '數據保存失敗' + str(e)
            session.rollback()
        finally:
            session.close()
    logger.info(message)
    print(message)
    return msgcode


# "YYYY.MM.DD-YYYY.MM.DD" as it appears in the report title.
_DATE_RANGE_RE = re.compile(
    r"(\d{4}\.\d{1,2}\.\d{1,2})\s*-\s*(\d{4}\.\d{1,2}\.\d{1,2})")

# Numbering prefixes stripped from every item name.
_NUMBERING = ('一、', '二、', '三、', '四、', '1、', '2、')

# Row layout used for table 'Yztzzqktjb' (row 0 is never read).
# heading row -> numbering prefix removed from its first cell
_HEADING_ROWS = {1: '一、', 4: '二、', 5: '1、', 9: '2、', 13: '三、', 17: '四、'}
# rows that produce no entry
_SKIPPED_ROWS = (6, 10, 14, 18)
# data row -> heading row whose text becomes the parent name (v4)
_PARENT_ROW = {2: 1, 3: 1, 5: 4, 9: 4, 7: 5, 8: 5,
               11: 9, 12: 9, 15: 13, 16: 13, 19: 17, 20: 17}
# rows that also carry the row-4 heading as grand-parent name (v5)
_NESTED_ROWS = (7, 8, 11, 12)


def _strip_numbering(text):
    """Remove every list-numbering prefix from *text* and trim whitespace."""
    for prefix in _NUMBERING:
        text = text.replace(prefix, '')
    return text.strip()


def getData(jsDate, channelIdStr, tableName):
    """Download and parse the weekly report for one date.

    Args:
        jsDate: date as ``YYYY-MM-DD``.
        channelIdStr: report/channel id posted to the search form.
        tableName: logical table name; ``'Yztzzqktjb'`` enables the
            row-number based parent/grand-parent mapping.

    Returns:
        list: one ``ZgjsEntry`` per table row (v1 item name, v2/v3 week
        begin/end as ``YYYY-MM-DD``, v4 parent name, v5 grand-parent name,
        v6 value without thousands separators).  Empty when the request
        fails or the page holds no report.
    """
    zgjsList = []
    dateStr = jsDate[0:4] + '.' + jsDate[5:7] + '.' + jsDate[8:10]
    # Target of the search button:
    # http://www.******.cn/cms-search/view.action?action=china
    url = "http://www.******.cn/cms-search/view.action?action=china"
    headerDict = {
        'Host': 'www.*******.cn',
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.31 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.8',
        'Accept-Encoding': 'gzip, deflate',
        'Referer': 'http://www.******.cn/cms-search/view.action?action=china',
        'Connection': 'keep-alive',
    }
    data = {'dateType': '', 'dateStr': dateStr, 'channelIdStr': channelIdStr}
    try:
        # A timeout keeps one stalled request from blocking the whole run.
        res = requests.post(url, data=data, headers=headerDict, timeout=30)
    except requests.RequestException as e:
        logger.error("request for %s failed: %s", dateStr, e)
        return zgjsList

    soup = BeautifulSoup(res.content, "html.parser")

    # Week range from the report title.
    SettlementTitle = soup.find('div', class_='SettlementTitle')
    if SettlementTitle is None:
        return zgjsList
    h2_tag = SettlementTitle.find('h2')
    if h2_tag is None:
        return zgjsList
    h2 = h2_tag.text
    # NOTE(review): this literal may have been converted from Simplified to
    # Traditional characters by the site this copy came from -- confirm
    # against the live page.  A title without a date range is rejected
    # below in any case.
    if h2 == '搜索結果':
        return zgjsList
    # The delimiter arguments of the original split() calls are not
    # recoverable from this copy; the range is matched by pattern instead.
    found = _DATE_RANGE_RE.search(h2)
    if found is None:
        return zgjsList
    begindate = found.group(1).replace('.', '-')
    enddate = found.group(2).replace('.', '-')

    settlementList = soup.find(id='settlementList')
    if settlementList is None:
        return zgjsList
    table_ = settlementList.find('table')
    if table_ is None:
        return zgjsList
    inner_table = table_.find('table')
    if inner_table is None:
        return zgjsList
    tr_list = inner_table.find_all('tr')

    use_layout = tableName == 'Yztzzqktjb'
    headings = {}
    for n in range(1, len(tr_list)):
        td_list = tr_list[n].find_all('td')
        if not td_list:
            # No cells: nothing to read a name from.
            continue
        if use_layout:
            prefix = _HEADING_ROWS.get(n)
            if prefix is not None:
                headings[n] = td_list[0].get_text().replace(prefix, '').strip()
            if n in _SKIPPED_ROWS:
                continue
        zgjs = ZgjsEntry('', '', '', '', '', '')
        zgjs.v1 = _strip_numbering(td_list[0].get_text())
        zgjs.v2 = begindate
        zgjs.v3 = enddate
        if use_layout:
            # Parent name
            zgjs.v4 = headings.get(_PARENT_ROW.get(n), '')
            # Grand-parent name
            if n in _NESTED_ROWS:
                zgjs.v5 = headings.get(4, '')
        if len(td_list) > 1:
            zgjs.v6 = td_list[1].get_text().strip().replace(',', '')
        zgjsList.append(zgjs)
    return zgjsList


def getBeginDate(bgdate, tableName):
    """Return the first date that still has to be collected.

    Args:
        bgdate: fallback start date (``YYYY-MM-DD``), returned when the
            table is empty, the table name is unknown, or the lookup fails.
        tableName: logical table name.

    Returns:
        str: the newest stored ``enddate`` plus seven days, as
        ``YYYY-MM-DD``, or ``bgdate``.
    """
    latest = None
    try:
        session = _new_session()
    except Exception as e:
        msg = '獲取開始日期,數據庫連接失敗;%s' % str(e)
        print(msg)
        logger.error(msg)
    else:
        try:
            if tableName == 'Yztzzqktjb':
                latest = session.query(func.max(Yztzzqktjb.enddate)).scalar()
        except Exception as e:
            msg = '獲取開始日期,查詢異常;%s' % str(e)
            print(msg)
            logger.error(msg)
            session.rollback()
        finally:
            session.close()

    if not latest:
        return bgdate
    next_week = (datetime.datetime.strptime(latest, "%Y-%m-%d").date()
                 + datetime.timedelta(days=7))
    return next_week.strftime('%Y-%m-%d')


def isCheckData(date_):
    """Tell whether a report ending around *date_* is already stored.

    A stored row matches when ``DATEDIFF(enddate, date_)`` lies strictly
    between -2 and 6.

    Args:
        date_: date as ``YYYY-MM-DD``.

    Returns:
        int: 0 when a matching row exists, 1 when none exists or the check
        itself failed.
    """
    r_code = 1
    try:
        session = _new_session()
    except Exception as e:
        msg = '判斷是否有數據,數據庫連接異常;%s' % str(e)
        print(msg)
        logger.error(msg)
        return r_code

    try:
        day_gap = func.datediff(Yztzzqktjb.enddate, date_)
        hit = session.query(Yztzzqktjb).filter(
            and_(day_gap < 6, day_gap > -2)).first()
        r_code = 1 if hit is None else 0
    except Exception as e:
        r_code = 1
        msg = '判斷是否有數據異常;%s' % str(e)
        print(msg)
        logger.error(msg)
        session.rollback()
    finally:
        session.close()
    return r_code


# Entry point
def main(initdate_):
    """Collect every outstanding week from the resume date until today.

    Args:
        initdate_: start date (``YYYY-MM-DD``) used when the table holds no
            data yet.
    """
    req_list = [
        {'report': '6ac54ce22db4474abc234d6edbe53ae7', 'table': 'Yztzzqktjb'}
    ]
    for req in req_list:
        # e.g. 2018-01-18
        begin = datetime.datetime.strptime(
            getBeginDate(initdate_, req['table']), "%Y-%m-%d").date()
        end = datetime.date.today()
        span_days = (end - begin).days
        if span_days < 0:
            continue
        for i in range(math.ceil(span_days / 7) + 1):
            date_ = (begin + datetime.timedelta(days=i * 7)).strftime('%Y-%m-%d')
            list_mzkb = getData(date_, req['report'], req['table'])
            if list_mzkb:
                savrData(req['table'], list_mzkb)
            time.sleep(0.5)
            # Longer pause after every 350 requests (not before the first).
            if (i + 1) % 350 == 0:
                time.sleep(15)


if __name__ == '__main__':
    # Date whose report must be present before the job stops (YYYYMMDD).
    # Taken from the first command-line argument when one is given.
    vrg_date = sys.argv[1] if len(sys.argv) > 1 else '20150509'
    dbcfg = dbconfig()
    vrg_endtime = dbcfg[5][0:2] + ":" + dbcfg[5][2:4] + ":" + dbcfg[5][4:6]
    var_initdate = dbcfg[6][0:4] + "-" + dbcfg[6][4:6] + "-" + dbcfg[6][6:8]
    var_interval = int(dbcfg[7])
    if len(vrg_date) == 8 and vrg_date.isdigit():
        vrg_date = vrg_date[0:4] + "-" + vrg_date[4:6] + "-" + vrg_date[6:8]
        end_time = datetime.datetime.strptime(vrg_endtime, "%H:%M:%S").time()
        # Collection loop
        while True:
            main(var_initdate)
            if isCheckData(vrg_date) == 0:
                logger.info("采集數據結束")
                print("采集數據結束")
                break
            # Stop at the configured end time.  The clock is read here,
            # after the collection pass, so a long pass cannot overrun it.
            if datetime.datetime.now().time() >= end_time:
                logger.info("采集數據結束")
                print("采集數據結束")
                break
            # Wait for the configured interval before retrying.
            logger.info("**********************(%s):沒有采集到數據,任務繼續執行**********************" % vrg_date)
            print("********************(%s):沒有采集到數據,任務繼續執行**********************" % vrg_date)
            time.sleep(var_interval)
    else:
        logger.info("日期參數格式不正確,請用格式:20180205")
        print("日期參數格式不正確,請用格式:20180205")

Python3:sqlalchemy對mysql數據庫操作,非sql語句