1. 程式人生 > >Python sqlalchemy增刪改查,多表查詢join操作

Python sqlalchemy增刪改查,多表查詢join操作

sqlalchemy物件:

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import BIGINT
from sqlalchemy import INTEGER
from sqlalchemy import String
from db_manager import TTDModel


class CertTB(TTDModel):
    """
    證書表
    """
    __tablename__ = 'cert_info'
    uuid = Column(BIGINT, primary_key=True)
    create_user_name = Column(String(255), default=None)  #
    create_user_uuid = Column(BIGINT, default=None)  #
    file_paths = Column(String(255), default=None)  #
    file_ids = Column(String(255), default=None)  #
    cert_no = Column(String(255), default=None)  #
    cert_type = Column(INTEGER, default=None)  # 1:t1 2:t2
    validity_start_time = Column(DateTime, default=None)  #
    validity_end_time = Column(DateTime, default=None)  #

    def __init__(self):
        self.__table__ = None

    def get_columns(self):
        return [col.name for col in self.__table__.columns]

    def to_dict(self):
        return dict([(col, getattr(self, col)) for col in self.get_columns()])

db:

engine = create_engine(DB_URI,
                       pool_size=100,
                       max_overflow=0,
                       pool_timeout=60,  # 超時時間
                       pool_pre_ping=True)  # 從連線池獲取連線之前檢查有效性

Session = sessionmaker(bind=engine)
db = Session()

1.新增/新增

		# 獲取使用者的請求引數
		cType = request.form['cType']
        cNo = request.form['cNo']
        startDate = request.form['startDate']
        endDate = request.form['endDate']
        filePaths = request.form.get("filePaths");
        if filePaths:
            filePaths = request.form['filePaths']
		
		#組裝物件資料
        cert = CertTB()
        cert.cert_no = cNo
        cert.cert_type = cType
        if filePaths:
            cert.file_paths = filePaths
        cert.validity_start_time = date_format.get_date_str_ymdhms(startDate)
        cert.validity_end_time = date_format.get_date_str_ymdhms(endDate)
        db.add(cert)   # 新增物件資料到資料庫
        db.commit()   # 提交
        db.close()		# 關閉資料庫連線

2 更新
①查詢結果直接更新

db.query(Certificate) \    #查詢證書
        .filter(Certificate.uuid == cert_id) \  # 查詢條件
        .update({Certificate.cNumber: cert_code,   # “:”後面部分是更新後的值
                 Certificate.operationCer: cert_type,
                 Certificate.startDate: approve_date,
                 Certificate.cExpireDate: end_date})
db.flush()

②查詢結果,更新結果,提交

		uuid = request.form['certId']
        cType = request.form['cType']
        cNo = request.form['cNo']
        startDate = request.form['startDate']
        endDate = request.form['endDate']
        filePaths = request.form.get("filePaths");
        if filePaths:
            filePaths = request.form['filePaths']

        cert = db.query(CertTB).filter(CertTB.uuid == uuid).first()
        if not cert:
            return make_api_respone(201, '未找到相關技工認證證書', {}, None)
        cert.cert_no = cNo
        cert.cert_type = cType
        if filePaths:
            cert.file_paths = filePaths
        cert.validity_start_time = startDate   # 設定新資料
        cert.validity_end_time = endDate    # 設定新資料
        db.commit()  # 提交更新
        db.close()

3 刪除

db.query(CerAttachment).filter(CerAttachment.cerId == cert_id).delete()   # 查詢並刪除
db.flush()  # 處理
刪除多條資料[temp是一個元祖]
ev_qrs = db.query(EvQrCode).filter(EvQrCode.evId.in_(temp)).all()
# 刪除老的二維碼
if ev_qrs:
     for item in ev_qrs:
          db.delete(item)
    db.commit()
注意:db.query(EvQrCode).filter(EvQrCode.evId.in_(temp)).delete()會報錯

4 查詢
單表查詢就不介紹了,這裡直接多表查詢,join操作【提示:join操作是一個費時操作,儘量少用】

	hlog.info("查詢count") 
    count = db.query(EvInfo.uuid.label("evId"), EvInfo.evOrder, ProjectInfo.name, ProjectInfo.address) \
        .filter(and_(ProjectInfo.city == city_id, ProjectInfo.name.like("%" + project_name + "%"))) \
        .join(ProjectInfo, EvInfo.projectId == ProjectInfo.uuid) \
        .count()
    hlog.info("批量查詢電梯")
    data = db.query(EvInfo.uuid.label("evId"), EvInfo.evOrder, ProjectInfo.name, ProjectInfo.address) \
        .filter(and_(ProjectInfo.city == city_id, ProjectInfo.name.like("%" + project_name + "%"))) \
        .join(ProjectInfo, EvInfo.projectId == ProjectInfo.uuid) \		# join 專案表
        .limit(page_size) \   # page_size 分頁—每頁的數量
        .offset((page_num - 1) * page_size) \    # page_num 當前頁數
        .all()

下面這段程式碼:查詢在CLocation中“不存在”的WoInfo

data = db.query(WoInfo.uuid.label("woId"), WoInfo.type.label('woType'), CElevator.evId, WoInfo.date) \
        .filter(and_(WoInfo.userId == user_id, ~exists().where(CLocation.woId == WoInfo.uuid))) \
        .join(CElevator, CElevator.woId == WoInfo.uuid) \
        .limit(page_size) \
        .offset((page - 1) * page_size) \
        .all()