golang sql連線池的實現解析 golang sql連線池的實現解析
golang sql連線池的實現解析
golang的”database/sql”是操作資料庫時常用的包,這個包定義了一些sql操作的介面,具體的實現還需要不同資料庫的實現,mysql比較優秀的一個驅動是:github.com/go-sql-driver/mysql
,在介面、驅動的設計上”database/sql”的實現非常優秀,對於類似設計有很多值得我們借鑑的地方,比如beego框架cache的實現模式就是借鑑了這個包的實現;”database/sql”除了定義介面外還有一個重要的功能:連線池,我們在實現其他網路通訊時也可以借鑑其實現。
連線池的作用這裡就不再多說了,我們先從一個簡單的示例看下”database/sql”怎麼用:
package main
import(
"fmt"
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
func main(){
db, err := sql.Open("mysql", "username:[email protected](host)/db_name?charset=utf8&allowOldPasswords=1")
if err != nil {
fmt.Println(err)
return
}
defer db.Close()
rows,err := db.Query("select * from test")
for rows.Next(){
//row.Scan(...)
}
rows.Close()
}
用法很簡單,首先Open開啟一個數據庫,然後呼叫Query、Exec執行資料庫操作,github.com/go-sql-driver/mysql
具體實現了database/sql/driver
的介面,所以最終具體的資料庫操作都是呼叫github.com/go-sql-driver/mysql
實現的方法,同一個資料庫只需要呼叫一次Open即可,下面根據具體的操作分析下”database/sql”都幹了哪些事。
1.驅動註冊
import _ "github.com/go-sql-driver/mysql"
前面的”_”作用時不需要把該包都導進來,只執行包的init()
方法,mysql驅動正是通過這種方式註冊到”database/sql”中的:
//github.com/go-sql-driver/mysql/driver.go
func init() {
sql.Register("mysql", &MySQLDriver{})
}
type MySQLDriver struct{}
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
...
}
init()
通過Register()
方法將mysql驅動新增到sql.drivers
(型別:make(map[string]driver.Driver))中,MySQLDriver實現了driver.Driver
介面:
//database/sql/sql.go
func Register(name string, driver driver.Driver) {
driversMu.Lock()
defer driversMu.Unlock()
if driver == nil {
panic("sql: Register driver is nil")
}
if _, dup := drivers[name]; dup {
panic("sql: Register called twice for driver " + name)
}
drivers[name] = driver
}
//database/sql/driver/driver.go
type Driver interface {
// Open returns a new connection to the database.
// The name is a string in a driver-specific format.
//
// Open may return a cached connection (one previously
// closed), but doing so is unnecessary; the sql package
// maintains a pool of idle connections for efficient re-use.
//
// The returned connection is only used by one goroutine at a
// time.
Open(name string) (Conn, error)
}
假如我們同時用到多種資料庫,就可以通過呼叫sql.Register
將不同資料庫的實現註冊到sql.drivers
中去,用的時候再根據註冊的name將對應的driver取出。
2.連線池實現
先看下連線池整體處理流程:
2.1 初始化DB
db, err := sql.Open("mysql", "username:[email protected](host)/db_name?charset=utf8&allowOldPasswords=1")
sql.Open()
是取出對應的db,這時mysql還沒有建立連線,只是初始化了一個sql.DB
結構,這是非常重要的一個結構,所有相關的資料都儲存在此結構中;Open同時啟動了一個connectionOpener
協程,後面再具體分析其作用。
type DB struct {
driver driver.Driver //資料庫實現驅動
dsn string //資料庫連線、配置引數資訊,比如username、host、password等
numClosed uint64
mu sync.Mutex //鎖,操作DB各成員時用到
freeConn []*driverConn //空閒連線
connRequests []chan connRequest //阻塞請求佇列,等連線數達到最大限制時,後續請求將插入此佇列等待可用連線
numOpen int //已建立連線或等待建立連線數
openerCh chan struct{} //用於connectionOpener
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
maxIdle int //最大空閒連線數
maxOpen int //資料庫最大連線數
maxLifetime time.Duration //連線最長存活期,超過這個時間連線將不再被複用
cleanerCh chan struct{}
}
maxIdle
(預設值2)、maxOpen
(預設值0,無限制)、maxLifetime(預設值0,永不過期)
可以分別通過SetMaxIdleConns
、SetMaxOpenConns
、SetConnMaxLifetime
設定。
2.2 獲取連線
上面說了Open
時是沒有建立資料庫連線的,只有等用的時候才會實際建立連線,獲取可用連線的操作有兩種策略:cachedOrNewConn(有可用空閒連線則優先使用,沒有則建立)、alwaysNewConn(不管有沒有空閒連線都重新建立),下面以一個query的例子看下具體的操作:
rows, err := db.Query("select * from test")
database/sql/sql.go
:
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
var rows *Rows
var err error
//maxBadConnRetries = 2
for i := 0; i < maxBadConnRetries; i++ {
rows, err = db.query(query, args, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
return db.query(query, args, alwaysNewConn)
}
return rows, err
}
func (db *DB) query(query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
ci, err := db.conn(strategy)
if err != nil {
return nil, err
}
//到這已經獲取到了可用連線,下面進行具體的資料庫操作
return db.queryConn(ci, ci.releaseConn, query, args)
}
資料庫連線由db.query()
獲取:
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
lifetime := db.maxLifetime
//從freeConn取一個空閒連線
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
//如果沒有空閒連線,而且當前建立的連線數已經達到最大限制則將請求加入connRequests佇列,
//並阻塞在這裡,直到其它協程將佔用的連線釋放或connectionOpenner建立
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
db.connRequests = append(db.connRequests, req)
db.mu.Unlock()
ret, ok := <-req //阻塞
if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) { //連線過期了
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
db.numOpen++ //上面說了numOpen是已經建立或即將建立連線數,這裡還沒有建立連線,只是樂觀的認為後面會成功,失敗的時候再將此值減1
db.mu.Unlock()
ci, err := db.driver.Open(db.dsn) //呼叫driver的Open方法建立連線
if err != nil { //建立連線失敗
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections() //通知connectionOpener協程嘗試重新建立連線,否則在db.connRequests中等待的請求將一直阻塞,知道下次有連線建立
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
}
db.addDepLocked(dc, dc)
dc.inUse = true
db.mu.Unlock()
return dc, nil
}
總結一下上面獲取連線的過程:
* step1:首先檢查下freeConn裡是否有空閒連線,如果有且未超時則直接複用,返回連線,如果沒有或連線已經過期則進入下一步;
* step2:檢查當前已經建立及準備建立的連線數是否已經達到最大值,如果達到最大值也就意味著無法再建立新的連線了,當前請求需要在這等著連線釋放,這時當前協程將建立一個channel:chan connRequest
,並將其插入db.connRequests
佇列,然後阻塞在接收chan connRequest
上,等到有連線可用時這裡將拿到釋放的連線,檢查可用後返回;如果還未達到最大值則進入下一步;
* step3:建立一個連線,首先將numOpen加1,然後再建立連線,如果等到建立完連線再把numOpen加1會導致多個協程同時建立連線時一部分會浪費,所以提前將numOpen佔住,建立失敗再將其減掉;如果建立連線成功則返回連線,失敗則進入下一步
* step4:建立連線失敗時有一個善後操作,當然並不僅僅是將最初佔用的numOpen數減掉,更重要的一個操作是通知connectionOpener協程根據db.connRequests
等待的長度建立連線,這個操作的原因是:
numOpen在連線成功建立前就加了1,這時候如果numOpen已經達到最大值再有獲取conn的請求將阻塞在step2,這些請求會等著先前進來的請求釋放連線,假設先前進來的這些請求建立連線全部失敗,那麼如果它們直接返回了那些等待的請求將一直阻塞在那,因為不可能有連線釋放(極限值,如果部分建立成功則會有部分釋放),直到新請求進來重新成功建立連線,顯然這樣是有問題的,所以maybeOpenNewConnections
將通知connectionOpener根據db.connRequests
長度及可建立的最大連線數重新建立連線,然後將新建立的連線發給阻塞的請求。
注意:如果maxOpen=0
將不會有請求阻塞等待連線,所有請求只要從freeConn中取不到連線就會新建立。
另外Query
、Exec
有個重試機制,首先優先使用空閒連線,如果2次取到的連線都無效則嘗試新建立連線。
獲取到可用連線後將呼叫具體資料庫的driver處理sql。
2.3 釋放連線
資料庫連線在被使用完成後需要歸還給連線池以供其它請求複用,釋放連線的操作是:putConn()
:
func (db *DB) putConn(dc *driverConn, err error) {
...
//如果連線已經無效,則不再放入連線池
if err == driver.ErrBadConn {
db.maybeOpenNewConnections()
dc.Close() //這裡最終將numOpen數減掉
return
}
...
//正常歸還
added := db.putConnDBLocked(dc, nil)
...
}
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
//有等待連線的請求則將連線發給它們,否則放入freeConn
if c := len(db.connRequests); c > 0 {
req := db.connRequests[0]
// This copy is O(n) but in practice faster than a linked list.
// TODO: consider compacting it down less often and
// moving the base instead?
copy(db.connRequests, db.connRequests[1:])
db.connRequests = db.connRequests[:c-1]
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
return false
}
釋放的過程:
* step1:首先檢查下當前歸還的連線在使用過程中是否發現已經無效,如果無效則不再放入連線池,然後檢查下等待連線的請求數新建連線,類似獲取連線時的異常處理,如果連線有效則進入下一步;
* step2:檢查下當前是否有等待連線阻塞的請求,有的話將當前連線發給最早的那個請求,沒有的話則再判斷空閒連線數是否達到上限,沒有則放入freeConn空閒連線池,達到上限則將連線關閉釋放。
* step3:(只執行一次)啟動connectionCleaner協程定時檢查feeConn中是否有過期連線,有則剔除。
有個地方需要注意的是,Query
、Exec
操作用法有些差異:
- a.
Exec
(update、insert、delete等無結果集返回的操作)呼叫完後會自動釋放連線; - b.
Query
(返回sql.Rows)則不會釋放連線,呼叫完後仍然佔有連線,它將連線的所屬權轉移給了sql.Rows
,所以需要手動呼叫close歸還連線,即使不用Rows也得呼叫rows.Close(),否則可能導致後續使用出錯,如下的用法是錯誤的:
//錯誤
db.SetMaxOpenConns(1)
db.Query("select * from test")
row,err := db.Query("select * from test") //此操作將一直阻塞
//正確
db.SetMaxOpenConns(1)
r,_ := db.Query("select * from test")
r.Close() //將連線的所屬權歸還,釋放連線
row,err := db.Query("select * from test")
//other op
row.Close()
文章標籤:
golang