python中使用原生sql操作資料庫
使用 sqlalchemy 有3種方式:
方式1, 使用raw sql;
方式2, 使用SqlAlchemy的sql expression;
方式3, 使用ORM.
前兩種方式可以統稱為 core 方式. 本文講解 core 方式訪問資料庫, 不涉及 ORM.
對於絕大多數應用, 推薦使用 SqlAlchemy. 即使是使用raw sql, SqlAlchemy 也可以帶來如下好處:
1. 內建資料庫連線池. [注意]如果是sqlalchemy+cx_oracle的話, 需要禁掉 connection pool, 否則會有異常. 方法是設定sqlalchemy.poolclass為sqlalchemy.pool.NullPool
2. 強大的log功能
3. 資料庫中立的寫法, 包括: sql引數寫法, limit語法
4. 特別提一下, where()條件的==your_value, 如果your_value等於None, 真正的Sql會轉為Is None
SqlAlchemy的sql expression和raw sql的比較:
1. sql expression 寫法是純python程式碼, 閱讀性更好, 尤其是在使用insert()方法時, 欄位名和取值成對出現.
2. raw sql 比 sql expression 更靈活, 如果SQL/DDL很複雜, raw sql就更有優勢了.
==============================
sqlalchemy 超簡單教程
==============================
http://solovyov.net/en/2011 /04/23/basic-sqlalchemy/
http://flask.pocoo.org/docs/patterns/sqlalchemy/#sql-abstraction-layer
http://www.blog.pythonlibrary.org/2010/09/10/sqlalchemy-connecting-to-pre-existing-databases/
http://www.blog.pythonlibrary.org/2010/02/03/another-step-by-step-sqlalchemy-tutorial-part-1-of-2/
http://www.rmunn.com/sqlalchemy-tutorial/tutorial.html
http://mapfish.org/doc/tutorials/sqlalchemy.html
sqlalchemy 官網的pdf文件, 可以作為 reference 使用
==============================
常用的資料庫連線字串
==============================
#sqlite
sqlite_db = create_engine('sqlite:////absolute/path/database.db3')
sqlite_db = create_engine('sqlite://') # in-memory database
sqlite_db = create_engine('sqlite:///:memory:') # in-memory database
# postgresql
pg_db = create_engine('postgres://scott:[email protected]/mydatabase')
# mysql
mysql_db = create_engine('mysql://scott:[email protected]/mydatabase')
# oracle
oracle_db = create_engine('oracle://scott:[email protected]:1521/sidname')
# oracle via TNS name
oracle_db = create_engine('oracle://scott:[email protected]')
# mssql using ODBC datasource names. PyODBC is the default driver.
mssql_db = create_engine('mssql://mydsn')
mssql_db = create_engine('mssql://scott:[email protected]')
# firebird
firebird_db = create_engine('firebird://scott:[email protected]/sometest.gdm')
==============================
關於一些非主流資料庫缺少DB API介面的問題
==============================
比如teradata, 沒有專門的DB API實現, 但 odbc driver肯定會提供的, 否則就無法在江湖上混了. pypyodbc + ODBC driver 應該是一個選項. pypyodbc 和 pyodbc介面一致, 同時它是純 python實現, 理論上應該支援 python/ironpython/jython. 另外, 我猜想sqlalchemy應該能基於這一組合訪問所有的資料庫, 待驗證.
pypyodbc 主頁: http://code.google.com/p/pypyodbc/
==============================
#connnectionless執行和connnection執行
==============================
1. 直接使用engine執行sql的方式, 叫做connnectionless執行,
2. 先使用 engine.connect()獲取conn, 然後通過conn執行sql, 叫做connection執行
如果要在transaction模式下執行, 推薦使用connection方式, 如果不涉及transaction, 兩種方法效果是一樣的.
==============================
#sqlalchemy推薦使用text()函式封裝一下sql字串
==============================
好處巨多:
1. 不同資料庫, 可以使用統一的sql引數傳遞寫法. 引數須以:號引出. 在呼叫execute()的時候, 使用dict結構將實參傳進去.
from sqlalchemy import text
result = db.execute(text('select * from table where id < :id and typeName=:type'), {'id': 2,'type':'USER_TABLE'})
2. 如果不指定parameter的型別, 預設為字串型別; 如果要傳日期引數, 需要使用text()的bindparams引數來宣告
from sqlalchemy import DateTime
date_param=datetime.today()+timedelta(days=-1*10)
sql="delete from caw_job_alarm_log where alarm_time<:alarm_time_param"
t=text(sql, bindparams=[bindparam('alarm_time_param', type_=DateTime, required=True)])
db.execute(t,{"alarm_time_param": date_param})
引數bindparam可以使用type_來指定引數的型別, 也可以使用 initial 值來指定引數型別
bindparam('alarm_time_param', type_=DateTime) #直接指定引數型別
bindparam('alarm_time_param', DateTime()) #使用初始值指定引數型別
3. 如要轉換查詢的結果中的資料型別, 可以通過text()的引數typemap引數指定. 這點比mybatis還靈活,
t = text("SELECT id, name FROM users",
typemap={
'id':Integer,
'name':Unicode
}
)
4. 還有其他, 詳見sqlalchemy\sql\expression.py中的docstring.
==============================
#sqlalchemy訪問資料庫的示例
==============================
#-----------------------------------
#獲取資料庫
#-----------------------------------
from sqlalchemy import create_engine
db=create_engine("sqlite:///:memory:", echo=True)
#-----------------------------------
#DDL
#-----------------------------------
db.execute("create table users(userid char(10), username char(50))")
#-----------------------------------
#DML
#-----------------------------------
resultProxy=db.execute("insert into users (userid,username) values('user1','tony')")
resultProxy.rowcount #return rows affected by an UPDATE or DELETE statement
#-----------------------------------
#Query
#-----------------------------------
resultProxy=db.execute("select * from users")
resultProxy.close(), resultProxy 用完之後, 需要close
resultProxy.scalar(), 可以返回一個標量查詢的值
ResultProxy 類是對Cursor類的封裝(在檔案sqlalchemy\engine\base.py),
ResultProxy 類有個屬性cursor即對應著原來的cursor.
ResultProxy 類有很多方法對應著Cursor類的方法, 另外有擴充套件了一些屬性/方法.
resultProxy.fetchall()
resultProxy.fetchmany()
resultProxy.fetchone()
resultProxy.first()
resultProxy.scalar()
resultProxy.returns_rows #True if this ResultProxy returns rows.
resultProxy.rowcount #return rows affected by an UPDATE or DELETE statement. It is not intended to provide the number of rows present from a SELECT.
****遍歷ResultProxy時, 得到的每一個行都是RowProxy物件, 獲取欄位的方法非常靈活, 下標和欄位名甚至屬性都行. rowproxy[0] == rowproxy['id'] == rowproxy.id, 看得出 RowProxy 已經具備基本 POJO 類特性.
#-----------------------------------
#使用transaction
#-----------------------------------
#SqlAlchemy支援支援事務, 甚至事務可以巢狀. 預設事務是自動提交,即執行一條SQL就自動提交。
#-如果更精準地控制事務, 最簡單的方法是使用 connection, 然後通過connection獲取transaction物件
connection = db.connect()
trans = connection.begin()
try:
dosomething(connection)
trans.commit()
except:
trans.rollback()
#-還有一種方式是,在建立engine時指定strategy='threadlocal'引數,這樣會自動建立一個執行緒區域性的連線,對於後續的無連線的執行都會自動使用這個連線,這樣在處理事務時,只要使用 engine 物件來操作事務就行了。如:
#參見 http://hi.baidu.com/limodou/blog/item/83f4b2194e94604043a9ad9c.html
db = create_engine(connection, strategy='threadlocal')
db.begin()
try:
dosomething()
except:
db.rollback()
else:
db.commit()
#-預設事務是自動提交,即執行一條SQL就自動提交. 也可以在connection和statement上通過execution_options()方法修改為手動commit模式
conn.execution_options(autocommit=False)
設定為手動提交模式後, 要提交, 需要呼叫conn.commit()
#-----------------------------------
#如何使用 pyDbRowFactory
#-----------------------------------
#pyDbRowFactory是我開發的一個通用RowFactory, 可以繫結cursor和你的 model pojo 類, 新版本的pyDbRowFactoryResultProxy. 下面示例是pyDbRowFactory的最基本用法
#方法1, 使用 cursor物件
cursor=resultProxy.cursor
from pyDbRowFactory import DbRowFactory
rowFactory=DbRowFactory(cursor, "your_module.your_row_class")
lst=factory.fetchAllRowObjects()
#方法2, 直接使用 resultProxy
from pyDbRowFactory import DbRowFactory
factory=DbRowFactory.fromSqlAlchemyResultProxy(resultProxy, "your_module.your_row_class")
lst=factory.fetchAllRowObjects()
前面講過, SQLAlchemy使用 ResultProxy封裝了cursor, ResultProxy的每一個行記錄是一個RowProxy 類物件. RowProxy 使用起來非常方便, 對於查詢select userName from users,
每一個行結果都可以使用rowproxy來訪問, 寫法相當靈活. rowproxy.userName=rowproxy["userName"]==rowproxy[0], 所以有了RowProxy, 很多時候, 沒有必要再為每個表建立一個 model pojo 類.
#-----------------------------------
#連線池
#-----------------------------------
sqlalchemy 預設的連線池演算法選用規則為:
1.連線記憶體中的sqlite, 預設的連線池演算法為 SingletonThreadPool 類, 即每個執行緒允許一個連線
2.連線基於檔案的sqlite, 預設的連線池演算法為 NullPool 類, 即沒有連線池
3.對於其他情況, 預設的連線池演算法為 QueuePool 類
當然, 我們也可以實現自己的連線池演算法,
db = create_engine('sqlite:///file.db', poolclass=YourPoolClass)
create_engine()函式和連線池相關的引數有:
-pool_recycle, 預設為-1, 推薦設定為7200, 即如果connection空閒了7200秒, 自動重新獲取, 以防止connection被db server關閉.
-pool_size=5, 連線數大小,預設為5,正式環境該數值太小,需根據實際情況調大
-max_overflow=10, 超出pool_size後可允許的最大連線數,預設為10, 這10個連線在使用過後, 不放在pool中, 而是被真正關閉的.
-pool_timeout=30, 獲取連線的超時閾值, 預設為30秒
#-----------------------------------
#log輸出
#-----------------------------------
--如果只需在sys.stdout輸出, 用不著引用 logging 模組就能實現
db = create_engine('sqlite:///file.db', echo=True)
--如果要在檔案中輸出, log檔案不具備rotate功能, 不推薦在生產環境中使用.
import logging
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
#-----------------------------------
#使用 sqlalchemy core 的最佳實踐
#-----------------------------------
我不太喜歡使用ORM方式, 主要是ORM學習成本比較高, 另外, 構建複雜的查詢也比較困難. 更多的時候是使用raw sql和sql expression方法.
1. declarative 是 SqlAlchemy的一個新的擴充套件, 只能用在 ORM 中, 不能用在SQL Expression中
2. 如果要使用ORM時, table必須有主鍵; 使用 raw sql和sql expression, 沒有這個約束.
使用心得:
1. 查詢不管是否複雜, 直接使用 raw sql; 增刪改多是單表操作, 使用sql expression 就足夠了.
2. 具體講, 對於增刪改, 比如一個User類, 可包含一個固定的 _table 的成員, _table=Table('users', metadata, autoload=True), 增刪改直接使用_table物件來完成.
3. 對於查詢, 若結果集能對映到一個實體物件, 使用pyDbRowFactory完成物件例項化. 若結果集涉及多個實體, 直接使用ResultProxy, ResultProxy的每一行物件也具有基本的物件特徵, 多數情況下沒有必要再專門對映成一個特別的類.
4. 表之間關係的處理, 比如: users 表和 addresses 表, 存在 1:n的關係, 對應地, User 類也會有個AddressList的成員, 在實體化一個User物件後, 我們可立即查詢 addresses 表, 獲取該使用者的address列表, 分2步就可以完成這種1:n的關係對映.
使用 sqlalchemy 的寫法太靈活了, 下面僅僅是我喜歡的一種寫法, 僅僅從排版看, 就相當漂亮.
構建insert語句: _table.insert().values(f1=value1,f2=value2,)
構建update語句: _table.update().values(f1=newvalue1,f2=newvalue2).where(_table.c.f1==value1).where(_table.c.f2==value2)
構建delete語句: _table.delete().where(_table.c.f1==value1).where(_table.c.f2==value2)
批量insert/update/delete, 將每行資料組成一個dict, 再將這些dict組成一個list, 和_table.insert()/update()/delete()一起作為引數傳給 conn.execute().
conn.execute(_table.insert(), [
{’user_id’: 1, ’email_address’ : ’jack@yahoo.com’},
{’user_id’: 1, ’email_address’ : ’jack@msn.com’},
{’user_id’: 2, ’email_address’ : ’www@www.org’},
{’user_id’: 2, ’email_address’ : ’wendy@aol.com’},
])
sql expression 也可以像raw sql的text函式一樣使用bindparam, 方法是, 在呼叫insert()/update()/delete()時宣告引數, 然後在conn.execute()執行時候, 將實參傳進去.
d=_table.delete().where(_table.c.hiredate<=bindparam("hire_day",DateTime(), required=True))
conn.execute(d, {"hire_day":datetime.today()})
where()和ORM中的filter()接受的引數是一樣, 各種SQL條件都支援.
#equals:
where(_table.c.name == ’ed’)
#not equals:
where(_table.c.name != ’ed’)
#LIKE:
where(_table.c.name.like(’%ed%’))
#IN:
where(_table.c.name.in_([’ed’, ’wendy’, ’jack’]))
#NOT IN:
where(~_table.c.name.in_([’ed’, ’wendy’, ’jack’]))
#IS NULL:
where(_table.c.name == None)
#IS NOT NULL:
where(_table.c.name != None)
#AND:
from sqlalchemy import and_
where(and_(_table.c.name == ’ed’, _table.c.fullname == ’Ed Jones’))
#AND也可以通過多次呼叫where()來實現
where(_table.c.name == ’ed’).where(_table.c.fullname == ’Ed Jones’)
#OR:
from sqlalchemy import or_
where(or_(_table.c.name == ’ed’, _table.c.name == ’wendy’))
#match: The contents of the match parameter are database backend specific.
where(_table.c.name.match(’wendy’))
--==========================
--python file: mydatabase.py
--==========================
from sqlalchemy import create_engine
from sqlalchemy.schema import MetaData
#db = create_engine('sqlite:///:memory:', echo=True)
db = create_engine('sqlite:///c://caw.sqlite.db', echo=True)
metadata = MetaData(bind=db)
--==========================
--python file: dal.py
--==========================
from sqlalchemy.sql.expression import text, bindparam
from sqlalchemy.sql import select,insert, delete, update
from sqlalchemy.schema import Table
from mydatabase import db,metadata
from pyDbRowFactory import DbRowFactory
class caw_job(object):
FULL_NAME="dal.caw_job"
tablename="caw_job"
_table=Table(tablename, metadata, autoload=True)
def __init__(self):
self.app_domain =None
self.job_code =None
self.job_group =None
self.cron_year =None
self.cron_month =None
self.cron_day =None
self.cron_week =None
self.cron_day_of_week =None
self.cron_hour =None
self.cron_minute =None
self.description =None
@classmethod
def getEntity(cls, app_domain, jobCode):
sql="select * from caw_job where app_domain=:app_domain and job_code=:job_code";
resultProxy=db.execute(text(sql),{'app_domain':app_domain,
'job_code':jobCode})
DbRowFactory.fromSqlAlchemyResultProxy(resultProxy, cls.FULL_NAME)
return DbRowFactory.fetchOneRowObject()
def insert(self):
i=self._table.insert().values(
app_domain =self.app_domain ,
job_code =self.job_code ,
job_group =self.job_group ,
cron_year =self.cron_year ,
cron_month =self.cron_month ,
cron_day =self.cron_day ,
cron_week =self.cron_week ,
cron_day_of_week=self.cron_day_of_week,
cron_hour =self.cron_hour ,
cron_minute =self.cron_minute ,
description =self.description ,
)
db.execute(i)
def update(self):
u=self._table.update().values(
app_domain =self.app_domain ,
job_code =self.job_code ,
job_group =self.job_group ,
cron_year =self.cron_year ,
cron_month =self.cron_month ,
cron_day =self.cron_day ,
cron_week =self.cron_week ,
cron_day_of_week =self.cron_day_of_week ,
cron_hour =self.cron_hour ,
cron_minute =self.cron_minute ,
description =self.description , ,
).where(self._table.c.app_domain==self.app_domain)\
.where(self._table.c.job_code==self.job_code)
db.execute(u)
def delete(self):
d=self._table.delete().where(self._table.c.app_domain==self.app_domain)\
.where(self._table.c.job_code==self.job_code)
db.execute(d)
#-----------------------------------
#使用sqlalchemy.ext.declarative 來生成表, 所有的表都必須有主鍵.
#在系統初期, 資料模型往往需要經常調整, 使用這種方式修改表結構更方便些.
#-----------------------------------
--python file: models.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String , Boolean, DateTime, Float
engine = create_engine('sqlite:///:memory:', echo=True)
Base = declarative_base()
# ddl_caw_job 是專門用來生成資料庫物件的, 沒有其他用處
class ddl_caw_job(Base):
__tablename__="caw_job"
job_name =Column(String(200), primary_key=True)
job_group =Column(String(200))
def init_db():
Base.metadata.create_all(bind=engine,)
相關推薦
python中使用原生sql操作資料庫
使用 sqlalchemy 有3種方式: 方式1, 使用raw sql; 方式2, 使用SqlAlchemy的sql expression; 方式3, 使用ORM. 前兩種方式可以統稱為 core 方式. 本文講解 core 方式訪問資料庫, 不涉及 O
laravel+Mysql 中DB原生SQL操作報1292 Truncated incorrect DOUBLE value問題解決
報1292 Truncated incorrect DOUBLE value問題的,基本都是因為SQL寫的不標準或者寫錯了。 昨天接到一個批量更新的需求。因為laravel不支援批量的更新操作,故使用了原生的sql進行拼接(可能laravel支援批量,我不知道)。 在做一個
Revit中Dynamo程式設計——在Python Script中結合sql server資料庫來儲存材料的外觀屬性
我們的Dynamo的Python Script環境其實也是 IronPython的一種,結合以上部落格可以很好的結合資料庫。 我的這篇讀取材料的外觀屬性,並存儲到sql server資料庫中。今天我來修改一下,把其中的的讀取過程做成dll檔案,然後在Pyt
Python中的列表操作
.so 排序 語句 res 發揮 第一個元素 刪除列 extend 刪除元素 Python的列表操作可謂是功能強大且方便(相對於Java)簡單、常規的操作就不說了(這不是一個入門教程),介紹幾個很有特點的例子添加 # 追加到結尾(append) li = [1, 2
【Python】Python中的列表操作
元素 提取 添加 sta 連接 not n個元素 none 格式 Python的列表操作可謂是功能強大且方便(相對於Java)簡單、常規的操作就不說了(這不是一個入門教程),介紹幾個很有特點的例子添加 # 追加到結尾(append) li = [1, 2, 3, 4, 5
python中列表切片操作
tar 元素 append() 插入 根據 ins class ever sta 1 a=[‘zhao‘,‘qian‘,‘sun‘,‘li‘,‘zhou‘,‘‘] 2 3 #增刪改查 4 #查 切片 [] 5 print(a[1:])#取到最後 6 pr
python中的 sql語句用法
mod location drop rop class turn price pytho 初始 函數中應用sql語句def _get_cust_number(self,cr,uid,ids,field_name,args,context=None): res={
Python 中的字典操作
語言 clear 組成 {} 返回 括號 做了 進行 不返回 字典:dict 字典在其他編程語言中又稱作關聯數組或散列表 通過鍵實現元素存取: 無序集合,可變類型容器,長度可變,異構,嵌套 表示方法: phonebook = {‘Alice‘:‘1234‘,‘Beth‘:
python中字串的操作方法
python中字串的操作方法大全 更新時間:2018年06月03日 10:08:51 作者:駿馬金龍 我要評論這篇文章主要給大家介紹了關於python中字串操作方法的相關資料,文中通過示例程式碼詳細介紹了關於python中字串的大小寫轉換、isXXX判斷、填充、子串搜尋、替換、分割、join以及修剪:st
python中字串的操作方法大全
轉自:https://www.jb51.net/article/141376.htm 這篇文章主要給大家介紹了關於python中字串操作方法的相關資料,文中通過示例程式碼詳細介紹了關於python中字串的大小寫轉換、isXXX判斷、填充、子串搜尋、替換、分割、join以及修剪:strip、l
Python中字串常用操作
字串常用操作 (1)find 檢測指定字串是否包含在當前字串中,如果是返回開始的索引值,否則返回-1 strs.find(str,start,end) start 為起始位置,end為結束位置 (2)index 作用和find()方法一樣,不過當要查詢的字串不存在時
python中list常用操作(不包括切片)
stus = ['abc‘,’dec',’dxq‘,’wzw‘] #下標,索引,角標 stus[3] stus = [] #空陣列 stus = list() #空列表 #增加元素 stus.append('zhaoyan') #在列表末尾增加一個元素 stus.inse
python中檔案及其操作(File)
檔案是用於資料儲存的單位 檔案通常用於長期儲存資料 檔案中是以位元組為單位順序儲存資料的 檔案的操作流程 開啟檔案 讀/寫檔案 關閉檔案 注: 任何的作
Python中字典的操作
存在 最好 是否 bag ems pri mon one 報錯 1、字典key-value,key是不能重復的stu_info={"name":"王誌華","age":18,"addr":"北京"}2、取值,查print(stu_info[‘name‘])print(stu
Python中檔案的操作
檔案的開啟和關閉: f = open('test.txt','w') r:以只讀方式開啟檔案。檔案的指標將會放在檔案的開頭。這是預設模式。 w:開啟一個檔案只用於寫入。如果該檔案已存在則將其覆蓋。如果該檔案不存在,建立新檔案。 a:開啟一個檔案用於追加。如果該檔案已存在,檔案指標將會放在
python中 os模組操作檔案路徑
python中的os.path模組用法: dirname() 用於去掉檔名,返回目錄所在的路徑 如: >>> import os >>> os.path.dirname('d:\\library\\book.txt') >>>
laravel原生sql操作以及like模糊查詢的坑點
此篇文章基於 laravel版本:5.5 laravel基於DB的原生查詢可以使用 DB facade 執行查詢。DB facade 為每種型別的查詢提供了方法:select、update、insert、delete 和 statement。以下是知識點: (1)執行
Python中String字串操作
關於字串的一些操作方法: ascii:字母,數字,特殊字元:1個位元組,8位 Unicode:16位 兩個位元組 升級 32 位 四個位元組 utf-8:最少一個位元組 8位表示。 英文字母 8位 1個位
Python 中 "is" 與 "==" 操作有什麼區別?
在 Python 中,比較兩個物件(變數)是否相等,可以用 “is” 和 “==” 操作,但它倆有什麼區別?什麼時候用 “is”,什麼時候用 “==” ?在面試時,發現不少候選人很難把這兩者完全說清楚,因此在這篇文章中,「Python之禪」將對二者進行深入淺出的對比介紹。 先舉個例子 小黃最
python中關於檔案操作的總結
本來想對一個檔案重複執行某一個函式,但是隻有第一次執行函式時有結果,後面的幾次都沒有結果,最終發現是在函式外開啟檔案,第一次執行沒有問題,第二次執行時,檔案已經開啟,函式找不到該檔案了,或者是檔案沒有關閉,具體原因需要檢視python內部的執行機制。舉例如下: 定義一個函式,輸出文字檔案裡的