1. 程式人生 > >【Python模組】sqlalchemy orm模組--基礎(連線資料庫,建表,增刪改查)

【Python模組】sqlalchemy orm模組--基礎(連線資料庫,建表,增刪改查)

SQLAlchemy是Python程式語言下的一款ORM框架,該框架建立在資料庫API之上,使用關係物件對映進行資料庫操作,簡言之便是:將物件轉換成SQL,然後使用資料庫API執行SQL並獲取執行結果。


安裝:

pip3 install SQLAlchemy

版本檢查:

import sqlalchemy
sqlalchemy.__version__


不同資料庫配置:

根據配置檔案的不同調用不同的資料庫API,從而實現對資料庫的操作:

格式:'資料庫型別+資料庫驅動名稱://使用者名稱:口令@機器地址:埠號/資料庫名'

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
 
pymysql
    mysql+pymysql://<username>:<password>@<host>[:<port>]/<dbname>[?<options>]
 
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
 
cx_Oracle
    oracle+cx_oracle://user:
[email protected]
:port/dbname[?key=value&key=value...]

 

連線資料庫:

from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:[email protected]/school",encoding='utf-8',echo=True)

宣告對映:

declaractive用來表示類與表的關係。

宣告基類,將類對映到資料表,自定義資料表類必須繼承這個基類。

from sqlalchemy.ext.declaractive import  declaractive_base
Base = declaractive_base()

建立表結構類:

一個表結構類必須包含一個__tablename__和primary_key的欄位

欄位的型別:

SmallInteger
Integer
BigInteger
Float
Numeric(precision=None, scale=None, decimal_return_scale=None, asdecimal=True)

Boolean
Enum

Date
DateTime
Time
Interval(native=True, second_precision=None, day_precision=None)

LargeBinary(length=None)
MatchType(create_constraint=True, name=None, _create_events=True)
PickleType(protocol=4, pickler=None, comparator=None)
SchemaType(name=None, schema=None, metadata=None, inherit_schema=False, quote=None, _create_events=True)
String(length=None, collation=None, convert_unicode=False, unicode_error=None, _warn_on_bytestring=False)
Text(length=None, collation=None, convert_unicode=False, unicode_error=None, _warn_on_bytestring=False)
Unicode(length=None, **kwargs)

欄位的選項:

Column(
nullable=True,      # 可以為空

autoincrement=True, # 數值自增

default='red'       # 指定預設值

primary_key = True  # 設為主鍵

ForeignKey = 'table_name.id'    # 指定外來鍵,ForeignKey需要匯入
)

 

建立sqlalchemy連線:

 

from sqlalchemy import create_engine
engine = create_engine()

宣告表結構:

class School(Base):
    __tablename__ = "school"                # 這個才是表名
    id = Column(Integer, primary_key = True)
    sch_name = Column(String(32))
    sch_addr = Column(String(255))
    sch_tel = Column(Integer)
    
    def __repr__(self):
        # 查詢的時候顯示的是值,需不是一個記憶體地址。
        return "school name:{},tel:{}".format(self.sch_name,self.sch_tel)

 

 

動態新增表字段。

def add_filed(table_name,):
for i in range(3):

    setattr(table_class,'Col'+str(i),(Column('Col'+str(i), String(50),comment='Col'+str(i))))

Base.metadata.create_all(engine)

 

 

建立表:

Base.metadata.create_all(engine)
# 刪除表 Base.metadata.drop_all(engine)

增加一條記錄:

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)        # 建立一個與資料庫的會話,生成的物件是類
session = Session()                        # 例項化這個會話類

class AddRecord(object):
    # 互動新增記錄類。
    def __init__(self,*args):
        # self.table_name = table_name
        self.school_feild = {'sch_name':'學校名:','sch_addr':'學校地址:','sch_tel':'學校電話:'}
    
    def school(self):
        f = {}
        for k,v in self.school.items():
            f[k] = input('{}'.format(v)).strip()
        return f            # 返回一個字典
        
add = AddRecord()
school_attr = add.school()    # 獲取輸入的值
# 下面a1是建立一條記錄
a1 = School(sch_name = school_attr[sch_name],sch_addr= school_attr[sch_addr],sch_tel = school_attr[sch_tel])
session.add(a1)                # 把記錄新增到資料庫
session.commit()               # 提交到資料庫,最終對資料庫更改。


 增、查、

這四個操作都需要匯入sessionmaker ,用sessionmaker建立一個會話,除了建庫外,所有操作都在會話中完成。

from sqlalchemy.orm import sessionmaker
Session = sessionmaker()
session =Session()

 

 增:

 





add(記錄)
記錄:由表類生成的例項 增加一條記錄

d1=School(name='DL',addr='GJZ')

session.add(d1)

add_all(記錄列表)
記錄列表:由表類生成的例項列表 同時增加多條記錄

d1=School(name='DL',addr='GJZ')

d2=School(name='BJ',addr='HLG')

session.add([d1,d2])






 查:

 這三個操作都需要用到query

query 引數 作用
query(table)

table:表名。

 

查詢表中所有資料

以表結構__repr__中定義的格式顯示

session.query(table).all()
query(table.field1,table.field2,……)

table:表名。

field:欄位名,可多個。

顯示指定欄位,不以表結構__repr__格式輸出 session.query(table.id,table.name).all()
結果顯示的方法:


all()
顯示所有結果,結果是一個列表 session.query(table).all()
first()
只顯示第一個結果 session.query(table).first()
one()

如果沒有獲得結果或者返回了多個結果,則會產生一個 error

結果是一個例項類。

a=session.query(table).filter(table.id==6).one()

a.id#就可以檢視id的值。

scalar()

幹啥用的?

感覺和one一樣。。。


count()
計數 session.query(table).count()
[m:n]
切片,讀取指定的結果和列表切片一樣 session.query(table)[:3]
offset(2)
從第3條資料開始讀 session.query(table).offset(2).all()
limit(3)
只顯示前三條 session.query(table).limit(3).all()
order_by


desc

降序

查詢結果降序顯示

session.query(t).order_by(t.id.desc()).filter(t.id>3).all()
asc
升序

查詢結果升序顯示

session.query(t).order_by(t.id.asc()).filter(t.id>3).all()




query給表和欄位重新命名


lable()

給欄位重新命名

呼叫時可以呼叫lable內的名字。

a=session.query(table.filed.lable('other')).all()

for i in a:

      print (i.other)

aliased
給表重新命名,需要匯入 from sqlalchemy.orm import aliased
table_a = aliased(table)

query使用text執行SQL語句

text需要匯入

 


 

from sqlalchemy import text

text('SQL') SQL:SQL語句或SQL表示式或表的欄位



在filter()方法中使用

指定過濾條件 session.query(t).filter(text('id >2')).all()

在order_by()方法中使用 指定排序的欄位 session.query(t).order_by(text('id desc')).all()

在from_statement()方法中使用 執行完整的SQL語句

session.query(t).from_statement(text(

'select * from talbe where id>2'

))

params(變數1=值1,變數2=值2...)

TEXT中的變數前面必須加冒號。

例:text('id = id')

給text中的變數指定值。

session.query(t).filter(text('id>:id')).params(id=2).all()

相當於text('id>2')

query.filter 條件查詢

支援所有SQL條件表示式(where部分)




filter(表名.欄位 == 值)

表名.欄位:固定格式,不能省略表名。

==:條件表示式,==,>=,<=,>,<,!=

根據條件查詢資料 session.query(table).filter(table.id <2).all()
filter(表名.欄位.關鍵字(值) )

表名.欄位:固定格式,不能省略表名。

關鍵字:in_,like,notin_,notlike,between,contains,is_,notis

值:可以是列表,元組,字串,數字

       字串:可以用萬用字元%,例:"%a%",單字元通配 _

1在列表裡

2沒在列表裡

3匹配字串

4不匹配字串

5匹配字串,不區分大小寫

6不匹配字串,不區分大小寫

7在2和6之間,含2和6

8包含字元a

9field是真

10field是不是真

11匹配索引,匹配表報錯

1-filter(table.field.in_([1,2,'1']))

2-filter(table.field.notin_([1,2,'1']))

3-filter(table.field.like('%a%'))

4-filter(table.field.notlike('%a%'))

5-filter(table.field.ilike('%a%'))

6-filter(table.field.notilike('%a%'))

7-filter(table.field.between(2,6))

8-filter(table.field.contains('a'))

9-filter(table.field.is_(True))

10-filter(table.field.notis(True))

11-filter(table.field.match('dage'))

fileter多條件查詢,or_,and_


filter(邏輯運算子(條件表示式1,條件表示式2,......))
邏輯運算子:and_,or_

需要從sqlalchemy匯入

from sqlalchmy import and_,or_

filter(and_(table.id >1,table.id<6))

filter(or_(table.id>1,table.name=='dage'))





func寫在query方法裡,可以和欄位一起顯示。


func.引數

avg:求平均值

count:統計數量

sum:求和

max:最大

min:最小

對分組和欄位進行簡單的數學運算。

如果不符合邏輯會提示錯誤“Inaggregated query without GROUP BY

session.query(table_b.name

,func.count(table_b.age),

func.min(table_b.age)).group_by(table_b.name).all()

這一句是對name分組,顯示name,顯示每個name的個數,和每組最小的age

 

改:

有兩種方法:


query.fileter(條件).update(值) 值:字典格式。{表名.欄位:值} 更新所有filter篩選的記錄的指定欄位 session.query(table_b).filter(table_b.name == 'erge').update({table_b.score : 60})
例項.欄位=值

先建立一個查詢結果的例項

再通過例項改欄位的值

提交更改

sql=seesion.query(table_b).filter(table_b.id ==3).first()

sql.name = 'new name'

session.commit()

例項.欄位=值

批量更新



sql=seesion.query(table_b).filter(table_b.age ==20).all()

for i in range(len(sql)):

    sql(i).score= 100

session.commit()

update(值,synchronize_session=False)
synchronize_session=False立即提交,更新的時候速度更快,同時批量更新時不加此引數報錯 session.query(table_b).filter(table_b.name == 'erge').update({table_b.score : 60},synchronize_session =False)
user = User(id=1, name='通過主鍵改內容')
session.merge(user)

merge的作用是合併,查詢primary key是否一致,一致則合併,不一致則新建

 

刪:





delete()

qurey後面加limit.all,first,等方法時,

不能直接加delete()

需要使用for刪除。


sql1 = session.query(table_b).filter(and_(table_b.name == 'dage', table_b.age == 19)).delete()

或者:

sql1 = session.query(table_b).filter(and_(table_b.name == 'dage', table_b.age == 19).first()

sql1.delete()

in_查詢的結果,刪除會報錯


錯誤提示:

sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python.  Specify 'fetch' or False for the synchronize_session parameter

解決方法:

在delete中,新增synchronize_session=False,含義是同步刪除。

delete(synchronize_session=False)

sql1 = session.query(table_b).filter(table_b.name.in_('dage', 'erge')).delete(synchronize_session=False)

刪除指定條數 刪除d開頭的前三條記錄

sql1 = session.query(table_b).filter(table_b.name.like('d%')).limit(3).all()

for i in sql1:

       session.delete(i)

 

 

一個基本表的建立與操作:

 

from sqlalchemy import create_engine, ForeignKey, Table, Column, String, Integer, Boolean, Enum
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
mysql_name = 'david'
mysql_pw = 'Yaotiao&shunv666'
mysql_server = '192.168.2.120'
database_name = 'test'


# 1、連線資料庫
engine = create_engine("mysql+pymysql://{name}:{pw}@{server}/{database}".
                        format(mysql_name,mysql_pw,mysql_server,database_name),
                        encoding ='utf-8'
                        )

# 2、生成ORM基類
Base = declarative_base()


# 3、繼承基類,定義表結構
class Product(Base):
    # 建立orm物件
    __tablename__ = 'products'                # 資料表名
    id = Column(Integer, primary_key = True)    # 欄位,設為主鍵,預設不用賦值,此欄位會自增
    name = Column(String(32))
    price = Column(Integer)

    def __repr__(self):
        # 列印查詢結果例項顯示結果,不加此項顯示的是類的記憶體地址
        return "name:{},price:{}".format(self.name,self.price)
    
# 4、建立資料表
Base.metadata.create_all(engine)                # 在資料庫中建立表,已存在則不建立


# 5、新增多條資料
product1 = Product(name = '華為100', price = 999) # 建立兩個例項
product2 = Product(name = '華為400', price = 1999)
session.add_all([product1,product2 ])    # add_all把例項列表新增到資料庫

# 6、新增單條資料
product = Product(name = '一條', price = 9999)
session.add(product)


# 7、查詢資料

query_data = session.query(Product).all()       # 查所有,如果不加__repr__,結果是記憶體地址列表,使用for檢視

for i in query_data:
    # 檢視結果
    print (i.id,i.name,i.price)

# 8、修改一項資料
query_data = session.query(Product).filter(Product.name=='一條').first()
query_data.name = 'yitiao'        # 只改name一個數據

# 9、批量更新:把所有華為開頭的產品價格改成10000
session.query(Product).filter(Product.name.like('華為%')).update({Product.price:10000},synchronize_session =False)


# 10、通過主鍵改內容merge:
update_name = Product(id=3,price=3651) 
session.merge(update_name)                # id 和 name不變,只修改了價格price = 3651

# 11、刪除一條資料:把華為開頭的價格最高的一條資料刪掉
del_product = session.query(Product).fileter(Product.price.like('華為%'))).order_by(Product.price.desc()).first()
session.delete(del_product)

# 12、刪除所有符合條件的資料:
session.query(Product).filter(Product.name.like('華為%')).delete(synchronize_session = False)


 

 問題集:http://blog.51cto.com/13914991/2235437