1. 程式人生 > >支付寶 分散式事務服務 DTS 二

支付寶 分散式事務服務 DTS 二

分散式事務服務 DTS二

更多幹貨

如何玩轉 DTS,基本上使用 DTS 對發起方的配置要求會多一點。

新增 DTS 的依賴

NOTE: 發起方和參與方都需要新增依賴。

如果使用 SOFA Lite,只需按照樣例工程裡的方式新增依賴:

<dependency>
    <groupId>com.alipay.sofa</groupId>
    <artifactId>slite-starter-xts</artifactId>
</dependency>

如果沒有使用 SOFA Lite,那麼需要在 pom 配置里加上 DTS 的依賴:

<dependency
> <groupId>com.alipay.xts</groupId> <artifactId>xts-core</artifactId> <version>6.0.8</version> </dependency> <dependency> <groupId>com.alipay.xts</groupId> <artifactId>xts-adapter-sofa</artifactId> <version
>6.0.8</version> </dependency>

場景介紹

  1. 首先我們假想這樣一種場景:轉賬服務,從銀行 A 某個賬戶轉 100 元錢到銀行 B 的某個賬戶,銀行 A 和銀行 B 可以認為是兩個單獨的系統,也就是兩套單獨的資料庫。
  2. 我們將賬戶系統簡化成只有賬戶和餘額 2 個欄位,並且為了適應 DTS 的兩階段設計要求,業務上又增加了一個凍結金額(凍結金額是指在一筆轉賬期間,在一階段的時候使用該欄位臨時儲存轉賬金額,該轉賬額度不能被使用,只有等這筆分散式事務全部提交成功時,才會真正的計入可用餘額)。按這樣的設計,使用者的可用餘額等於賬戶餘額減去凍結金額。這點是理解參與者設計的關鍵,也是 DTS 保證最終一致的業務約束。
  3. 同時為了記錄賬戶操作明細,我們設計了一張賬戶流水錶用來記錄每次賬戶的操作明細,所以領域物件簡單設計如下:
public class Account {
    /**
     * 賬戶
     */
    private String accountNo;
    /**
     * 餘額
     */
    private double amount;
    /**
     * 凍結金額
     */
    private double freezedAmount;
public class AccountTransaction {
    /**
     * 事務id
     */
    private String txId;
    /**
     * 操作賬戶
     */
    private String accountNo;
    /**
     * 操作金額
     */
    private double amount;
    /**
     * 操作型別,扣帳還是入賬
     */
    private String type;

A 銀行參與者

我們假設需要從 A 賬戶扣 100 元錢,所以 A 系統提供了一個扣帳的服務,對應扣帳的一階段介面和相應的二階段介面如下:

/**
 * A銀行參與者,執行扣帳操作
 * @version $Id: FirstAction.java, v 0.1 2014年9月22日 下午5:32:59 Exp $
 */
public interface FirstAction {
  /**
   * 一階段方法,注意要打上xts的標註哦
   * 
   * @param businessActionContext
   * @param accountNo
   * @param amount
   */
  @TwoPhaseBusinessAction(name = "firstAction", commitMethod = "commit", rollbackMethod = "rollback")
  public void prepare_minus(BusinessActionContext businessActionContext,String accountNo,double amount);
  /**
   * 二階段的提交方法
   * @param businessActionContext
   * @return
   */
  public boolean commit(BusinessActionContext businessActionContext);
  /**
   * 二階段的回滾方法
   * @param businessActionContext
   * @return
   */
  public boolean rollback(BusinessActionContext businessActionContext);
}

對應的一階段扣帳實現

public void prepare_minus(final BusinessActionContext businessActionContext,
                          final String accountNo, final double amount) {
    transactionTemplate.execute(new TransactionCallback() {
        @Override
        public Object doInTransaction(TransactionStatus status) {
            try {
                try {
                        //鎖定賬戶
                        Account account = accountDAO.getAccount(accountNo);
                        if (account.getAmount() - amount < 0) {
                            throw new TransactionFailException("餘額不足");
                        }
                        //先記一筆賬戶操作流水
                        AccountTransaction accountTransaction = new AccountTransaction();
                        accountTransaction.setTxId(businessActionContext.getTxId());
                        accountTransaction.setAccountNo(accountNo);
                        accountTransaction.setAmount(amount);
                        accountTransaction.setType("minus");
                        //初始狀態,如果提交則更新為C狀態,如果失敗則刪除記錄
                        accountTransaction.setStatus("I");
                        accountTransactionDAO.addTransaction(accountTransaction);
                        //再遞增凍結金額,表示這部分錢已經被凍結,不能使用
                        double freezedAmount = account.getFreezedAmount() + amount;
                        account.setFreezedAmount(freezedAmount);
                        accountDAO.updateFreezedAmount(account);
                    } catch (Exception e) {
                        System.out.println("一階段異常," + e);
                        throw new TransactionFailException("一階段操作失敗", e);
                    }
            return null;
        }
    });
}

對應的二階段提交操作

public boolean commit(final BusinessActionContext businessActionContext) {
    transactionTemplate.execute(new TransactionCallback() {
        @Override
        public Object doInTransaction(TransactionStatus status) {
            try {
                    //找到賬戶操作流水
                    AccountTransaction accountTransaction = accountTransactionDAO
                        .findTransaction(businessActionContext.getTxId());
                    //事務資料被刪除了
                    if (accountTransaction == null) {
                        throw new TransactionFailException("事務資訊被刪除");
                    }
                    //重複提交冪等保證只做一次
                    if (StringUtils.equalsIgnoreCase("C", accountTransaction.getStatus())) {
                        return true;
                    }
                    Account account = accountDAO.getAccount(accountTransaction.getAccountNo());
                    //扣錢
                    double amount = account.getAmount() - accountTransaction.getAmount();
                    if (amount < 0) {
                        throw new TransactionFailException("餘額不足");
                    }
                    account.setAmount(amount);
                    accountDAO.updateAmount(account);
                    //凍結金額相應減少
                    account.setFreezedAmount(account.getFreezedAmount()
                                             - accountTransaction.getAmount());
                    accountDAO.updateFreezedAmount(account);
                    //事務成功之後更新為C
                    accountTransactionDAO.updateTransaction(businessActionContext.getTxId(), "C");
                } catch (Exception e) {
                    System.out.println("二階段異常," + e);
                    throw new TransactionFailException("二階段操作失敗", e);
                }
            return null;
        }
    });
    return false;
}

對應的二階段回滾操作

public boolean rollback(final BusinessActionContext businessActionContext) {
    transactionTemplate.execute(new TransactionCallback() {
        @Override
        public Object doInTransaction(TransactionStatus status) {
            try {
                    //回滾凍結金額
                    AccountTransaction accountTransaction = accountTransactionDAO
                        .findTransaction(businessActionContext.getTxId());
                    if (accountTransaction == null) {
                        System.out.println("二階段---空回滾成功");
                        return null;
                    }
                    Account account = accountDAO.getAccount(accountTransaction.getAccountNo());
                    account.setFreezedAmount(account.getFreezedAmount()
                                             - accountTransaction.getAmount());
                    accountDAO.updateFreezedAmount(account);
                    //刪除流水
                    accountTransactionDAO.deleteTransaction(businessActionContext.getTxId());
                } catch (Exception e) {
                    System.out.println("二階段異常," + e);
                    throw new TransactionFailException("二階段操作失敗", e);
                }
              return null;
        }
   });
   return false;
}

B 銀行參與者

我們假設需要對 B 賬戶入賬 100 元錢,所以 B 系統提供了一個入賬的服務,對應入賬的一階段介面和相應的二階段介面基本和 A 銀行參與者類似,這裡不多做介紹,可以直接檢視樣例工程下的 xts-sample 工程程式碼。

發起方

前面介紹了參與者的實現細節,接下來看看發起方系統是如何協調這 2 個參與者,達到分散式事務下資料的最終一致性的。相比參與者,發起方的配置要複雜一些。

  1. 在發起方自己的資料庫裡建立 DTS 的表
  2. 配置 BusinessActivityControlService

BusinessActivityControlService 是 DTS 分散式事務的啟動類,在 SOFA 環境中,我們可以這樣使用

<!-- 分散式事務的服務,用來發起分散式事務 -->
<sofa:xts id="businessActivityControlService">
  <!-- 發起方自己的資料來源,建議使用zdal資料來源元件,這裡簡單使用dbcp資料來源 -->
   <sofa:datasource ref="activityDataSource"/>
  <!-- 如果使用zdal資料來源,可以不用配置這個屬性,這個dbType是用來區分目標庫的型別,以方便xts設定sqlmap -->
   <sofa:dbtype value="mysql"/>
</sofa:xts>

在其他環境中,我們也可以將它配置成一個普通 Bean,配置如下

<!-- 分散式事務的服務,用來發起分散式事務 -->
<bean name="businessActivityControlService" class="com.alipay.xts.client.api.impl.sofa.BusinessActivityControlServiceImplSofa">
   <!-- 發起方自己的資料來源,建議使用zdal資料來源元件,這裡簡單使用dbcp資料來源 -->
   <property name="dataSource" ref="activityDataSource"/>
   <!-- 如果使用zdal資料來源,可以不用配置這個屬性,這個dbType是用來區分目標庫的型別,以方便xts設定sqlmap -->
   <property name="dbType" value="mysql"/>
</bean>
  1. 配置參與者服務和攔截器。如果是在 SOFA 環境中,DTS 框架會自動攔截參與者方法,攔截器就不用配置了
<!-- 第一個參與者的代理 -->
<bean id="firstAction" class="org.springframework.aop.framework.ProxyFactoryBean">
   <property name="proxyInterfaces" value="com.alipay.xts.client.sample.action.FirstAction"/>
   <property name="target" ref="firstActionTarget"/>
   <property name="interceptorNames">
      <list>
          <value>businessActionInterceptor</value>
      </list>
   </property>
</bean>
<!-- 第一個參與者 -->
<bean id="firstActionTarget" class="com.alipay.xts.client.sample.action.impl.FirstActionImpl">
   <property name="accountTransactionDAO">
      <ref bean="firstActionAccountTransactionDAO" />
   </property>
   <property name="accountDAO">
      <ref bean="firstActionAccountDAO" />
   </property>
   <property name="transactionTemplate">
      <ref bean="firstActionTransactionTemplate" />
   </property>
</bean>
<!-- 第二個參與者的代理 -->
<bean id="secondAction" class="org.springframework.aop.framework.ProxyFactoryBean">
   <property name="proxyInterfaces" value="com.alipay.xts.client.sample.action.SecondAction"/>
   <property name="target" ref="secondActionTarget"/>
   <property name="interceptorNames">
     <list>
        <value>businessActionInterceptor</value>
     </list>
   </property>
</bean>
<!-- 第二個參與者 -->
<bean id="secondActionTarget" class="com.alipay.xts.client.sample.action.impl.SecondActionImpl">
   <property name="accountTransactionDAO">
      <ref bean="secondActionAccountTransactionDAO" />
   </property>
   <property name="accountDAO">
      <ref bean="secondActionAccountDAO" />
   </property>
   <property name="transactionTemplate">
      <ref bean="secondActionTransactionTemplate" />
   </property>
</bean>
<!-- 攔截器,在參與者呼叫前生效,插入參與者的action記錄 -->
<bean id="businessActionInterceptor"
     class="com.alipay.sofa.platform.xts.bacs.integration.BusinessActionInterceptor">
   <property name="businessActivityControlService" ref="businessActivityControlService"/>
</bean>
  1. 發起分散式事務

啟動分散式事務的入口方法

/**
 * 啟動一個業務活動。
 * 
 * 為了保證業務活動的唯一性,對同樣的businessType與businessId,只能有一次成功記錄。
 * 
 * 系統允許多次呼叫start方式啟動業務活動,如果當前業務活動已經存在,再次啟動業務活動不會有任何效果,也不會檢查業務型別與業務號是否匹配。
 * 
 * @param businessType 業務型別,由業務系統自定義,比如'trade_pay'代表交易支付
 * @param businessId 業務號,如交易號
 * @notice 事務號的格式為: businessType+"-"+businessId,總長度為128
 * @return 
 */
BusinessActivityId start(String businessType, String businessId, Map<String, Object> properties);

businessType + businessId 就是最終的事務號,properties 可以讓發起方設定一些全域性的事務上下文資訊。

轉賬服務發起分散式事務

/**
 * 執行轉賬操作
 * 
 * @param from
 * @param to
 * @param amount
 */
public void transfer(final String from, final String to, final double amount) {
    /**
     * 注意:開啟xts服務必須包含在發起方的本地事務模版中

            
           

相關推薦

支付 分散式事務服務 DTS

分散式事務服務 DTS二更多幹貨如何玩轉 DTS,基本上使用 DTS 對發起方的配置要求會多一點。新增 DTS 的依賴NOTE: 發起方和參與方都需要新增依賴。如果使用 SOFA Lite,只需按照樣例工程裡的方式新增依賴:<dependency> <

python分散式事務方案()基於訊息最終一致性

python分散式事務方案(二)基於訊息最終一致性 上一章採用的是tcc方案,但是在進行批量操作時,比如說幾百臺主機一起分配策略時,會執行很長時間,這時體驗比較差。 由於zabbix隱藏域後臺,而這個慢主要是集中在呼叫zabbix介面,這裡我們就基於訊息最終一致性來進行優化 訊息一致性方案是通過

構建基於RocketMQ的分散式事務服務

說在前面 Apache RocketMQ-4.3.0正式Release了事務訊息的特性,順著最近的這個熱點。第一篇文章,就來聊一下在軟體工程學上的長久的難題——分散式事務(Distributed Transaction)。 這個技術也在各個諸如阿里,騰訊等大廠的

Netty遊戲伺服器實戰開發(8):利用redis或者zookeeper實現3pc分散式事務鎖()。支撐騰訊系列某手遊百萬級流量公測

導讀:在上篇文章中介紹了分散式事務專案的基本原理和工程元件,我們瞭解到了分散式事務的理論知識。處於實戰的經驗,我們將理論知識使用到實際專案中。所以我們將藉助idea中maven工程 來實戰我們的專案。 回到正文: 在上篇文章中我們已經把需要的準備工作做好了。現在

分散式事務一致性:階段提交協議(2PC)

文章目錄 一、概念 二、優缺點 優點: 缺點: 三、總結 分散式系統開發不可避免會遇到分散式事務,目前業界都

分散式事務解決方案()【基於可靠訊息的最終一致性】

2. 最終一致性(基於可靠訊息) 2.1 訊息傳送的一致性 指產生訊息的業務動作與訊息傳送的一致。(也就是說,如果業務操作成功,那麼由這個業務操作所產生的訊息一定要成功投遞出去,否則就丟訊息) 2.1.1 如何保障訊息傳送一致性 處理方式1

支付SDK整合服務端(java)

支付寶SDK整合服務端 配置項(公共引數) // 商戶appid public static String APPID = "2017120800451448"; // 私鑰 pkcs8格式的 public static String RSA_P

js / 前端 / 支付,微信合併維碼功能

支付寶,微信合併二維碼 近期專案要優化支付頁面,希望將兩個二維碼合成一個。研究整理一下: 首先做這件事,要明白原理哦: 網站的支付功能,一般都是生成一個( 後臺大哥與支付寶或微信介面授權好了的ur

分散式事務實戰()--可靠訊息的最終一致性方案(訊息的一致性問題)

前言 訊息傳送一致性問題: 在分散式部署環境下,通過網路進行通訊,就會有資料傳世的不確定性,也就是CAP中的P【會出現分割槽容錯性的問題】。主動方傳送訊息到訊息中介軟體以及訊息中介軟體到被動方應用題都會出現網路的問題; 如何保證一致性問題

更多免費初級中級高階大資料java視訊教程下載 加(微***信((號keepper,請備註java或掃下面23維4碼第31: 2017年7月最新微服務架構的分散式事務解決方案價值1399

更多免費初級中級高階大資料java視訊教程下載 加(微***信((號keepper,請備註java或掃下面2二3維4碼第31: 2017年7月最新微服務架構的分散式事務解決方案價值1399java視訊教程01 課程介紹.wmvjava視訊教程02 解決方案的效果演示(結合支付系統真實應用場景).mp4java

Java支付支付開發流程與原理【沙箱環境】【分散式事務解決方案】

不管是支付寶支付,還是微信支付,還是銀聯支付等,大部分的支付流程都是相似的,學會了其中的思想,那麼其他支付方式也就很簡單了。 支付寶支付流程: 1、A網站以POST請求方式提交引數給支付寶介面,在支付寶端進行支付處理。 POST請求方式一定程度下保證了安全性,即在url

Java 支付之APP支付服務端 ()

/** * 支付寶請求交易 * * @param bean * @return */ @RequestMapping(value=Route.Payment.ALI_PAY,method=RequestMethod.POST) @ResponseBody public Respon

支付支付代理商 支付支付省級市級服務

支付寶支付 支付寶支付代理商 支付寶省級代理 杭州合言從2015年踏足移動支付行業以來以“為支付寶支付代理商持續提供小而美的改變”為宗旨,結合線上公眾號、支付寶服務窗代理加盟省級市級一級支付寶支付加盟商、采寶移動支付提供支付寶支付等平臺,公司自主研發出線上營銷線下收款的互聯網020閉環營銷集成收單系統-

聚合支付服務商 微信+支付一站式服務平臺

聚合支付 二維碼支付 移動支付代理 支付寶支付代理服務商 杭州合言信息科技有限公司-發展至今,在支付中,支持各種支付工具,一站式支付服務商家(包括:支付寶、微信支付、中國銀聯)等支付服務,在各大第三方支付平臺僵持不下之際,采寶聚合支付便開始活躍在移動支付市場上。

微信支付維碼支付代理加盟 采維碼掃碼POS

微信支付代理 支付寶代理商 智能POS代理 現如今微信支付寶二維碼支付代理以及進入了支付潮流。如:銀幣、再到紙幣,演變到“錢”變得越來越輕薄。在現這個二維碼掃碼POS市場中,互聯網技術的支撐下,刷手機乘公交、刷支付寶買菜購物、繳水電費……日常生活中使用“現金”的場景,逐漸被手機各種支付方式取代,“無現金

最近在做支付支付,在本地測試一切正常,上傳到服務器就遇到報錯:

-1 tail 服務 war tar nbsp 百度 nature function 最近在做支付寶支付,在本地測試一切正常,上傳到服務器就遇到報錯: Warning: openssl_sign() [function.openssl-sign]: Unknown sign

微信和支付支付模式詳解及實現

配置 其余 logs https 朋友 一個 target 多租戶 對比   繼上篇《微信和支付寶支付模式詳解及實現》到現在已經有半年時間了,這期間不少朋友在公號留言支付相關的問題,最近正好也在處理公司支付相關的對接,打算寫這篇來做一個更進一步的介紹,同時根據主要的幾個支付

H5集成支付App支付客戶端+服務端(java)

XML 服務端 onf response 成功 code default format sim 由於最近項目需要接入第三方開發,支付寶支付,微信支付,OSS圖片上傳以及短信服務。為避免第一次開發支付寶再次花時間查看文檔,今天總結一下接入支付寶的過程,以及接入過程中遇到的問題

微信支付支付支付生成維碼的方法(php生成維碼的三種方法)

gpo 就是 contents 微信支付 amp 如何 使用 alt scrip 如果圖簡單,可以用在線生成 http://pan.baidu.com/share/qrcode?w=150&h=150&url=http://www.xinzhenkj.com

JAVA項目實戰,項目架構,高並發,分布式,微服務架構,微信支付支付支付,理財系統,並發編程

等等 搭建 服務器 net 三方庫 必須 服務發現 netflix 分布式 Spring Cloud集成項目有很多,下面我們列舉一下和Spring Cloud相關的優秀項目,我們的企業架構中用到了很多的優秀項目,說白了,也是站在巨人的肩膀上去整合的。在學習Spring Cl