1. 程式人生 > >python中使用原生sql操作資料庫

python中使用原生sql操作資料庫

使用 sqlalchemy 有3種方式:
方式1, 使用raw sql; 
方式2, 使用SqlAlchemy的sql expression; 
方式3, 使用ORM.  
前兩種方式可以統稱為 core 方式. 本文講解 core 方式訪問資料庫, 不涉及 ORM. 

對於絕大多數應用, 推薦使用 SqlAlchemy. 即使是使用raw sql, SqlAlchemy 也可以帶來如下好處: 
1. 內建資料庫連線池. [注意]如果是sqlalchemy+cx_oracle的話, 需要禁掉 connection pool, 否則會有異常. 方法是設定sqlalchemy.poolclass為sqlalchemy.pool.NullPool
2.
強大的log功能 3. 資料庫中立的寫法, 包括: sql引數寫法, limit語法 4. 特別提一下, where()條件的==your_value, 如果your_value等於None, 真正的Sql會轉為Is None SqlAlchemy的sql expression和raw sql的比較: 1. sql expression 寫法是純python程式碼, 閱讀性更好, 尤其是在使用insert()方法時, 欄位名和取值成對出現. 2. raw sql 比 sql expression 更靈活, 如果SQL/DDL很複雜, raw sql就更有優勢了. ============================== sqlalchemy 超簡單教程 ============================== http://solovyov.net/en/2011
/04/23/basic-sqlalchemy/ http://flask.pocoo.org/docs/patterns/sqlalchemy/#sql-abstraction-layer http://www.blog.pythonlibrary.org/2010/09/10/sqlalchemy-connecting-to-pre-existing-databases/ http://www.blog.pythonlibrary.org/2010/02/03/another-step-by-step-sqlalchemy-tutorial-part-1-of-2/ http://www.rmunn.com/sqlalchemy-tutorial/tutorial.html http://mapfish.org/doc/tutorials/sqlalchemy.html sqlalchemy 官網的pdf文件, 可以作為 reference 使用 ============================== 常用的資料庫連線字串 ============================== #sqlite
sqlite_db = create_engine('sqlite:////absolute/path/database.db3') sqlite_db = create_engine('sqlite://') # in-memory database sqlite_db = create_engine('sqlite:///:memory:') # in-memory database # postgresql pg_db = create_engine('postgres://scott:[email protected]/mydatabase') # mysql mysql_db = create_engine('mysql://scott:[email protected]/mydatabase') # oracle oracle_db = create_engine('oracle://scott:[email protected]:1521/sidname') # oracle via TNS name oracle_db = create_engine('oracle://scott:[email protected]') # mssql using ODBC datasource names. PyODBC is the default driver. mssql_db = create_engine('mssql://mydsn') mssql_db = create_engine('mssql://scott:[email protected]') # firebird firebird_db = create_engine('firebird://scott:[email protected]/sometest.gdm') ============================== 關於一些非主流資料庫缺少DB API介面的問題 ============================== 比如teradata, 沒有專門的DB API實現, 但 odbc driver肯定會提供的, 否則就無法在江湖上混了. pypyodbc + ODBC driver 應該是一個選項. pypyodbc 和 pyodbc介面一致, 同時它是純 python實現, 理論上應該支援 python/ironpython/jython. 另外, 我猜想sqlalchemy應該能基於這一組合訪問所有的資料庫, 待驗證. pypyodbc 主頁: http://code.google.com/p/pypyodbc/ ============================== #connnectionless執行和connnection執行 ============================== 1. 直接使用engine執行sql的方式, 叫做connnectionless執行, 2. 先使用 engine.connect()獲取conn, 然後通過conn執行sql, 叫做connection執行 如果要在transaction模式下執行, 推薦使用connection方式, 如果不涉及transaction, 兩種方法效果是一樣的. ============================== #sqlalchemy推薦使用text()函式封裝一下sql字串 ============================== 好處巨多: 1. 不同資料庫, 可以使用統一的sql引數傳遞寫法. 引數須以:號引出. 在呼叫execute()的時候, 使用dict結構將實參傳進去. from sqlalchemy import text result = db.execute(text('select * from table where id < :id and typeName=:type'), {'id': 2,'type':'USER_TABLE'}) 2. 如果不指定parameter的型別, 預設為字串型別; 如果要傳日期引數, 需要使用text()的bindparams引數來宣告 from sqlalchemy import DateTime date_param=datetime.today()+timedelta(days=-1*10) sql="delete from caw_job_alarm_log where alarm_time<:alarm_time_param" t=text(sql, bindparams=[bindparam('alarm_time_param', type_=DateTime, required=True)]) db.execute(t,{"alarm_time_param": date_param}) 引數bindparam可以使用type_來指定引數的型別, 也可以使用 initial 值來指定引數型別 bindparam('alarm_time_param', type_=DateTime) #直接指定引數型別 bindparam('alarm_time_param', DateTime()) #使用初始值指定引數型別 3. 如要轉換查詢的結果中的資料型別, 可以通過text()的引數typemap引數指定. 這點比mybatis還靈活, t = text("SELECT id, name FROM users", typemap={ 'id':Integer, 'name':Unicode } ) 4. 還有其他, 詳見sqlalchemy\sql\expression.py中的docstring. ============================== #sqlalchemy訪問資料庫的示例 ============================== #----------------------------------- #獲取資料庫 #----------------------------------- from sqlalchemy import create_engine db=create_engine("sqlite:///:memory:", echo=True) #----------------------------------- #DDL #----------------------------------- db.execute("create table users(userid char(10), username char(50))") #----------------------------------- #DML #----------------------------------- resultProxy=db.execute("insert into users (userid,username) values('user1','tony')") resultProxy.rowcount #return rows affected by an UPDATE or DELETE statement #----------------------------------- #Query #----------------------------------- resultProxy=db.execute("select * from users") resultProxy.close(), resultProxy 用完之後, 需要close resultProxy.scalar(), 可以返回一個標量查詢的值 ResultProxy 類是對Cursor類的封裝(在檔案sqlalchemy\engine\base.py), ResultProxy 類有個屬性cursor即對應著原來的cursor. ResultProxy 類有很多方法對應著Cursor類的方法, 另外有擴充套件了一些屬性/方法. resultProxy.fetchall() resultProxy.fetchmany() resultProxy.fetchone() resultProxy.first() resultProxy.scalar() resultProxy.returns_rows #True if this ResultProxy returns rows. resultProxy.rowcount #return rows affected by an UPDATE or DELETE statement. It is not intended to provide the number of rows present from a SELECT. ****遍歷ResultProxy時, 得到的每一個行都是RowProxy物件, 獲取欄位的方法非常靈活, 下標和欄位名甚至屬性都行. rowproxy[0] == rowproxy['id'] == rowproxy.id, 看得出 RowProxy 已經具備基本 POJO 類特性. #----------------------------------- #使用transaction #----------------------------------- #SqlAlchemy支援支援事務, 甚至事務可以巢狀. 預設事務是自動提交,即執行一條SQL就自動提交。 #-如果更精準地控制事務, 最簡單的方法是使用 connection, 然後通過connection獲取transaction物件 connection = db.connect() trans = connection.begin() try: dosomething(connection) trans.commit() except: trans.rollback() #-還有一種方式是,在建立engine時指定strategy='threadlocal'引數,這樣會自動建立一個執行緒區域性的連線,對於後續的無連線的執行都會自動使用這個連線,這樣在處理事務時,只要使用 engine 物件來操作事務就行了。如: #參見 http://hi.baidu.com/limodou/blog/item/83f4b2194e94604043a9ad9c.html db = create_engine(connection, strategy='threadlocal') db.begin() try: dosomething() except: db.rollback() else: db.commit() #-預設事務是自動提交,即執行一條SQL就自動提交. 也可以在connection和statement上通過execution_options()方法修改為手動commit模式 conn.execution_options(autocommit=False) 設定為手動提交模式後, 要提交, 需要呼叫conn.commit() #----------------------------------- #如何使用 pyDbRowFactory #----------------------------------- #pyDbRowFactory是我開發的一個通用RowFactory, 可以繫結cursor和你的 model pojo 類, 新版本的pyDbRowFactoryResultProxy. 下面示例是pyDbRowFactory的最基本用法 #方法1, 使用 cursor物件 cursor=resultProxy.cursor from pyDbRowFactory import DbRowFactory rowFactory=DbRowFactory(cursor, "your_module.your_row_class") lst=factory.fetchAllRowObjects() #方法2, 直接使用 resultProxy from pyDbRowFactory import DbRowFactory factory=DbRowFactory.fromSqlAlchemyResultProxy(resultProxy, "your_module.your_row_class") lst=factory.fetchAllRowObjects() 前面講過, SQLAlchemy使用 ResultProxy封裝了cursor, ResultProxy的每一個行記錄是一個RowProxy 類物件. RowProxy 使用起來非常方便, 對於查詢select userName from users, 每一個行結果都可以使用rowproxy來訪問, 寫法相當靈活. rowproxy.userName=rowproxy["userName"]==rowproxy[0], 所以有了RowProxy, 很多時候, 沒有必要再為每個表建立一個 model pojo 類. #----------------------------------- #連線池 #----------------------------------- sqlalchemy 預設的連線池演算法選用規則為: 1.連線記憶體中的sqlite, 預設的連線池演算法為 SingletonThreadPool 類, 即每個執行緒允許一個連線 2.連線基於檔案的sqlite, 預設的連線池演算法為 NullPool 類, 即沒有連線池 3.對於其他情況, 預設的連線池演算法為 QueuePool 類 當然, 我們也可以實現自己的連線池演算法, db = create_engine('sqlite:///file.db', poolclass=YourPoolClass) create_engine()函式和連線池相關的引數有: -pool_recycle, 預設為-1, 推薦設定為7200, 即如果connection空閒了7200秒, 自動重新獲取, 以防止connection被db server關閉. -pool_size=5, 連線數大小,預設為5,正式環境該數值太小,需根據實際情況調大 -max_overflow=10, 超出pool_size後可允許的最大連線數,預設為10, 這10個連線在使用過後, 不放在pool中, 而是被真正關閉的. -pool_timeout=30, 獲取連線的超時閾值, 預設為30#----------------------------------- #log輸出 #----------------------------------- --如果只需在sys.stdout輸出, 用不著引用 logging 模組就能實現 db = create_engine('sqlite:///file.db', echo=True) --如果要在檔案中輸出, log檔案不具備rotate功能, 不推薦在生產環境中使用. import logging logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) #----------------------------------- #使用 sqlalchemy core 的最佳實踐 #----------------------------------- 我不太喜歡使用ORM方式, 主要是ORM學習成本比較高, 另外, 構建複雜的查詢也比較困難. 更多的時候是使用raw sql和sql expression方法. 1. declarative 是 SqlAlchemy的一個新的擴充套件, 只能用在 ORM 中, 不能用在SQL Expression中 2. 如果要使用ORM時, table必須有主鍵; 使用 raw sql和sql expression, 沒有這個約束. 使用心得: 1. 查詢不管是否複雜, 直接使用 raw sql; 增刪改多是單表操作, 使用sql expression 就足夠了. 2. 具體講, 對於增刪改, 比如一個User類, 可包含一個固定的 _table 的成員, _table=Table('users', metadata, autoload=True), 增刪改直接使用_table物件來完成. 3. 對於查詢, 若結果集能對映到一個實體物件, 使用pyDbRowFactory完成物件例項化. 若結果集涉及多個實體, 直接使用ResultProxy, ResultProxy的每一行物件也具有基本的物件特徵, 多數情況下沒有必要再專門對映成一個特別的類. 4. 表之間關係的處理, 比如: users 表和 addresses 表, 存在 1:n的關係, 對應地, User 類也會有個AddressList的成員, 在實體化一個User物件後, 我們可立即查詢 addresses 表, 獲取該使用者的address列表, 分2步就可以完成這種1:n的關係對映. 使用 sqlalchemy 的寫法太靈活了, 下面僅僅是我喜歡的一種寫法, 僅僅從排版看, 就相當漂亮. 構建insert語句: _table.insert().values(f1=value1,f2=value2,) 構建update語句: _table.update().values(f1=newvalue1,f2=newvalue2).where(_table.c.f1==value1).where(_table.c.f2==value2) 構建delete語句: _table.delete().where(_table.c.f1==value1).where(_table.c.f2==value2) 批量insert/update/delete, 將每行資料組成一個dict, 再將這些dict組成一個list, 和_table.insert()/update()/delete()一起作為引數傳給 conn.execute(). conn.execute(_table.insert(), [ {’user_id’: 1, ’email_address’ : ’jack@yahoo.com’}, {’user_id’: 1, ’email_address’ : ’jack@msn.com’}, {’user_id’: 2, ’email_address’ : ’www@www.org’}, {’user_id’: 2, ’email_address’ : ’wendy@aol.com’}, ]) sql expression 也可以像raw sql的text函式一樣使用bindparam, 方法是, 在呼叫insert()/update()/delete()時宣告引數, 然後在conn.execute()執行時候, 將實參傳進去. d=_table.delete().where(_table.c.hiredate<=bindparam("hire_day",DateTime(), required=True)) conn.execute(d, {"hire_day":datetime.today()}) where()和ORM中的filter()接受的引數是一樣, 各種SQL條件都支援. #equals: where(_table.c.name == ’ed’) #not equals: where(_table.c.name != ’ed’) #LIKE: where(_table.c.name.like(’%ed%’)) #IN: where(_table.c.name.in_([’ed’, ’wendy’, ’jack’])) #NOT IN: where(~_table.c.name.in_([’ed’, ’wendy’, ’jack’])) #IS NULL: where(_table.c.name == None) #IS NOT NULL: where(_table.c.name != None) #AND: from sqlalchemy import and_ where(and_(_table.c.name == ’ed’, _table.c.fullname == ’Ed Jones’)) #AND也可以通過多次呼叫where()來實現 where(_table.c.name == ’ed’).where(_table.c.fullname == ’Ed Jones’) #OR: from sqlalchemy import or_ where(or_(_table.c.name == ’ed’, _table.c.name == ’wendy’)) #match: The contents of the match parameter are database backend specific. where(_table.c.name.match(’wendy’)) --========================== --python file: mydatabase.py --========================== from sqlalchemy import create_engine from sqlalchemy.schema import MetaData #db = create_engine('sqlite:///:memory:', echo=True) db = create_engine('sqlite:///c://caw.sqlite.db', echo=True) metadata = MetaData(bind=db) --========================== --python file: dal.py --========================== from sqlalchemy.sql.expression import text, bindparam from sqlalchemy.sql import select,insert, delete, update from sqlalchemy.schema import Table from mydatabase import db,metadata from pyDbRowFactory import DbRowFactory class caw_job(object): FULL_NAME="dal.caw_job" tablename="caw_job" _table=Table(tablename, metadata, autoload=True) def __init__(self): self.app_domain =None self.job_code =None self.job_group =None self.cron_year =None self.cron_month =None self.cron_day =None self.cron_week =None self.cron_day_of_week =None self.cron_hour =None self.cron_minute =None self.description =None @classmethod def getEntity(cls, app_domain, jobCode): sql="select * from caw_job where app_domain=:app_domain and job_code=:job_code"; resultProxy=db.execute(text(sql),{'app_domain':app_domain, 'job_code':jobCode}) DbRowFactory.fromSqlAlchemyResultProxy(resultProxy, cls.FULL_NAME) return DbRowFactory.fetchOneRowObject() def insert(self): i=self._table.insert().values( app_domain =self.app_domain , job_code =self.job_code , job_group =self.job_group , cron_year =self.cron_year , cron_month =self.cron_month , cron_day =self.cron_day , cron_week =self.cron_week , cron_day_of_week=self.cron_day_of_week, cron_hour =self.cron_hour , cron_minute =self.cron_minute , description =self.description , ) db.execute(i) def update(self): u=self._table.update().values( app_domain =self.app_domain , job_code =self.job_code , job_group =self.job_group , cron_year =self.cron_year , cron_month =self.cron_month , cron_day =self.cron_day , cron_week =self.cron_week , cron_day_of_week =self.cron_day_of_week , cron_hour =self.cron_hour , cron_minute =self.cron_minute , description =self.description , , ).where(self._table.c.app_domain==self.app_domain)\ .where(self._table.c.job_code==self.job_code) db.execute(u) def delete(self): d=self._table.delete().where(self._table.c.app_domain==self.app_domain)\ .where(self._table.c.job_code==self.job_code) db.execute(d) #----------------------------------- #使用sqlalchemy.ext.declarative 來生成表, 所有的表都必須有主鍵. #在系統初期, 資料模型往往需要經常調整, 使用這種方式修改表結構更方便些. #----------------------------------- --python file: models.py from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String , Boolean, DateTime, Float engine = create_engine('sqlite:///:memory:', echo=True) Base = declarative_base() # ddl_caw_job 是專門用來生成資料庫物件的, 沒有其他用處 class ddl_caw_job(Base): __tablename__="caw_job" job_name =Column(String(200), primary_key=True) job_group =Column(String(200)) def init_db(): Base.metadata.create_all(bind=engine,)

相關推薦

python使用原生sql操作資料庫

使用 sqlalchemy 有3種方式: 方式1, 使用raw sql; 方式2, 使用SqlAlchemy的sql expression; 方式3, 使用ORM. 前兩種方式可以統稱為 core 方式. 本文講解 core 方式訪問資料庫, 不涉及 O

laravel+Mysql DB原生SQL操作報1292 Truncated incorrect DOUBLE value問題解決

報1292 Truncated incorrect DOUBLE value問題的,基本都是因為SQL寫的不標準或者寫錯了。 昨天接到一個批量更新的需求。因為laravel不支援批量的更新操作,故使用了原生的sql進行拼接(可能laravel支援批量,我不知道)。 在做一個

RevitDynamo程式設計——在Python Script結合sql server資料庫來儲存材料的外觀屬性

我們的Dynamo的Python Script環境其實也是 IronPython的一種,結合以上部落格可以很好的結合資料庫。 我的這篇讀取材料的外觀屬性,並存儲到sql server資料庫中。今天我來修改一下,把其中的的讀取過程做成dll檔案,然後在Pyt

Python的列表操作

.so 排序 語句 res 發揮 第一個元素 刪除列 extend 刪除元素 Python的列表操作可謂是功能強大且方便(相對於Java)簡單、常規的操作就不說了(這不是一個入門教程),介紹幾個很有特點的例子添加 # 追加到結尾(append) li = [1, 2

PythonPython的列表操作

元素 提取 添加 sta 連接 not n個元素 none 格式 Python的列表操作可謂是功能強大且方便(相對於Java)簡單、常規的操作就不說了(這不是一個入門教程),介紹幾個很有特點的例子添加 # 追加到結尾(append) li = [1, 2, 3, 4, 5

python列表切片操作

tar 元素 append() 插入 根據 ins class ever sta 1 a=[‘zhao‘,‘qian‘,‘sun‘,‘li‘,‘zhou‘,‘‘] 2 3 #增刪改查 4 #查 切片 [] 5 print(a[1:])#取到最後 6 pr

pythonsql語句用法

mod location drop rop class turn price pytho 初始 函數中應用sql語句def _get_cust_number(self,cr,uid,ids,field_name,args,context=None): res={

Python 的字典操作

語言 clear 組成 {} 返回 括號 做了 進行 不返回 字典:dict 字典在其他編程語言中又稱作關聯數組或散列表 通過鍵實現元素存取: 無序集合,可變類型容器,長度可變,異構,嵌套 表示方法: phonebook = {‘Alice‘:‘1234‘,‘Beth‘:

python字串的操作方法

python中字串的操作方法大全 更新時間:2018年06月03日 10:08:51 作者:駿馬金龍 我要評論這篇文章主要給大家介紹了關於python中字串操作方法的相關資料,文中通過示例程式碼詳細介紹了關於python中字串的大小寫轉換、isXXX判斷、填充、子串搜尋、替換、分割、join以及修剪:st

python字串的操作方法大全

轉自:https://www.jb51.net/article/141376.htm 這篇文章主要給大家介紹了關於python中字串操作方法的相關資料,文中通過示例程式碼詳細介紹了關於python中字串的大小寫轉換、isXXX判斷、填充、子串搜尋、替換、分割、join以及修剪:strip、l

Python字串常用操作

字串常用操作 (1)find 檢測指定字串是否包含在當前字串中,如果是返回開始的索引值,否則返回-1 strs.find(str,start,end) start 為起始位置,end為結束位置 (2)index 作用和find()方法一樣,不過當要查詢的字串不存在時

pythonlist常用操作(不包括切片)

stus = ['abc‘,’dec',’dxq‘,’wzw‘] #下標,索引,角標 stus[3] stus = [] #空陣列 stus = list() #空列表 #增加元素 stus.append('zhaoyan') #在列表末尾增加一個元素 stus.inse

python檔案及其操作(File)

檔案是用於資料儲存的單位 檔案通常用於長期儲存資料 檔案中是以位元組為單位順序儲存資料的 檔案的操作流程 開啟檔案 讀/寫檔案 關閉檔案   注:        任何的作

Python字典的操作

存在 最好 是否 bag ems pri mon one 報錯 1、字典key-value,key是不能重復的stu_info={"name":"王誌華","age":18,"addr":"北京"}2、取值,查print(stu_info[‘name‘])print(stu

Python檔案的操作

檔案的開啟和關閉: f = open('test.txt','w') r:以只讀方式開啟檔案。檔案的指標將會放在檔案的開頭。這是預設模式。 w:開啟一個檔案只用於寫入。如果該檔案已存在則將其覆蓋。如果該檔案不存在,建立新檔案。 a:開啟一個檔案用於追加。如果該檔案已存在,檔案指標將會放在

python os模組操作檔案路徑

python中的os.path模組用法: dirname() 用於去掉檔名,返回目錄所在的路徑 如: >>> import os >>> os.path.dirname('d:\\library\\book.txt') >>>

laravel原生sql操作以及like模糊查詢的坑點

此篇文章基於 laravel版本:5.5 laravel基於DB的原生查詢可以使用 DB facade 執行查詢。DB facade 為每種型別的查詢提供了方法:select、update、insert、delete 和 statement。以下是知識點: (1)執行

PythonString字串操作

關於字串的一些操作方法:  ascii:字母,數字,特殊字元:1個位元組,8位     Unicode:16位 兩個位元組  升級 32 位  四個位元組     utf-8:最少一個位元組 8位表示。 英文字母 8位 1個位

Python "is" 與 "==" 操作有什麼區別?

在 Python 中,比較兩個物件(變數)是否相等,可以用 “is” 和 “==” 操作,但它倆有什麼區別?什麼時候用 “is”,什麼時候用 “==” ?在面試時,發現不少候選人很難把這兩者完全說清楚,因此在這篇文章中,「Python之禪」將對二者進行深入淺出的對比介紹。 先舉個例子 小黃最

python關於檔案操作的總結

本來想對一個檔案重複執行某一個函式,但是隻有第一次執行函式時有結果,後面的幾次都沒有結果,最終發現是在函式外開啟檔案,第一次執行沒有問題,第二次執行時,檔案已經開啟,函式找不到該檔案了,或者是檔案沒有關閉,具體原因需要檢視python內部的執行機制。舉例如下: 定義一個函式,輸出文字檔案裡的