SanicDB:簡化 Python 非同步 Web 框架 Sanic 操作 MySQL
SanicDB 是為 Python 的非同步 Web 框架 Sanic 方便操作 MySQL 而開發的工具,是對 aiomysql.Pool 的輕量級封裝。Sanic 是非同步IO的Web框架,同時用非同步IO讀寫MySQL才能更大發揮它的效率。雖然這個模組叫做 SanicDB,但是它可以用在任何非同步IO操作MySQL的地方。
SanicDB的靈感來源於tornado裡面的對MySQLdb(MySQL的C語言介面的Python封裝)的操作,後來tornado把它去除了,就有人把這部分程式碼寫成一個單獨的模組叫做 torndb,torndb是對python-mysql的封裝,不能進行非同步操作。但是它很簡潔,用起來已經非常習慣了。
在用Sanic的時候,發現有非同步IO的aiomysql可以用,但是用起來還有點麻煩,於是就著手對aiomysql進行封裝,既然aiomysql支援連線池,那就直接封裝aiomysql.Pool吧。
首先,看看它的初始化
class SanicDB: """A lightweight wrapper around aiomysql.Pool for easy to use """ def __init__(self, host, database, user, password, loop=None, sanic=None, minsize=3, maxsize=5, return_dict=True, pool_recycle=7*3600, autocommit=True, charset = "utf8mb4", **kwargs): '''
前面4個引數是對應資料庫的;
loop 是你的應用程式中的事件迴圈(event_loop);
sanic 是你的Sanic 應用中的sanic 物件;
loop 和 sanic 兩者只需提供一個即可,當同時存在時,連線資料庫其實用的是sanic裡面的loop,這在__init__()程式碼中可以看到。
minisize 和 maxsize 是連線池的個數限制;
return_dict 是返回的資料一條記錄為一個dict,key是MySQL 表的欄位名,value是欄位的值;
pool_recycle 是連線池重連的時間間隔,MySQL預設的連線閒置時間是8小時;
對MySQL 的讀寫操作方法:
async def query(self, query, *parameters, **kwparameters):
async def execute(self, query, *parameters, **kwparameters):
async def get(self, query, *parameters, **kwparameters):
從程式碼中可以看到,以上三個函式都是呼叫的cursor的execute() 執行 SQL 命令,但是query(), get() 用於讀操作返回資料,query返回全部資料(list),get只返回一條資料。
execute() 方法用來執行寫操作,返回受影響的行ID。
上面三個函式 都是一樣的:
query 是要執行的SQL語句, 後面兩個引數是用來引數化執行sql語句的。引數化執行sql是為了防止SQL注入的,那麼引數化執行SQL是什麼意思呢,舉個例子就目標了。
比如,進行使用者登入時,我們要執行一條SQL,Python拼接SQL如下:
sql = "select * from user where name='%s' and password='%s'" % (input_name, input_password)
input_name 和 input_password 都是使用者輸入的,如果使用者想搗鬼,他把input_name 輸入成 “ ‘ or 1 = 1 –”,拼接出來的sql就成了:
select * from user where name='' or 1 = 1 --' and password='anything'
or 後面的 1=1 就是True,而 — 後面都成了註釋,這條語句不管input_name, input_password 是什麼都可以執行,通過使用者驗證,達到了注入目的。
引數化執行SQL就是把拼接SQL的任務交給SanicDB(實際上是它後的aiomysql後面的pymysql),我們只把要拼接的引數當做*parameters傳給讀寫方法即可:
mydb.query("select * from user where user=%s and password=%s", input_name, input_password)
更高階的封裝:
query(), get(), execute() 就是基本的操作,在此基礎上,做了進一步封裝,讓寫應用更方便:
# 檢查一個table中是否含有某欄位為某值的記錄 async def table_has(self, table_name, field, value): # 把一個item插入一個表,item為dict,key是欄位名, # ignore_duplicated為True時,遇到唯一索引重複時不報錯 async def table_insert(self, table_name, item, ignore_duplicated=True): # 更新一個table的記錄,updates是dict,key為欄位名 async def table_update(self, table_name, updates, field_where, value_where):
示例
程式碼倉庫中有兩個示例:
test.py 是在普通asyncio 程式中使用非同步MySQL的例子
example.py 是在Sanic Web中使用的例子
特別說明一下,Sanic 物件 app 中還沒有生成 event_loop 物件(它是在app.run() 之後生成的),所以在Sanic中初始化SanicDB時(app.run())之前,執行sql語句會報錯。如果需要建立連線後初始化執行一些SQL操作,可以利用Sanic的Listner:
@app.listener('before_server_start') async def setup_db(app, loop): app.db = await db_setup()