1. 程式人生 > >Spring Boot多資料來源事務管理

Spring Boot多資料來源事務管理

在開發企業應用時,對於使用者的一個操作實際上對應底層資料庫的多個讀寫。由於資料操作在順序執行的過程中,任何一步操作都有可能發生異常,異常會導致後續操作無法完成,此時由於業務邏輯並未正確的完成,之前成功操作資料的並不可靠,會產生不一致的資料,需要在這種情況下進行回退。事務的作用就是為了保證使用者的每一個操作都是可靠的,事務中的每一步操作都必須成功執行,只要有發生異常就回退到事務開始未進行操作的狀態。瞭解事務的基本屬性和隔離級別,請參考之前的一篇文章理解資料庫事務的4種隔離級別。瞭解事務的傳播屬性,請看Spring Boot中使用@Transactional註解配置事務管理。對於單源資料庫,只要在需要進行事務控制的方法上新增@Transactional註解就可以,但是對於多源資料庫,@Transactionnal是無法管理多個數據源的。本篇文章主要介紹上篇文章多源資料庫操作時,事務控制的實現方式。其次要講的是,如果想真正實現多源資料庫事務控制,肯定是需要分散式鎖的,本篇文章介紹的兩種方式,並沒有使用分散式鎖,換言之,只是多源資料庫事務控制的一種變通方式。

1. 只使用主庫TransactionManger

這種方式,在需要進行事務控制的方法上加@Transactional註解,並在註解上使用value屬性,註明是主庫事務管理器。如下:

@Transactional(value = "masterTransactionManager")
public void createUser(String userName, String description) {
    MasterUser masterUser = new MasterUser();

    /*主資料庫插入*/
    masterUser.setUserName(userName);
    masterUser.setDescription(description);
    masterUser.setIsDeleted(DataStatusEnum.EXIST.getCode());
    masterUserMapper.insertSelective(masterUser);

    /*從資料庫插入*/
    SlaveUser slaveUser = new SlaveUser();
    slaveUser.setUserName(userName);
    slaveUser.setDescription(description);
    slaveUser.setIsDeleted(DataStatusEnum.EXIST.getCode());
    slaveUserMapper.insertSelective(slaveUser);
}

使用主庫事務管理器,也就是說事務中產生異常時,只能回滾主庫資料。但是因為資料操作順序是先主後從,所以分一下三種情況:

  1. 主庫插入時異常,主庫未插成功,這時候從庫還沒來及插入,主從資料是還是一致的
  2. 主庫插入成功,從庫插入時異常,這時候在主庫事務管理器監測到事務中存在異常,將之前插入的主庫資料插入,主從資料還是一致的
  3. 主庫插入成功,從庫插入成功,事務結束,主從資料一致。

當然這只是理想情況,假如存在一種情況,在資料庫從庫插入之後,還有其他業務邏輯的處理,假如這部分業務處理產生了異常,主庫事務管理器只能回滾主庫資料,但是從庫資料是無法回滾的,這時候主從資料變產生了不一致。還有比如從庫資料插入成功後,主庫提交,這時候主庫崩潰了,導致資料沒插入,這時候從庫資料也是無法回滾的。這種方式可以簡單實現多源資料庫的事務管理,但是無法處理上述情況。看一下正常處理下情況下列印的日誌資訊:

在createUser方法上,添加了註解@Transactional(value = “masterTransactionManager”),只開啟了主庫的事務管理器,從日誌上看,也只有主庫事務管理生效了,與預期一致。

2. 為方法新增多個事務管理器

@Transactional註解支援指定事務管理器,假如可以為一個方法新增多個註解,是不是就可以了完成對兩個資料來源的事務管理。但是,Spring是不支援為一個方法新增兩個@Transactional註解的,所以最直接的想法是可不可以通過程式碼實習為一個方法新增兩個事務管理器,最終找到一種解決方案,通過自定義註解方式,實現為createUser方法新增兩個@Transactional註解的效果,並開啟兩個事務管理器。核心程式碼如下:

2.1 新增自定義註解MultiTransactional

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransactional {

    String[] value() default {};
}

註解使用物件是方法,@Retention(RetentionPolicy.RUNTIME)表示註解會在class位元組碼檔案中存在,在執行時可以通過反射獲取到。關於自定義註解的詳細資訊,請參考這篇文章

2.2 新增主從資料庫事務管理器名稱常量

public class DbTxConstants {

    public static final String DB1_TX = "masterTransactionManager";

    public static final String DB2_TX = "slaveTransactionManager";
}

其實這一步也可以省略,只是程式碼中要多次使用主從事務管理器名,所以這裡定義成常量。

2.3 新增自定義攔截器

@Aspect
@Component
public class MultiTransactionAop {

    private final ComboTransaction comboTransaction;

    @Autowired
    public MultiTransactionAop(ComboTransaction comboTransaction) {
        this.comboTransaction = comboTransaction;
    }

    @Pointcut("@annotation(com.zhuoli.service.springboot.mybatis.transaction.repository.aop.MultiTransactional)")
    public void pointCut() {
    }

    @Around("pointCut() && @annotation(multiTransactional)")
    public Object inMultiTransactions(ProceedingJoinPoint pjp, MultiTransactional multiTransactional) {
        return comboTransaction.inCombinedTx(() -> {
            try {
                return pjp.proceed();
            } catch (Throwable throwable) {
                if (throwable instanceof RuntimeException) {
                    throw (RuntimeException) throwable;
                }
                throw new RuntimeException(throwable);
            }
        }, multiTransactional.value());
    }
}

功能為收集被攔截方法MultiTransactional註解,並將Callable物件 () -> createUser()作為引數傳給comboTransaction.inCombinedTx方法。

2.4 ComboTransaction類

@Component
public class ComboTransaction {

    @Autowired
    private Db1TxBroker db1TxBroker;

    @Autowired
    private Db2TxBroker db2TxBroker;

    public <V> V inCombinedTx(Callable<V> callable, String[] transactions) {
        if (callable == null) {
            return null;
        }

        Callable<V> combined = Stream.of(transactions)
                .filter(ele -> !StringUtils.isEmpty(ele))
                .distinct()
                .reduce(callable, (r, tx) -> {
                    switch (tx) {
                        case DbTxConstants.DB1_TX:
                            return () -> db1TxBroker.inTransaction(r);
                        case DbTxConstants.DB2_TX:
                            return () -> db2TxBroker.inTransaction(r);
                        default:
                            return null;
                    }
                }, (r1, r2) -> r2);

        try {
            return combined.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

簡單講一下方法inCombinedTx的作用,其實就是將自定義註解@MultiTransactional的引數通過Java8 Stream的Reduce操作,轉化為Callable物件,不瞭解Reduce操作的同學,可以參考我之前的一篇文章Java8 Stream reduce操作。將最終的Callable物件呼叫call方法執行,得到最終結果。

2.5 Db1TxBroker & Db2TxBroker

@Component
public class Db1TxBroker {

    @Transactional(DbTxConstants.DB1_TX)
    public <V> V inTransaction(Callable<V> callable) {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

@Component
public class Db2TxBroker {

    @Transactional(DbTxConstants.DB2_TX)
    public <V> V inTransaction(Callable<V> callable) {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

ComboTransaction類的inCombinedTx方法,第一個引數為() -> createUser(),所以reduce執行過程如下:

1. () -> db1TxBroker.inTransaction(() -> createUser())

2. db1TxBroker.inTransaction(() -> createUser())結果為
@Transactional("masterTransactionManager")
createUser()
所以() -> db1TxBroker.inTransaction(() -> createUser())結果為
      @Transactional("masterTransactionManager")
() -> createUser()

3. db2TxBroker.inTransaction(() -> createUser())結果為
@Transactional("slaveTransactionaManager")
@Transactional("masterTransactionManager")
createUser()
所以() -> db2TxBroker.inTransaction(() -> createUser())結果為
      @Transactional("slaveTransactionaManager")
      @Transactional("masterTransactionManager")
() -> createUser()

4. combined.call()其實等價於
@Transactional("slaveTransactionaManager")
@Transactional("masterTransactionManager")
createUser()
實現為createUser()新增兩個@Transactional註解

2.6 自定義註解使用

@Override
@MultiTransactional(value = {DbTxConstants.DB1_TX, DbTxConstants.DB2_TX})
public void createUserWithAnnotation(String userName, String description) {
    MasterUser masterUser = new MasterUser();

    /*主資料庫插入*/
    masterUser.setUserName(userName);
    masterUser.setDescription(description);
    masterUser.setIsDeleted(DataStatusEnum.EXIST.getCode());
    masterUserMapper.insertSelective(masterUser);

    /*從資料庫插入*/
    SlaveUser slaveUser = new SlaveUser();
    slaveUser.setUserName(userName);
    slaveUser.setDescription(description);
    slaveUser.setIsDeleted(DataStatusEnum.EXIST.getCode());
    slaveUserMapper.insertSelective(slaveUser);
}

2.7 日誌資訊

可以看到,開啟了兩個事務管理器,符合預期

2.8 異常模擬

@Override
@MultiTransactional(value = {DbTxConstants.DB1_TX, DbTxConstants.DB2_TX})
public void createUserWithAnnotation(String userName, String description) {
    MasterUser masterUser = new MasterUser();

    /*主資料庫插入*/
    masterUser.setUserName(userName);
    masterUser.setDescription(description);
    masterUser.setIsDeleted(DataStatusEnum.EXIST.getCode());
    masterUserMapper.insertSelective(masterUser);

    /*從資料庫插入*/
    SlaveUser slaveUser = new SlaveUser();
    slaveUser.setUserName(userName);
    slaveUser.setDescription(description);
    slaveUser.setIsDeleted(DataStatusEnum.EXIST.getCode());
    slaveUserMapper.insertSelective(slaveUser);

    if (true){
        throw new RuntimeException("Exception");
    }
}

日誌資訊如下:可以看到使用自定義註解這種方式,假如在從庫插入後,還有其他業務邏輯,並且報了異常,這時候主從資料庫都是可以回滾的。當然這種事務控制方式也存在不完美的地方,比如當提交時資料庫崩潰這種情況,依然是無法解決的,但是這種情況可能性是相對比較小的,所以在不使用分散式鎖的情況下,這種事務多元資料庫事務管理方式是一種有效的方案。

另外對於本篇文章的示例程式碼配置需要注意一下,需要講日誌級別調到DEBUG,否則無法看到資料庫提交的相關日誌資訊。