1. 程式人生 > >清晰架構(Clean Architecture)的Go微服務: 事物管理

清晰架構(Clean Architecture)的Go微服務: 事物管理

為了支援業務層中的事務,我試圖在Go中查詢類似Spring的宣告式事務管理,但是沒找到,所以我決定自己寫一個。 事務很容易在Go中實現,但很難做到正確地實現。

需求:
  1. 將業務邏輯與事務程式碼分開。
    在編寫業務用例時,開發者應該只需考慮業務邏輯,不需要同時考慮怎樣給業務邏輯加事務管理。如果以後需要新增事務支援,你可以在現有業務邏輯的基礎上進行簡單封裝,而無需更改任何其他程式碼。事務實現細節應該對業務邏輯透明。

  2. 事務邏輯應該作用於用例層(業務邏輯)
    不在持久層上。

  3. 資料服務(資料永續性)層應對事務邏輯透明。
    這意味著永續性程式碼應該是相同的,無論它是否支援事務

  4. 你可以選擇延遲支援事物。
    你可以先編寫沒有事務的用例,稍後可以在不修改現有程式碼的情況下給該用例加上事務。你只需新增新程式碼。

我最終的解決方案還不是宣告式事務管理,但它非常接近。建立一個真正的宣告式事務管理需要付出很多努力,因此我構建了一個可以實現宣告式事務的大多數功能的事務管理,同時又沒花很多精力。

方案:

最終解決方案涉及本程式的所有層級,我將逐一解釋它們。

資料庫連結封裝

在Go的“sql”lib中,有兩個資料庫連結sql.DB和sql.Tx. 不需要事務時,使用sql.DB訪問資料庫; 當需要事務時,你使用sql.Tx. 為了共享程式碼,持久層需要同時支援兩者。 因此需要對資料庫連結進行封裝,然後把它作為資料庫訪問方法的接收器。 我從這裡¹得到了粗略的想法。

// SqlGdbc (SQL Go database connection) is a wrapper for SQL database handler ( can be *sql.DB or *sql.Tx)
// It should be able to work with all SQL data that follows SQL standard.
type SqlGdbc interface {
    Exec(query string, args ...interface{}) (sql.Result, error)
    Prepare(query string) (*sql.Stmt, error)
    Query(query string, args ...interface{}) (*sql.Rows, error)
    QueryRow(query string, args ...interface{}) *sql.Row
    // If need transaction support, add this interface
    Transactioner
}

// SqlDBTx is the concrete implementation of sqlGdbc by using *sql.DB
type SqlDBTx struct {
    DB *sql.DB
}

// SqlConnTx is the concrete implementation of sqlGdbc by using *sql.Tx
type SqlConnTx struct {
    DB *sql.Tx
}

資料庫實現型別SqlDBTx和sqlConnTx都需要實現SqlGdbc介面(包括“Transactioner”)接口才行。 需要為每個資料庫(例如MySQL, CouchDB)實現“Transactioner”介面以支援事務。

// Transactioner is the transaction interface for database handler
// It should only be applicable to SQL database
type Transactioner interface {
    // Rollback a transaction
    Rollback() error
    // Commit a transaction
    Commit() error
    // TxEnd commits a transaction if no errors, otherwise rollback
    // txFunc is the operations wrapped in a transaction
    TxEnd(txFunc func() error) error
    // TxBegin gets *sql.DB from receiver and return a SqlGdbc, which has a *sql.Tx
    TxBegin() (SqlGdbc, error)
}

資料庫儲存層(datastore layer)的事物管理程式碼

以下是“Transactioner”介面的實現程式碼,其中只有TxBegin()是在SqlDBTx(sql.DB)上實現,因為事務從sql.DB開始,然後所有事務的其他操作都在SqlConnTx(sql.Tx)上。 我從這裡²得到了這個想法。

// TransactionBegin starts a transaction
func (sdt *SqlDBTx) TxBegin() (gdbc.SqlGdbc, error) {
    tx, err := sdt.DB.Begin()
    sct := SqlConnTx{tx}
    return &sct, err
}

func (sct *SqlConnTx) TxEnd(txFunc func() error) error {
    var err error
    tx := sct.DB

    defer func() {
        if p := recover(); p != nil {
            tx.Rollback()
            panic(p) // re-throw panic after Rollback
        } else if err != nil {
            tx.Rollback() // err is non-nil; don't change it
        } else {
            err = tx.Commit() // if Commit returns error update err with commit err
        }
    }()
    err = txFunc()
    return err
}

func (sct *SqlConnTx) Rollback() error {
    return sct.DB.Rollback()
}

用例層的事物介面

在用例層中,你可以擁有相同業務功能的一個函式的兩個版本,一個支援事務,一個不支援,並且它們的名稱可以共享相同的字首,而事務可以新增“withTx”作為字尾。 例如,在以下程式碼中,“ModifyAndUnregister”是不支援事務的那個,“ModifyAndUnregisterWithTx”是支援事務的那個。 “EnableTxer”是用例層上唯一的事務支援介面,任何支援事務的“用例”都需要它。 這裡的所有程式碼都在是用例層級(包括“EnableTxer”)程式碼,不涉及資料庫內容。

type RegistrationUseCaseInterface interface {
...
    // ModifyAndUnregister change user information and then unregister the user based on the User.Id passed in.
    // It is created to illustrate transaction, no real use.
    ModifyAndUnregister(user *model.User) error
    // ModifyAndUnregisterWithTx change user information and then unregister the user based on the User.Id passed in.
    // It supports transaction
    // It is created to illustrate transaction, no real use.
    ModifyAndUnregisterWithTx(user *model.User) error
    // EnableTx enable transaction support on use case. Need to be included for each use case needs transaction
    // It replaces the underline database handler to sql.Tx for each data service that used by this use case
    EnableTxer
}
// EnableTxer is the transaction interface for use case layer
type EnableTxer interface {
    EnableTx()
}

以下是不包含事務的業務邏輯程式碼的示例。 “modifyAndUnregister(ruc,user)”是事務和非事務用例函式共享的業務功能。 你需要使用TxBegin()和TxEnd()(在TxDataInterface中)來包裝業務功能以支援事務,這些是資料服務層介面,並且與資料庫訪問層無關。 該用例還實現了“EnableTx()”介面,該介面實際上將底層資料庫連結從sql.DB切換到sql.Tx.

// The use case of ModifyAndUnregister without transaction
func (ruc *RegistrationUseCase) ModifyAndUnregister(user *model.User) error {
    return modifyAndUnregister(ruc, user)
}

// The use case of ModifyAndUnregister with transaction
func (ruc *RegistrationUseCase) ModifyAndUnregisterWithTx(user *model.User) error {
    tdi, err := ruc.TxDataInterface.TxBegin()
    if err != nil {
        return errors.Wrap(err, "")
    }
    ruc.EnableTx()
    return tdi.TxEnd(func() error {
        // wrap the business function inside the TxEnd function
        return modifyAndUnregister(ruc, user)
    })
}

// The business function will be wrapped inside a transaction and inside a non-transaction function
// It needs to be written in a way that every error will be returned so it can be catched by TxEnd() function,
// which will handle commit and rollback
func modifyAndUnregister(ruc *RegistrationUseCase, user *model.User) error {
    udi := ruc.UserDataInterface
    err := modifyUser(udi, user)
    if err != nil {
        return errors.Wrap(err, "")
    }
    err = unregisterUser(udi, user.Name)
    if err != nil {
        return errors.Wrap(err, "")
    }
    return nil
}

func (ruc *RegistrationUseCase) EnableTx() {
    // Only UserDataInterface need transaction support here. If there are other data services need it,
    // then they also need to enable transaction here
    ruc.UserDataInterface.EnableTx(ruc.TxDataInterface)
}

為什麼我需要在“TxDataInterface”中呼叫函式“EnbaleTx”來替換底層資料庫連結而不是直接在用例中執行? 因為sql.DB和sql.Tx層級要比用例層低幾個級別,直接呼叫會搞砸依賴關係。 保持合理依賴關係的訣竅是在每一層上都有TxBegin()和TxEnd()並逐層呼叫它們以維持合理的依賴關係。

資料服務層的事物介面

我們討論了用例層和資料儲存層上的事務功能,我們還需要資料服務層中的事務功能將這兩者連線在一起。 以下程式碼是資料服務層的事務介面(“TxDataInterface”)。 “TxDataInterface”是僅為事物管理而建立的資料服務層介面。 每個資料庫只需要實現一次。 還有一個“EnableTxer”介面(這是一個數據服務層介面,不要與用例層中的“EnableTxer”介面混淆),實現“EnableTxer”介面將開啟資料服務型別對事務的支援,例如, 如果想要“UserDataInterface”支援事物,就需要它實現“EnableTxer”介面。

// TxDataInterface represents operations needed for transaction support.
// It only needs to be implemented once for each database
// For sqlGdbc, it is implemented for SqlDBTx in transaction.go
type TxDataInterface interface {
    // TxBegin starts a transaction. It gets a DB handler from the receiver and return a TxDataInterface, which has a
    // *sql.Tx inside. Any data access wrapped inside a transaction will go through the *sql.Tx
    TxBegin() (TxDataInterface, error)
    // TxEnd is called at the end of a transaction and based on whether there is an error, it commits or rollback the
    // transaction.
    // txFunc is the business function wrapped in a transaction
    TxEnd(txFunc func() error) error
    // Return the underline transaction handler, sql.Tx
    GetTx() gdbc.SqlGdbc
}

// This interface needs to be included in every data service interface that needs transaction support
type EnableTxer interface {
    // EnableTx enables transaction, basically it replaces the underling database handle sql.DB with sql.Tx
    EnableTx(dataInterface TxDataInterface)
}

// UserDataInterface represents interface for user data access through database
type UserDataInterface interface {
...
    Update(user *model.User) (rowsAffected int64, err error)
    // Insert adds a user to a database. The returned resultUser has a Id, which is auto generated by database
    Insert(user *model.User) (resultUser *model.User, err error)
    // Need to add this for transaction support
    EnableTxer
}

以下程式碼是“TxDataInterface”的實現。 “TxDataSql”是“TxDataInterface”的具體型別。 它呼叫底層資料庫連結的開始和結束函式來執行真正的事務操作。

// TxDataSql is the generic implementation for transaction for SQL database
// You only need to do it once for each SQL database
type TxDataSql struct {
    DB gdbc.SqlGdbc
}

func (tds *TxDataSql) TxEnd(txFunc func() error) error {
    return tds.DB.TxEnd(txFunc)
}

func (tds *TxDataSql) TxBegin() (dataservice.TxDataInterface, error) {

    sqlTx, error := tds.DB.TxBegin()
    tdi := TxDataSql{sqlTx}
    tds.DB = tdi.DB
    return &tdi, error
}
func (tds *TxDataSql) GetTx() gdbc.SqlGdbc {
    return tds.DB
}

事物策略:

你可能會問為什麼我在上面的程式碼中需要“TxDataSql”? 確實可以在沒有它的情況下實現事務,實際上最開的程式裡就沒有它。 但是我還是要在某些資料服務中實現“TxDataInterface”來開始和結束事務。 由於這是在用例層中完成的,用例層不知道哪個資料服務型別實現了介面,因此必須在每個資料服務介面上實現“TxDataInterface”(例如,“UserDataInterface”和“CourseDataInterface”)以保證 “用例層”不會選擇沒有介面的“資料服務(data service)”。 在建立“TxDataSql”之後,我只需要在“TxDataSql”中實現一次“TxDataInterface”,然後每個資料服務型別只需要實現“EnableTx()”就行了。

// UserDataSql is the SQL implementation of UserDataInterface
type UserDataSql struct {
    DB gdbc.SqlGdbc
}

func (uds *UserDataSql) EnableTx(tx dataservice.TxDataInterface) {
    uds.DB = tx.GetTx()
}

func (uds *UserDataSql) FindByName(name string) (*model.User, error) {
    //logger.Log.Debug("call FindByName() and name is:", name)
    rows, err := uds.DB.Query(QUERY_USER_BY_NAME, name)
    if err != nil {
        return nil, errors.Wrap(err, "")
    }
    defer rows.Close()
    return retrieveUser(rows)
}

上面的程式碼是“UserDataService”介面的實現程式。 “EnableTx()”方法從“TxDataInterface”獲得sql.Tx並將“UserDataSql”中的sql.DB替換為sql.Tx.

資料訪問方法(例如,FindByName())在事務程式碼和非事務程式碼之間共享,並且不需要知道“UserDataSql.DB”是sql.DB還是sql.Tx.

依賴關係漏洞:

上面的程式碼實現中存在一個缺陷,這會破壞我的設計並使其不完美。它是“TxDataInterface”中的函式“GetTx()”,它是一個數據服務層介面,因此它不應該依賴於gdbc.SqlGdbc(資料庫介面)。你可能認為資料服務層的實現程式碼無論如何都需要訪問資料庫,當前這是正確的。但是,你可以在將來更改實現去呼叫gRPC微服務(而不是資料庫)。如果介面不依賴於SQL介面的話,則可以自由更改實現,但如果不是,則即使你的介面實現已更改,該介面也會永久保留對SQL的依賴。

為什麼它是本程式中打破依賴關係的唯一地方?因為對於其他介面,容器負責建立具體型別,而程式的其餘部分僅使用介面。但是對於事務,在建立具體型別之後,需要將底層資料庫處理程式從sql.DB替換為sql.Tx,這破壞了設計。

它有解決方法嗎?是的,容器可以為需要事務的函式建立sql.Tx而不是sql.DB,這樣我就不需要在以後的用例級別中替換它。但是,配置檔案中需要一個標誌來指示函式是否需要事務, 而且這個標誌需要配備給用例中的每個函式。這是一個太大的改動,所以我決定現在先這樣,以後再重新審視它。

好處:

通過這個實現,事務程式碼對業務邏輯幾乎是透明的(除了我上面提到的缺陷)。業務邏輯中沒有資料儲存(datastore)級事務程式碼,如Tx.Begin,Tx.Commit和Tx.Rollback(但你確實需要業務級別事物函式Tx.Begin和Tx.End),不僅如此,你的永續性程式碼中也幾乎沒有資料儲存級事務程式碼。 如需在用例層上啟用事務,你只需要在用例上實現EnableTx()並將業務函式封裝在“TxBegin()”,EnableTx()和“TxEnd()”中,如上例所示。 在持久層上,大多數事務程式碼已經由“txDataService.go”實現,你只需要為特定的資料服務(例如UserDataService)實現“EnableTx”。 事務支援的真正操作是在“transaction.go”檔案中實現的,它實現了“Transactioner”介面,它有四個函式,“Rollback”, “Commit”, “TxBegin” 和 “TxEnd”。

對用例增加事物支援的步驟:

假設我們需要在用例“listCourse”中為一個函式新增事務支援,以下是步驟

  1. 在列表課程用例(“listCourse.go”)中實現“EnableTxer”介面

  2. 在域模型(“course”)資料服務層(courseDataMysql.go)中實現“EnableTxer”介面

  3. 建立一個新的事務啟用函式並將現有業務函式包裝在“TxBegin()”,EnableTx()和“TxEnd()”中

缺陷:

首先,它仍然不是宣告​​式事物管理;第二,它沒有完全達到需求中的#4。要將用例函式從非事務更改為事務,你可以建立一個支援事務的新函式,它需要更改呼叫函式; 或者你修改現有函式並將其包裝到事務中,這也需要程式碼更改。為了實現#4,需要新增許多程式碼,因此我將其推遲到以後。第三,它不支援巢狀事務(Nested Transaction),因此你需要手動確保程式碼中沒有發生巢狀事務。如果程式碼庫不是太複雜,這很容易做到。如果你有一個非常複雜的程式碼庫,有很多事務和非事務函式混在一起,那麼手工做起來會比較困難,這是需要在程式中實現巢狀事務或找到已經支援它的方案。我沒有花時間研究新增巢狀事務所需的工作量,但這可能並不容易。如果你對它感興趣,這裡³是一些討論。到目前為止,對於大多數情況而言,當前的解決方案可能是在代價不大的情況下的最佳方案。

應用範圍:

首先,它只支援SQL資料庫的事務。 如果你有NoSql資料庫,它將無法工作(大多數NoSql資料庫無論如何都不支援事務)。 其次,如果事務跨越了資料庫的邊界(例如在不同的微伺服器之間),那麼它將無法工作。 在這種情況下,你需要使用Saga⁴。它的原理是為事物中的每個操作寫一個補償操作,然後在回滾階段挨個執行每一個補償操作。 在當前框架中新增Sage解決方案應該不難。

其他資料庫相關問題:

關閉資料庫連結(Close connection)

我從來沒有為資料庫連結呼叫Close()函式,因為沒有必要這樣做。 你可以傳入sql.DB或sql.Tx作為永續性函式的接收器(receiver)。 對於sql.DB,資料庫將自動建立連結池併為你管理連結。 連結完成後,它將返回到連結池,無需關閉。 對於sql.Tx,在事務結束時,你可以提交或回滾,之後連結將返回到連線池,而無需關閉。 請參閱此處⁵ 和 此處⁶ .

物件關係對映(O/R mapping)

我簡要地查看了幾個“O/R”對映庫,但它們沒有提供我所需要的功能。 我認為“O/R對映”只適合兩種情況。 首先,你的應用程式主要是CRUD,沒有太多的查詢或搜尋; 第二,開發人員不熟悉SQL。 如果不是這種情況,則O/R對映不會提供太多幫助。 我想從擴充套件資料庫模組中獲得兩個功能,一個是將sql.row載入到我的域模型結構(包括處理NULL值)中(例如“User”),另一個是自動關閉sql型別,如sql.statement或sql.rows。 有一些sql擴充套件庫似乎提供了至少部分這樣的功能。 我還沒有嘗試,但似乎值得一試。

延遲(Defer):

在進行資料庫訪問時,你將進行大量重複呼叫以關閉資料庫型別(例如statements, rows)。例如以下程式碼中的“defer row.close()”。 你想要記住這一點,要在錯誤處理函式之後呼叫“defer row.close()”,因為如果不是這樣,當出現錯誤時,“rows”將為nil,這將導致恐慌並且不會執行錯誤處理程式碼。


func (uds *UserDataSql) Find(id int) (*model.User, error) {
    rows, err := uds.DB.Query(QUERY_USER_BY_ID, id)
    if err != nil {
        return nil, errors.Wrap(err, "")
    }
    defer rows.Close()
    return retrieveUser(rows)
}

恐慌(panic):

我看到很多Go資料庫程式碼在出現資料庫錯誤時丟擲了恐慌(panic)而不是錯誤(error),這可能會導致微服務出現問題,因為在微服務環境中你通常希望服務一直執行。 假設當更新語句中出現SQL錯誤時,使用者將無法訪問該功能,這很糟糕。 但如果因為這個,整個微服務或網站被關閉,那就更糟了。 因此,正確的方法是將錯誤傳播到上一級並讓它決定要做什麼。 因此正確的做法是不在你的程式中丟擲panic,但如果第三方庫丟擲恐慌呢? 這時你需要捕獲恐慌並從中恢復以保持你的服務正常執行。 我在另一篇文章“日誌管理”⁸中有具體示例.

源程式:

完整的源程式連結 github: https://github.com/jfeng45/servicetmpl

索引:

[1][db transaction in golang](https://stackoverflow.com/questions/26593867/db-transaction-in-golang)

[2][database/sql Tx—detecting Commit or Rollback](https://stackoverflow.com/questions/16184238/database-sql-tx-detecting-commit-or-rollback/23502629#23502629)

[3][database/sql: nested transaction or save point support](https://github.com/golang/go/issues/7898)

[4][GOTO 2015 • Applying the Saga Pattern • Caitie McCaffrey — YouTube](https://www.youtube.com/watch?v=xDuwrtwYHu8)

[5][Common Pitfalls When Using database/sql in Go](https://www.vividcortex.com/blog/2015/09/22/common-pitfalls-go/)

[6][Go database/sql tutorial]
(http://go-database-sql.org/connection-pool.html)

[7][sqlx](https://github.com/jmoiron/sqlx)

[8][Go Microservice with Clean Architecture: Application Logging](https://jfeng45.github.io/posts/go_logging_and_error_handlin