1. 程式人生 > >分散式學習筆記十三:分散式事務 TCC-Transaction 原始碼分析 —— TCC 實現

分散式學習筆記十三:分散式事務 TCC-Transaction 原始碼分析 —— TCC 實現

本文主要基於 TCC-Transaction 1.2.3.3 正式版

1. 概述

本文分享 TCC 實現。主要涉及如下三個 Maven 專案:

  • tcc-transaction-core :tcc-transaction 底層實現。
  • tcc-transaction-api :tcc-transaction 使用 API。
  • tcc-transaction-spring :tcc-transaction Spring 支援。

ps:筆者假設你已經閱讀過《tcc-transaction 官方文件 —— 使用指南1.2.x》。

ps2:未特殊說明的情況下,本文事務指的是 TCC事務。

2. TCC 原理

TCC事務  為了解決在事務執行過程中大顆粒度資源鎖定的問題,業界提出一種新的事務模型,它是基於業務層面的事務定義。鎖粒度完全由業務自己控制。它本質是一種補償的思路。它把事務執行過程分成 Try、Confirm / Cancel 兩個階段。在每個階段的邏輯由業務程式碼控制。這樣就事務的鎖粒度可以完全自由控制。業務可以在犧牲隔離性的情況下,獲取更高的效能。

  • Try 階段 

        Try :嘗試執行業務               完成所有業務檢查( 一致性 )              預留必須業務資源( 準隔離性 )

  • Confirm / Cancel 階段: 

        Confirm :確認執行業務              真正執行業務             不做任務業務檢查             Confirm 操作滿足冪等性        Cancel :取消執行業務             釋放 Try 階段預留的業務資源            Cancel 操作滿足冪等性        Confirm 與 Cancel 互斥 整體流程如下圖:

  • 紅框部分功能由 tcc-transaction-core 實現:
  1. 啟動業務活動
  2. 登記業務操作
  3. 提交 / 回滾業務活動
  • 黃框部分功能由 tcc-transaction-http-sample 實現( 官方提供的示例專案 ):
  1. Try 操作
  2. Confirm 操作
  3. Cancel 操作

與 2PC協議 比較:

  • 位於業務服務層而非自願層
  • 沒有單獨的準備( Prepare )階段,Try 操作兼備自願操作與準備能力
  • Try 操作可以靈活選擇業務資源的鎖定粒度
  • 較高開發成本

參考資料:

  • 《支付寶運營架構中柔性事務指的是什麼?》
  • 《分散式事務的典型處理方式:2PC、TCC、非同步確保和最大努力型》

3. TCC-Transaction 原理

在 TCC 裡,一個業務活動可以有多個事務,每個業務操作歸屬於不同的事務,即一個事務可以包含多個業務操作。TCC-Transaction 將每個業務操作抽象成事務參與者,每個事務可以包含多個參與者

參與者需要宣告 try / confirm / cancel 三個型別的方法,和 TCC 的操作一一對應。在程式裡,通過 @Compensable 註解標記在 try 方法上,並填寫對應的 confirm / cancel 方法,示例程式碼如下:

// try @Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class) public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}

// confirm public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}

// cancel public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}

  • 在示例程式碼中,我們看到 TransactionContext,事務上下文,這個是怎麼生成的呢?這裡先賣一個關子。

TCC-Transaction 有兩個攔截器,通過對 @Compensable AOP 切面( 參與者 try 方法 )進行攔截,透明化對參與者 confirm / cancel 方法呼叫,從而實現 TCC 。簡化流程如下圖:

第一個攔截器,可補償事務攔截器,實現如下功能:

  • 在 Try 階段,對事務的發起、傳播。
  • 在 Confirm / Cancel 階段,對事務提交或回滾。
  • 為什麼會有對事務的傳播呢?在遠端呼叫服務的參與者時,會通過“引數”( 需要序列化 )的形式傳遞事務給遠端參與者。

第二個攔截器,資源協調者攔截器,實現如下功能:

  • 在 Try 階段,新增參與者到事務中。當事務上下文不存在時,進行建立。

實際攔截器對事務的處理會比上圖複雜一些,在本文「6. 事務攔截器」詳細解析。

在 TCC-Transaction 程式碼實現上,元件分層如下圖:

本文按照如下順序分享:

  • 「4. 事務攔截器」
  • 「5. 事務管理器」
  • 「6. 事務管理器」

內容是自下而上的方式分享,每個元件可以更加整體的被認識。當然這可能對你理解時產生一臉悶逼,所以推薦兩種閱讀方式:

  • 簡讀 x 1 + 深讀 x 1
  • 倒著讀,發現未分享的方法,全文檢索該方法。

4. 事務與參與者

在 TCC 裡,一個事務( org.mengyun.tcctransaction.Transaction ) 可以有多個參與者( org.mengyun.tcctransaction.Participant )參與業務活動。類圖關係如下( 開啟大圖 ):

4.1 事務

Transaction 實現程式碼如下:

public class Transaction implements Serializable {

    private static final long serialVersionUID = 7291423944314337931L;

    /**
     * 事務編號
     */
    private TransactionXid xid;
    /**
     * 事務狀態
     */
    private TransactionStatus status;
    /**
     * 事務型別
     */
    private TransactionType transactionType;
    /**
     * 重試次數
     */
    private volatile int retriedCount = 0;
    /**
     * 建立時間
     */
    private Date createTime = new Date();
    /**
     * 最後更新時間
     */
    private Date lastUpdateTime = new Date();
    /**
     * 版本號
     */
    private long version = 1;
    /**
     * 參與者集合
     */
    private List<Participant> participants = new ArrayList<Participant>();
    /**
     * 附帶屬性對映
     */
    private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();

    /**
     * 新增參與者
     *
     * @param participant 參與者
     */
    public void enlistParticipant(Participant participant) {
        participants.add(participant);
    }

    /**
     * 提交 TCC 事務
     */
    public void commit() {
        for (Participant participant : participants) {
            participant.commit();
        }
    }

    /**
     * 回滾 TCC 事務
     */
    public void rollback() {
        for (Participant participant : participants) {
            participant.rollback();
        }
    }
}

xid,事務編號( TransactionXid ),用於唯一標識一個事務。使用 UUID 演算法生成,保證唯一性org.mengyun.tcctransaction.api.TransactionXid 實現 javax.transaction.xa.Xid 介面,實現程式碼如下:

public class TransactionXid implements Xid, Serializable {
    private static final long serialVersionUID = -6817267250789142043L;

    /**
     * xid 格式識別符號
     */
    private int formatId = 1;
    /**
     * 全域性事務編號
     */
    private byte[] globalTransactionId;
    /**
     * 分支事務編號
     */
    private byte[] branchQualifier;

}
  • TODO 為什麼要繼承 Xid 介面?

status,事務狀態( TransactionStatus )。org.mengyun.tcctransaction.api.TransactionStatus 實現程式碼如下:

public enum TransactionStatus {
    /**
     * 嘗試中狀態
     */
    TRYING(1),
    /**
     * 確認中狀態
     */
    CONFIRMING(2),
    /**
     * 取消中狀態
     */
    CANCELLING(3);

    private int id;
}

transactionType,事務型別( TransactionType )。org.mengyun.tcctransaction.common.TransactionType 實現程式碼如下:

public enum TransactionType {

    /**
     * 根事務
     */
    ROOT(1),
    /**
     * 分支事務
     */
    BRANCH(2);

    int id;
}

retriedCount,重試次數。在 TCC 過程中,可能參與者異常崩潰,這個時候會進行重試直到成功或超過最大次數。在《TCC-Transaction 原始碼解析 —— 事務恢復》詳細解析。

version,版本號,用於樂觀鎖更新事務。在《TCC-Transaction 原始碼解析 —— 事務儲存器》詳細解析。 attachments,附帶屬性對映。在《TCC-Transaction 原始碼解析 —— Dubbo 支援》詳細解析。 提供 #enlistParticipant() 方法,新增事務參與者。 提供 #commit() 方法,呼叫參與者們提交事務。 提供 #rollback() 方法,呼叫參與者回滾事務。4.2 參與者

Participant 實現程式碼如下:

public class Participant implements Serializable {

    private static final long serialVersionUID = 4127729421281425247L;

    /**
     * 事務編號
     */
    private TransactionXid xid;
    /**
     * 確認執行業務方法呼叫上下文
     */
    private InvocationContext confirmInvocationContext;
    /**
     * 取消執行業務方法
     */
    private InvocationContext cancelInvocationContext;
    /**
     * 執行器
     */
    private Terminator terminator = new Terminator();
    /**
     * 事務上下文編輯
     */
    Class<? extends TransactionContextEditor> transactionContextEditorClass;

    /**
     * 提交事務
     */
    public void commit() {
        terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
    }

    /**
     * 回滾事務
     */
    public void rollback() {
        terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
    }
}
  • xid,參與者事務編號。通過 TransactionXid.globalTransactionId 屬性,關聯上其所屬的事務。當參與者進行遠端呼叫時,遠端的分支事務的事務編號等於該參與者的事務編號。通過事務編號的關聯,TCC Confirm / Cancel 階段,使用參與者的事務編號和遠端的分支事務進行關聯,從而實現事務的提交和回滾,在「5.2 傳播發起分支事務」 + 「6.2 可補償事務攔截器」可以看到具體實現。

confirmInvocationContext,確認執行業務方法呼叫上下文( InvocationContext )。org.mengyun.tcctransaction.InvocationContext 實現程式碼如下:

public class InvocationContext implements Serializable {

    private static final long serialVersionUID = -7969140711432461165L;

    /**
     * 類
     */
    private Class targetClass;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 引數型別陣列
     */
    private Class[] parameterTypes;
    /**
     * 引數陣列
     */
    private Object[] args;
}
  • InvocationContext,執行方法呼叫上下文,記錄類、方法名、引數型別陣列、引數陣列。通過這些屬性,可以執行提交 / 回滾事務。在 org.mengyun.tcctransaction.Terminator 會看到具體的程式碼實現。本質上,TCC 通過多個參與者的 try / confirm / cancel 方法,實現事務的最終一致性。

cancelInvocationContext,取消執行業務方法呼叫上下文( InvocationContext )。

terminator,執行器( Terminator )。org.mengyun.tcctransaction.Terminator 實現程式碼如下:

public class Terminator implements Serializable {

    private static final long serialVersionUID = -164958655471605778L;

    public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
        if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
            try {
                // 獲得 參與者物件
                Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
                // 獲得 方法
                Method method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
                // 設定 事務上下文 到方法引數
                FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
                // 執行方法
                return method.invoke(target, invocationContext.getArgs());
            } catch (Exception e) {
                throw new SystemException(e);
            }
        }
        return null;
    }

}
  • FactoryBuilder,工廠 Builder,感興趣的同學點選連結檢視,已經新增完整中文程式碼註釋。
  • TransactionContextEditor,在本文「6.1 Compensable」詳細解析。

transactionContextEditorClass,事務上下文編輯,在「6.1 Compensable」詳細解析。 提交 #commit() 方法,提交參與者自己的事務。 提交 #rollback() 方法,回滾參與者自己的事務。

5. 事務管理器

org.mengyun.tcctransaction.TransactionManager,事務管理器,提供事務的獲取、發起、提交、回滾,參與者的新增等等方法。

5.1 發起根事務

提供 begin() 方法,發起根事務。該方法在呼叫方法型別為 MethodType.ROOT 並且 事務處於 Try 階段被呼叫。MethodType 在「6.2 可補償事務攔截器」詳細解析。

實現程式碼如下:

// TransactionManager.java
/**
* 發起根事務
*
* @return 事務
*/
public Transaction begin() {
   // 建立 根事務
   Transaction transaction = new Transaction(TransactionType.ROOT);
   // 儲存 事務
   transactionRepository.create(transaction);
   // 註冊 事務
   registerTransaction(transaction);
   return transaction;
}

呼叫 Transaction 構造方法,建立根事務。實現程式碼如下:

// Transaction.java
/**
* 建立指定型別的事務
*
* @param transactionType 事務型別
*/
public Transaction(TransactionType transactionType) {
   this.xid = new TransactionXid();
   this.status = TransactionStatus.TRYING; // 嘗試中狀態
   this.transactionType = transactionType;
}
  • 目前該構造方法只有 TransactionManager#begin() 在呼叫,即只建立根事務。

呼叫 TransactionRepository#crete() 方法,儲存事務。 呼叫 #registerTransaction(...) 方法,註冊事務到當前執行緒事務佇列。實現程式碼如下:

// TransactionManager.java
/**
* 當前執行緒事務佇列
*/
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();

/**
* 註冊事務到當前執行緒事務佇列
*
* @param transaction 事務
*/
private void registerTransaction(Transaction transaction) {
   if (CURRENT.get() == null) {
       CURRENT.set(new LinkedList<Transaction>());
   }
   CURRENT.get().push(transaction); // 新增到頭部
}
  • 可能有同學會比較好奇,為什麼使用佇列儲存當前執行緒事務?TCC-Transaction 支援多個的事務獨立存在,後建立的事務先提交,類似 Spring 的org.springframework.transaction.annotation.Propagation.REQUIRES_NEW 。在下文,很快我們就會看到 TCC-Transaction 自己的 org.mengyun.tcctransaction.api.Propagation

5.2 傳播發起分支事務

呼叫 #propagationNewBegin(...) 方法,傳播發起分支事務。該方法在呼叫方法型別為 MethodType.PROVIDER 並且 事務處於 Try 階段被呼叫。MethodType 在「6.2 可補償事務攔截器」詳細解析。

實現程式碼如下:

/**
* 傳播發起分支事務
*
* @param transactionContext 事務上下文
* @return 分支事務
*/
public Transaction propagationNewBegin(TransactionContext transactionContext) {
  // 建立 分支事務
  Transaction transaction = new Transaction(transactionContext);
  // 儲存 事務
  transactionRepository.create(transaction);
  // 註冊 事務
  registerTransaction(transaction);
  return transaction;
}

呼叫 Transaction 構造方法,建立分支事務。實現程式碼如下:

/**
* 建立分支事務
*
* @param transactionContext 事務上下文
*/
public Transaction(TransactionContext transactionContext) {
   this.xid = transactionContext.getXid(); // 事務上下文的 xid
   this.status = TransactionStatus.TRYING; // 嘗試中狀態
   this.transactionType = TransactionType.BRANCH; // 分支事務
}
  • 分支事務使用傳播的事務上下文的事務編號。

呼叫 TransactionRepository#crete() 方法,儲存事務。為什麼要儲存分支事務,在「6.3 資源協調者攔截器」詳細解析。 呼叫 #registerTransaction(...) 方法,註冊事務到當前執行緒事務佇列。5.3 傳播獲取分支事務

呼叫 #propagationExistBegin(...) 方法,傳播發起分支事務。該方法在呼叫方法型別為 MethodType.PROVIDER 並且 事務處於 Confirm / Cancel 階段被呼叫。MethodType 在「6.2 可補償事務攔截器」詳細解析。

實現程式碼如下:

/**
* 傳播獲取分支事務
*
* @param transactionContext 事務上下文
* @return 分支事務
* @throws NoExistedTransactionException 當事務不存在時
*/
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
   // 查詢 事務
   Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
   if (transaction != null) {
       // 設定 事務 狀態
       transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
       // 註冊 事務
       registerTransaction(transaction);
       return transaction;
   } else {
       throw new NoExistedTransactionException();
   }
}
  • 呼叫 TransactionRepository#findByXid() 方法,查詢事務。
  • 呼叫 Transaction#changeStatus(...) 方法,設定事務狀態為 CONFIRMING 或 CANCELLING。
  • 呼叫 #registerTransaction(...) 方法,註冊事務到當前執行緒事務佇列。
  • 為什麼此處是分支事務呢?結合 #propagationNewBegin(...) 思考下。

5.4 提交事務

呼叫 #commit(...) 方法,提交事務。該方法在事務處於 Confirm / Cancel 階段被呼叫。

實現程式碼如下:

/**
* 提交事務
*/
public void commit() {
   // 獲取 事務
   Transaction transaction = getCurrentTransaction();
   // 設定 事務狀態 為 CONFIRMING
   transaction.changeStatus(TransactionStatus.CONFIRMING);
   // 更新 事務
   transactionRepository.update(transaction);
   try {
       // 提交 事務
       transaction.commit();
       // 刪除 事務
       transactionRepository.delete(transaction);
   } catch (Throwable commitException) {
       logger.error("compensable transaction confirm failed.", commitException);
       throw new ConfirmingException(commitException);
   }
}

呼叫 #getCurrentTransaction() 方法, 獲取事務。實現程式碼如下:

public Transaction getCurrentTransaction() {
   if (isTransactionActive()) {
       return CURRENT.get().peek(); // 獲得頭部元素
   }
   return null;
}

public boolean isTransactionActive() {
   Deque<Transaction> transactions = CURRENT.get();
   return transactions != null && !transactions.isEmpty();
}
  • 為什麼獲得佇列頭部元素呢?該元素即是上文呼叫 #registerTransaction(...) 註冊到佇列頭部。

呼叫 Transaction#changeStatus(...) 方法, 設定事務狀態為 CONFIRMING。 呼叫 TransactionRepository#update(...) 方法, 更新事務。 呼叫 Transaction#commit(...) 方法, 提交事務。 呼叫 TransactionRepository#delete(...) 方法,刪除事務。5.5 回滾事務

呼叫 #rollback(...) 方法,取消事務,和 #commit() 方法基本類似。該方法在事務處於 Confirm / Cancel 階段被呼叫

實現程式碼如下:

/**
* 回滾事務
*/
public void rollback() {
   // 獲取 事務
   Transaction transaction = getCurrentTransaction();
   // 設定 事務狀態 為 CANCELLING
   transaction.changeStatus(TransactionStatus.CANCELLING);
   // 更新 事務
   transactionRepository.update(transaction);
   try {
       // 提交 事務
       transaction.rollback();
       // 刪除 事務
       transactionRepository.delete(transaction);
   } catch (Throwable rollbackException) {
       logger.error("compensable transaction rollback failed.", rollbackException);
       throw new CancellingException(rollbackException);
   }
}

呼叫 #getCurrentTransaction() 方法,獲取事務。 呼叫 Transaction#changeStatus(...) 方法, 設定事務狀態為 CANCELLING。 呼叫 TransactionRepository#update(...) 方法, 更新事務。 呼叫 Transaction#rollback(...) 方法, 回滾事務。 呼叫 TransactionRepository#delete(...) 方法,刪除事務。5.6 新增參與者到事務

呼叫 #enlistParticipant(...) 方法,新增參與者到事務。該方法在事務處於 Try 階段被呼叫,在「6.3 資源協調者攔截器」有詳細解析。

實現程式碼如下:

/**
* 新增參與者到事務
*
* @param participant 參與者
*/
public void enlistParticipant(Participant participant) {
   // 獲取 事務
   Transaction transaction = this.getCurrentTransaction();
   // 新增參與者
   transaction.enlistParticipant(participant);
   // 更新 事務
   transactionRepository.update(transaction);
}

呼叫 #getCurrentTransaction() 方法,獲取事務。 呼叫 Transaction#enlistParticipant(...) 方法, 新增參與者到事務。 呼叫 TransactionRepository#update(...) 方法, 更新事務。6. 事務攔截器

TCC-Transaction 基於 [email protected] + [email protected] 註解 AOP 切面實現業務方法的 TCC 事務宣告攔截,同 Spring 的 [email protected] 的實現。

TCC-Transaction 有兩個攔截器:

  • org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor,可補償事務攔截器。
  • org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor,資源協調者攔截器。

在分享攔截器的實現之前,我們先一起看看 @Compensable 註解。

6.1 Compensable

@Compensable,標記可補償的方法註解。實現程式碼如下:

public @interface Compensable {

    /**
     * 傳播級別
     */
    Propagation propagation() default Propagation.REQUIRED;

    /**
     * 確認執行業務方法
     */
    String confirmMethod() default "";

    /**
     * 取消執行業務方法
     */
    String cancelMethod() default "";

    /**
     * 事務上下文編輯
     */
    Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class;
}

propagation,傳播級別( Propagation ),預設 Propagation.REQUIRED。和 Spring 的 Propagation 除了缺少幾個屬性,基本一致。實現程式碼如下:

public enum Propagation {

    /**
     * 支援當前事務,如果當前沒有事務,就新建一個事務。
     */
    REQUIRED(0),
    /**
     * 支援當前事務,如果當前沒有事務,就以非事務方式執行。
     */
    SUPPORTS(1),
    /**
     * 支援當前事務,如果當前沒有事務,就丟擲異常。
     */
    MANDATORY(2),
    /**
     * 新建事務,如果當前存在事務,把當前事務掛起。
     */
    REQUIRES_NEW(3);

    private final int value;
}

confirmMethod,確認執行業務方法名。

cancelMethod,取消執行業務方法名。 TransactionContextEditor,事務上下文編輯器( TransactionContextEditor ),用於設定和獲得事務上下文( TransactionContext ),在「6.3 資源協調者攔截器」可以看到被呼叫,此處只看它的程式碼實現。org.mengyun.tcctransaction.api.TransactionContextEditor 介面程式碼如下:

public interface TransactionContextEditor {

    /**
     * 從引數中獲得事務上下文
     *
     * @param target 物件
     * @param method 方法
     * @param args 引數
     * @return 事務上下文
     */
    TransactionContext get(Object target, Method method, Object[] args);

    /**
     * 設定事務上下文到引數中
     *
     * @param transactionContext 事務上下文
     * @param target 物件
     * @param method 方法
     * @param args 引數
     */
    void set(TransactionContext transactionContext, Object target, Method method, Object[] args);

}

DefaultTransactionContextEditor,預設事務上下文編輯器實現。實現程式碼如下:

class DefaultTransactionContextEditor implements TransactionContextEditor {

    @Override
    public TransactionContext get(Object target, Method method, Object[] args) {
        int position = getTransactionContextParamPosition(method.getParameterTypes());
        if (position >= 0) {
            return (TransactionContext) args[position];
        }
        return null;
    }

    @Override
    public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
        int position = getTransactionContextParamPosition(method.getParameterTypes());
        if (position >= 0) {
            args[position] = transactionContext; // 設定方法引數
        }
    }

    /**
     * 獲得事務上下文在方法引數裡的位置
     *
     * @param parameterTypes 引數型別集合
     * @return 位置
     */
    public static int getTransactionContextParamPosition(Class<?>[] parameterTypes) {
        int position = -1;
        for (int i = 0; i < parameterTypes.length; i++) {
            if (parameterTypes[i].equals(org.mengyun.tcctransaction.api.TransactionContext.class)) {
                position = i;
                break;
            }
        }
        return position;
    }
}

x NullableTransactionContextEditor,無事務上下文編輯器實現。實現程式碼如下:

class NullableTransactionContextEditor implements TransactionContextEditor {

    @Override
    public TransactionContext get(Object target, Method method, Object[] args) {
        return null;
    }

    @Override
    public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
    }
}

DubboTransactionContextEditor,Dubbo 事務上下文編輯器實現,通過 Dubbo 隱式傳參方式獲得事務上下文,在《TCC-Transaction 原始碼解析 —— Dubbo 支援》詳細解析。

6.2 可補償事務攔截器

先一起來看下可補償事務攔截器對應的切面 org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect,實現程式碼如下:

@Aspect
public abstract class CompensableTransactionAspect {

    private CompensableTransactionInterceptor compensableTransactionInterceptor;

    public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
        this.compensableTransactionInterceptor = compensableTransactionInterceptor;
    }

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void compensableService() {
    }

    @Around("compensableService()")
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
        return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
    }

    public abstract int getOrder();
}

通過 [email protected] + [email protected] 註解,配置對 @Compensable 註解的方法進行攔截,呼叫 CompensableTransactionInterceptor#interceptCompensableMethod(...) 方法進行處理。CompensableTransactionInterceptor 實現程式碼如下:

public class CompensableTransactionInterceptor {

    private TransactionManager transactionManager;

    private Set<Class<? extends Exception>> delayCancelExceptions;

    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
        // 獲得帶 @Compensable 註解的方法
        Method method = CompensableMethodUtils.getCompensableMethod(pjp);
        //
        Compensable compensable = method.getAnnotation(Compensable.class);
        Propagation propagation = compensable.propagation();
        // 獲得 事務上下文
        TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
        // 當前執行緒是否在事務中
        boolean isTransactionActive = transactionManager.isTransactionActive();
        // 判斷事務上下文是否合法
        if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
            throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
        }
        // 計算方法型別
        MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
        // 處理
        switch (methodType) {
            case ROOT:
                return rootMethodProceed(pjp);
            case PROVIDER:
                return providerMethodProceed(pjp, transactionContext);
            default:
                return pjp.proceed();
        }
    }
}

呼叫 CompensableMethodUtils#getCompensableMethod(...) 方法,獲得帶 @Compensable 註解的方法。實現程式碼如下:

// CompensableMethodUtils.java
/**
* 獲得帶 @Compensable 註解的方法
*
* @param pjp 切面點
* @return 方法
*/
public static Method getCompensableMethod(ProceedingJoinPoint pjp) {
   Method method = ((MethodSignature) (pjp.getSignature())).getMethod(); // 代理方法物件
   if (method.getAnnotation(Compensable.class) == null) {
       try {
           method = pjp.getTarget().getClass().getMethod(method.getName(), method.getParameterTypes()); // 實際方法物件
       } catch (NoSuchMethodException e) {
           return null;
       }
   }
   return method;
}

呼叫 TransactionContextEditor#get(...) 方法,從引數中獲得事務上下文。為什麼從引數中可以獲得事務上下文呢?在「6.3 資源協調者攔截器」揭曉答案。

呼叫 TransactionManager#isTransactionActive() 方法,當前執行緒是否在事務中。實現程式碼如下:

// TransactionManager.java
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();

public boolean isTransactionActive() {
   Deque<Transaction> transactions = CURRENT.get();
   return transactions != null && !transactions.isEmpty();
}

呼叫 TransactionUtils#isLegalTransactionContext(...) 方法,判斷事務上下文是否合法。實現程式碼如下:

// TransactionUtils.java
/**
* 判斷事務上下文是否合法
* 在 Propagation.MANDATORY 必須有在事務內
*
* @param isTransactionActive 是否
* @param propagation 傳播級別
* @param transactionContext 事務上下文
* @return 是否合法
*/
public static boolean isLegalTransactionContext(boolean isTransactionActive, Propagation propagation, TransactionContext transactionContext) {
   if (propagation.equals(Propagation.MANDATORY) && !isTransactionActive && transactionContext == null) {
       return false;
   }
   return true;
}
  • 當傳播級別為 Propagation.MANDATORY 時,要求必須在事務中。

呼叫 CompensableMethodUtils#calculateMethodType(...) 方法,計算方法型別。實現程式碼如下:

/**
* 計算方法型別
*
* @param propagation 傳播級別
* @param isTransactionActive 是否事務開啟
* @param transactionContext 事務上下文
* @return 方法型別
*/
public static MethodType calculateMethodType(Propagation propagation, boolean isTransactionActive, TransactionContext transactionContext) {
   if ((propagation.equals(Propagation.REQUIRED) && !isTransactionActive && transactionContext == null) // Propagation.REQUIRED:支援當前事務,當前沒有事務,就新建一個事務。
           || propagation.equals(Propagation.REQUIRES_NEW)) { // Propagation.REQUIRES_NEW:新建事務,如果當前存在事務,把當前事務掛起。
       return MethodType.ROOT;
   } else if ((propagation.equals(Propagation.REQUIRED) // Propagation.REQUIRED:支援當前事務
               || propagation.equals(Propagation.MANDATORY)) // Propagation.MANDATORY:支援當前事務
           && !isTransactionActive && transactionContext != null) {
       return MethodType.PROVIDER;
   } else {
       return MethodType.NORMAL;
   }
}
  • 計算方法型別( MethodType )的目的,可以根據不同方法型別,做不同的事務處理。
  • 方法型別為 MethodType.ROOT 時,發起根事務,判斷條件如下二選一: 
  1. 事務傳播級別為 Propagation.REQUIRED,並且當前沒有事務。
  2. 事務傳播級別為 Propagation.REQUIRES_NEW,新建事務,如果當前存在事務,把當前事務掛起。此時,事務管理器的當前執行緒事務佇列可能會存在多個事務。
  • 方法型別為 MethodType.ROOT 時,發起分支事務,判斷條件如下二選一: 
  1. 事務傳播級別為 Propagation.REQUIRED,並且當前不存在事務,並且方法引數傳遞了事務上下文
  2. 事務傳播級別為 Propagation.PROVIDER,並且當前不存在事務,並且方法引數傳遞了事務上下文
  3. 當前不存在事務,方法引數傳遞了事務上下文是什麼意思?當跨服務遠端呼叫時,被呼叫服務本身( 服務提供者 )不在事務中,通過傳遞事務上下文引數,融入當前事務。
  • 方法型別為 MethodType.Normal 時,不進行事務處理。
  • MethodType.CONSUMER 專案已經不再使用,猜測已廢棄。

當方法型別為 MethodType.ROOT 時,呼叫 #rootMethodProceed(...) 方法,發起 TCC 整體流程。實現程式碼如下:

private Object rootMethodProceed(ProceedingJoinPoint pjp) throws Throwable {
   Object returnValue;
   Transaction transaction = null;
   try {
       // 發起根事務
       transaction = transactionManager.begin();
       // 執行方法原邏輯
       try {
           returnValue = pjp.proceed();
       } catch (Throwable tryingException) {
           if (isDelayCancelException(tryingException)) { // 是否延遲迴滾
           } else {
               logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
               // 回滾事務
               transactionManager.rollback();
           }
           throw tryingException;
       }
       // 提交事務
       transactionManager.commit();
   } finally {
       // 將事務從當前執行緒事務佇列移除
       transactionManager.cleanAfterCompletion(transaction);
   }
   return returnValue;
}

呼叫 #transactionManager() 方法,發起根事務,TCC Try 階段開始。 呼叫 ProceedingJoinPoint#proceed() 方法,執行方法原邏輯( 即 Try 邏輯 )。 當原邏輯執行異常時,TCC Try 階段失敗,呼叫 TransactionManager#rollback(...) 方法,TCC Cancel 階段,回滾事務。此處 #isDelayCancelException(...) 方法,判斷異常是否為延遲取消回滾異常,部分異常不適合立即回滾事務,在《TCC-Transaction 原始碼分析 —— 事務恢復》詳細解析。 當原邏輯執行成功時,TCC Try 階段成功,呼叫 TransactionManager#commit(...) 方法,TCC Confirm 階段,提交事務。 呼叫 TransactionManager#cleanAfterCompletion(...) 方法,將事務從當前執行緒事務佇列移除,避免執行緒衝突。實現程式碼如下:

// TransactionManager.java
public void cleanAfterCompletion(Transaction transaction) {
    if (isTransactionActive() && transaction != null) {
        Transaction currentTransaction = getCurrentTransaction();
        if (currentTransaction == transaction) {
            CURRENT.get().pop();
        } else {
            throw new SystemException("Illegal transaction when clean after completion");
        }
    }
}

x 當方法型別為 Propagation.PROVIDER 時,服務提供者參與 TCC 整體流程。實現程式碼如下:

private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext) throws Throwable {
   Transaction transaction = null;
   try {
       switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
           case TRYING:
               // 傳播發起分支事務
               transaction = transactionManager.propagationNewBegin(transactionContext);
               return pjp.proceed();
           case CONFIRMING:
               try {
                   // 傳播獲取分支事務
                   transaction = transactionManager.propagationExistBegin(transactionContext);
                   // 提交事務
                   transactionManager.commit();
               } catch (NoExistedTransactionException excepton) {
                   //the transaction has been commit,ignore it.
               }
               break;
           case CANCELLING:
               try {
                   // 傳播獲取分支事務
                   transaction = transactionManager.propagationExistBegin(transactionContext);
                   // 回滾事務
                   transactionManager.rollback();
               } catch (NoExistedTransactionException exception) {
                   //the transaction has been rollback,ignore it.
               }
               break;
       }
   } finally {
       // 將事務從當前執行緒事務佇列移除
       transactionManager.cleanAfterCompletion(transaction);
   }
   // 返回空值
   Method method = ((MethodSignature) (pjp.getSignature())).getMethod();
   return ReflectionUtils.getNullValue(method.getReturnType());
}
  • 當事務處於 TransactionStatus.TRYING 時,呼叫 TransactionManager#propagationExistBegin(...) 方法,傳播發起分支事務。發起分支事務完成後,呼叫 ProceedingJoinPoint#proceed() 方法,執行方法原邏輯( 即 Try 邏輯 )。 
  1. 為什麼要傳播發起分支事務?在根事務進行 Confirm / Cancel 時,呼叫根事務上的參與者們提交或回滾事務時,進行遠端服務方法呼叫的參與者,可以通過自己的事務編號關聯上傳播的分支事務( 兩者的事務編號相等 ),進行事務的提交或回滾。
  • 當事務處於 TransactionStatus.CONFIRMING 時,呼叫 TransactionManager#commit() 方法,提交事務。
  • 當事務處於 TransactionStatus.CANCELLING 時,呼叫 TransactionManager#rollback() 方法,提交事務。
  • 呼叫 TransactionManager#cleanAfterCompletion(...) 方法,將事務從當前執行緒事務佇列移除,避免執行緒衝突。
  • 當事務處於 TransactionStatus.CONFIRMING / TransactionStatus.CANCELLING 時,呼叫 ReflectionUtils#getNullValue(...) 方法,返回空值。為什麼返回空值?Confirm / Cancel 相關方法,是通過 AOP 切面呼叫,只調用,不處理返回值,但是又不能沒有返回值,因此直接返回空。實現程式碼如下:
public static Object getNullValue(Class type) {
   // 處理基本型別
   if (boolean.class.equals(type)) {
       return false;
   } else if (byte.class.equals(type)) {
       return 0;
   } else if (short.class.equals(type)) {
       return 0;
   } else if (int.class.equals(type)) {
       return 0;
   } else if (long.class.equals(type)) {
       return 0;
   } else if (float.class.equals(type)) {
       return 0;
   } else if (double.class.equals(type)) {
       return 0;
   }
   // 處理物件
   return null;
}

當方法型別為 Propagation.NORMAL 時,執行方法原邏輯,不進行事務處理

6.3 資源協調者攔截器

先一起來看下資源協調者攔截器 對應的切面 org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect,實現程式碼如下:

@Aspect
public abstract class ResourceCoordinatorAspect {

    private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void transactionContextCall() {
    }

    @Around("transactionContextCall()")
    public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
    }

    public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
        this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
    }

    public abstract int getOrder();
}
  • 通過 [email protected] + [email protected] 註解,配置對 @Compensable 註解的方法進行攔截,呼叫 ResourceCoordinatorInterceptor#interceptTransactionContextMethod(...) 方法進行處理。

ResourceCoordinatorInterceptor 實現程式碼如下:

public class ResourceCoordinatorInterceptor {

    private TransactionManager transactionManager;

    public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        Transaction transaction = transactionManager.getCurrentTransaction();
        if (transaction != null) {
            switch (transaction.getStatus()) {
                case TRYING:
                    // 新增事務參與者
                    enlistParticipant(pjp);
                    break;
                case CONFIRMING:
                    break;
                case CANCELLING:
                    break;
            }
        }
        // 執行方法原邏輯
        return pjp.proceed(pjp.getArgs());
    }
}
  • 當事務處於 TransactionStatus.TRYING 時,呼叫 #enlistParticipant(...) 方法,新增事務參與者。
  • 呼叫 ProceedingJoinPoint#proceed(...) 方法,執行方法原邏輯。

ResourceCoordinatorInterceptor#enlistParticipant() 實現程式碼如下:

private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {
   // 獲得 @Compensable 註解
   Method method = CompensableMethodUtils.getCompensableMethod(pjp);
   if (method == null) {
       throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
   }
   Compensable compensable = method.getAnnotation(Compensable.class);
   // 獲得 確認執行業務方法 和 取消執行業務方法
   String confirmMethodName = compensable.confirmMethod();
   String cancelMethodName = compensable.cancelMethod();
   // 獲取 當前執行緒事務第一個(頭部)元素
   Transaction transaction = transactionManager.getCurrentTransaction();
   // 建立 事務編號
   TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());
   // TODO
   if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
       FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
   }
   // 獲得類
   Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());
   // 建立 確認執行方法呼叫上下文 和 取消執行方法呼叫上下文
   InvocationContext confirmInvocation = new InvocationContext(targetClass,
           confirmMethodName,
           method.getParameterTypes(), pjp.getArgs());
   InvocationContext cancelInvocation = new InvocationContext(targetClass,
           cancelMethodName,
           method.getParameterTypes(), pjp.getArgs());
   // 建立 事務參與者
   Participant participant =
           new Participant(
                   xid,
                   confirmInvocation,
                   cancelInvocation,
                   compensable.transactionContextEditor());
   // 新增 事務參與者 到 事務
   transactionManager.enlistParticipant(participant);
}
  • 呼叫 CompensableMethodUtils#getCompensableMethod(...) 方法,獲得帶 @Compensable 註解的方法。
  • 呼叫 #getCurrentTransaction() 方法, 獲取事務。

呼叫 TransactionXid 構造方法,建立分支事務編號。實現程式碼如下:

/**
 * 全域性事務編號
 */
private byte[] globalTransactionId;
/**
 * 分支事務編號
 */
private byte[] branchQualifier;

public TransactionXid(byte[] globalTransactionId) {
   this.globalTransactionId = globalTransactionId;
   branchQualifier = uuidToByteArray(UUID.randomUUID()); // 生成 分支事務編號
}
  • 分支事務編號( branchQualifier ) 需要生成。

TODO TransactionContext 和 Participant 的關係。

呼叫 ReflectionUtils#getDeclaringType(...) 方法,獲得宣告 @Compensable 方法的實際類。實現程式碼如下:

public static Class getDeclaringType(Class aClass, String methodName, Class<?>[] parameterTypes) {
   Method method;
   Class findClass = aClass;
   do {
       Class[] clazzes = findClass.getInterfaces();
       for (Class clazz : clazzes) {
           try {
               method = clazz.getDeclaredMethod(methodName, parameterTypes);
           } catch (NoSuchMethodException e) {
               method = null;
           }
           if (method != null) {
               return clazz;
           }
       }
       findClass = findClass.getSuperclass();
   } while (!findClass.equals(Object.class));
   return aClass;
}

呼叫 InvocationContext 構造方法,分別建立確認執行方法呼叫上下文和取消執行方法呼叫上下文。實現程式碼如下:

/**
* 類
*/
private Class targetClass;
/**
* 方法名
*/
private String methodName;
/**
* 引數型別陣列
*/
private Class[] parameterTypes;
/**
* 引數陣列
*/
private Object[] args;

public InvocationContext(Class targetClass, String methodName, Class[] parameterTypes, Object... args) {
  this.methodName = methodName;
  this.parameterTypes = parameterTypes;
  this.targetClass = targetClass;
  this.args = args;
}

呼叫 Participant 構造方法,建立事務參與者。實現程式碼如下:

public class Participant implements Serializable {

    private static final long serialVersionUID = 4127729421281425247L;

    /**
     * 事務編號
     */
    private TransactionXid xid;
    /**
     * 確認執行業務方法呼叫上下文
     */
    private InvocationContext confirmInvocationContext;
    /**
     * 取消執行業務方法
     */
    private InvocationContext cancelInvocationContext;
    /**
     * 執行器
     */
    private Terminator terminator = new Terminator();
    /**
     * 事務上下文編輯
     */
    Class<? extends TransactionContextEditor> transactionContextEditorClass;

    public Participant() {
    }

    public Participant(TransactionXid xid, InvocationContext confirmInvocationContext, InvocationContext cancelInvocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
        this.xid = xid;
        this.confirmInvocationContext = confirmInvocationContext;
        this.cancelInvocationContext = cancelInvocationContext;
        this.transactionContextEditorClass = transactionContextEditorClass;
    }
}
  • 呼叫 TransactionManager#enlistParticipant(...) 方法,新增事務參與者到事務。