1. 程式人生 > >分散式學習筆記十一:分散式事務模型DTP

分散式學習筆記十一:分散式事務模型DTP

一。 DTP簡介

   X/Open DTP(X/Open Distributed Transaction Processing Reference Model) 是X/Open 這個組織定義的一套分散式事務的標準,也就是了定義了規範和API介面,由廠商進行具體的實現

    X/Open DTP 定義了三個元件: AP,TM,RM
AP(Application Program):也就是應用程式,可以理解為使用DTP的程式
RM(Resource Manager):資源管理器,這裡可以理解為一個DBMS系統,或者訊息伺服器管理系統,應用程式通過資源管理器對資源進行控制。資源必須實現XA定義的介面
TM(Transaction Manager):事務管理器,負責協調和管理事務,提供給AP應用程式程式設計介面(TX協議)以及管理資源管理器
其中,AP 可以和TM 以及 RM 通訊,TM 和 RM 互相之間可以通訊,DTP模型裡面定義了XA介面,TM 和 RM 通過XA介面進行雙向通訊,例如:TM通知RM提交事務或者回滾事務,RM把提交結果通知給TM。AP和RM之間則通過RM提供的Native API 進行資源控制,這個沒有進行約API和規範,各個廠商自己實現自己的資源控制,比如Oracle自己的資料庫驅動程式。

其中在DTP定了以下幾個概念:


事務:一個事務是一個完整的工作單元,由多個獨立的計算任務組成,這多個任務在邏輯上是原子的。
全域性事務:對於一次性操作多個資源管理器的事務,就是全域性事務
分支事務:在全域性事務中,某一個資源管理器有自己獨立的任務,這些任務的集合作為這個資源管理器的分支任務
控制執行緒:用來表示一個工作執行緒,主要是關聯AP,TM,RM三者的一個執行緒,也就是事務上下文環境。簡單的說,就是需要標識一個全域性事務以及分支事務的關係。

分散式事務模型 使用二階段提交協議2PC(Two-phaseCommit) 實現多資料來源事務處理

準備階段
事務協調者(事務管理器)給每個參與者(資源管理器)傳送Prepare訊息,每個參與者要麼直接返回失敗(如許可權驗證失敗),要麼在本地執行事務,寫本地的redo和undo日誌,但不提交,到達一種“萬事俱備,只欠東風”的狀態。

提交階段
如果協調者收到了參與者的失敗訊息或者超時,直接給每個參與者傳送回滾(Rollback)訊息;否則,傳送提交(Commit)訊息;參與者根據協調者的指令執行提交或者回滾操作,釋放所有事務處理過程中使用的鎖資源。(注意:必須在最後階段釋放鎖資源)

出現部分資源失敗後的處理情況

二階段提交看起來確實能夠提供原子性的操作,但是不幸的事,二階段提交還是有幾個缺點的、

1、同步阻塞問題。執行過程中,所有參與節點都是事務阻塞型的。當參與者佔有公共資源時,其他第三方節點訪問公共資源不得不處於阻塞狀態。
2、單點故障。由於協調者的重要性,一旦協調者發生故障。參與者會一直阻塞下去。尤其在第二階段,協調者發生故障,那麼所有的參與者還都處於鎖定事務資源的狀態中,而無法繼續完成事務操作。(如果是協調者掛掉,可以重新選舉一個協調者,但是無法解決因為協調者宕機導致的參與者處於阻塞狀態的問題)
3、資料不一致。在二階段提交的階段二中,當協調者向參與者傳送commit請求之後,發生了局部網路異常或者在傳送commit請求過程中協調者發生了故障,這回導致只有一部分參與者接受到了commit請求。而在這部分參與者接到commit請求之後就會執行commit操作。但是其他部分未接到commit請求的機器則無法執行事務提交。於是整個分散式系統便出現了資料部一致性的現象。
4、二階段無法解決的問題:協調者再發出commit訊息之後宕機,而唯一接收到這條訊息的參與者同時也宕機了。那麼即使協調者通過選舉協議產生了新的協調者,這條事務的狀態也是不確定的,沒人知道事務是否被已經提交。


二。DTP程式設計模型JTA

  JTA(Java Transaction API)是符合X/Open DTP的一個程式設計模型,事務管理和資源管理器支架也是用了XA協議。

1》TA的構成
a、高層應用事務界定介面,供事務客戶界定事務邊界的
b、X/Open XA協議(資源之間的一種標準化的介面)的標準Java對映,它可以使事務性的資源管理器參與由外部事務管理器控制的事務中
c、高層事務管理器介面,允許應用程式伺服器為其管理的應用程式界定事務的邊界 

2》JTA的主要介面 
位於javax.transaction包中
a、UserTransaction介面:讓應用程式得以控制事務的開始、掛起、提交、回滾等。由Java客戶端程式或EJB呼叫。
b、TransactionManager 介面:用於應用伺服器管理事務狀態
c、Transaction介面:用於執行相關事務操作
d、XAResource介面:用於在分散式事務環境下,協調事務管理器和資源管理器的工作
e、Xid介面:為事務識別符號的Java對映
注:前3個介面位於Java EE版的類庫 javaee.jar 中,Java SE中沒有提供!UserTransaction是程式設計常用的介面
注意的是JTA只提供了介面,沒有具體的實現。
JTS(Java Transaction Service)是服務OTS的JTA的實現。簡單的說JTS實現了JTA介面,並且符合OTS的規範。
JTA的事務週期可橫跨多個JDBC Connection生命週期,對眾多Connection進行排程,實現其事務性要求。
JTA可以處理任何提供符合XA介面的資源。包括:JDBC連線,資料庫,JMS,商業物件等等。
3》JTA實現

JOTM(Java Open Transaction Manager)是ObjectWeb的一個開源JTA實現,它本身也是開源應用程式伺服器JOnAS(Java Open Application Server)的一部分,為其提供JTA分散式事務的功能。官網:http://jotm.objectweb.org

Atomikos 是一個為Java平臺提供增值服務的並且開源類事務管理器提供 以下是包括在這個開源版本中的一些功能:
1 全面崩潰 / 重啟恢復
2 相容標準的SUN公司JTA API
3 巢狀事務
4 為XA和非XA提供內建的JDBC介面卡
官網:https://www.atomikos.com/

三。Spring整合JTA

因為spring的
<tx:annotation-driven transaction-manager="springTransactionManager"/>  只能指定一個事務管理器 所以多資料來源下 肯定不能使用同一個事務管理器
必須使用jta事務

1》使用jtom配置jta實現
模擬環境 
  mysql 下存在表 mymoney 存在使用者zs 餘額 1000 建表sql

CREATE DATABASE IF NOT EXISTS dtp;
USE dtp;
CREATE TABLE mymoney(
  id INT PRIMARY KEY AUTO_INCREMENT,
  NAME VARCHAR(20),
  lostedmoney INT
);
INSERT INTO mymoney(id,NAME,lostedmoney) VALUES(1,'zs',1000);


  oracle下存在表mymoney 存在使用者ls 餘額300 建表sql


oracle下 scott賬號下 建立相同表
CREATE TABLE mymoney(
  id number PRIMARY KEY ,
  NAME VARCHAR2(20),
  lostedmoney number
);
INSERT INTO mymoney(id,NAME,lostedmoney) VALUES(2,'ls',300);


模擬zs給ls轉賬 自然涉及到多資料來源分散式事務
配置spring支援jtom的環境 (使用xapool管理資料來源)
maven依賴:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.et</groupId>
    <artifactId>spring</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.13.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>4.3.13.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>4.3.13.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>4.3.13.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/jotm/jotm -->
        <dependency>
            <groupId>jotm</groupId>
            <artifactId>jotm</artifactId>
            <version>2.0.10</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.ow2.jotm/jotm-core -->
        <dependency>
            <groupId>org.ow2.jotm</groupId>
            <artifactId>jotm-core</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.ow2.jotm/jotm-datasource -->
        <dependency>
            <groupId>org.ow2.jotm</groupId>
            <artifactId>jotm-datasource</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.ow2.jotm/jotm-standalone -->
        <dependency>
            <groupId>org.ow2.jotm</groupId>
            <artifactId>jotm-standalone</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/javax.resource/javax.resource-api -->
        <dependency>
            <groupId>javax.resource</groupId>
            <artifactId>javax.resource-api</artifactId>
            <version>1.7</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.experlog/xapool -->
        <dependency>
            <groupId>com.experlog</groupId>
            <artifactId>xapool</artifactId>
            <version>1.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>com.jslsolucoes</groupId>
            <artifactId>ojdbc6</artifactId>
            <version>11.2.0.1.0</version>
        </dependency>
    </dependencies>
</project>


新增spring配置檔案 配置 兩個資料來源 並且將兩個支援xa的資料來源 繫結到一個資源管理器 UserTranscation中

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:aop="http://www.springframework.org/schema/aop"  
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:context="http://www.springframework.org/schema/context"  
    xsi:schemaLocation="    
        http://www.springframework.org/schema/beans     
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd    
        http://www.springframework.org/schema/tx     
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd    
        http://www.springframework.org/schema/aop  
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd    
        http://www.springframework.org/schema/context     
        http://www.springframework.org/schema/context/spring-context-4.0.xsd    
">  
  <context:component-scan base-package="cn"></context:component-scan>
  <bean id="txCurrent" class="org.objectweb.jotm.Current"></bean>
 
  <!-- 使用xapool配置支援xa的資源(mysql資料庫)連線池 -->
  <bean id="xaMysqlDataSource" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
     <property name="dataSource">
          <!-- 定義支援mysql的xa資料來源 -->
          <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
              <property name="driverName" value="com.mysql.jdbc.Driver"></property>
            <property name="url" value="jdbc:mysql://localhost:3306/dtp"></property>
            <!-- 注意使用者名稱密碼不能配在這裡 否則無法讀取 -->
            <!-- 註冊到txCurrent -->
            <property name="transactionManager" ref="txCurrent"></property>
          </bean>
           
     </property>
     <property name="user" value="root"></property>
     <property name="password" value="123456"></property>
  </bean>
   <bean id="mysqlJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">  
        <property name="dataSource" ref="xaMysqlDataSource" />  
   </bean>  
   
   <!-- 使用xapool配置支援xa的資源(oracle資料庫)連線池 -->
   <bean id="xaOracleDataSource" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
     <property name="dataSource">
          <!-- 定義支援mysql的xa資料來源 -->
          <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
              <property name="driverName" value="oracle.jdbc.OracleDriver"></property>
            <property name="url" value="jdbc:oracle:thin:@localhost:1521:orcl"></property>
            <!-- 註冊到txCurrent -->
            <property name="transactionManager" ref="txCurrent"></property>
          </bean>
     </property>
     <property name="user" value="scott"></property>
     <property name="password" value="tiger"></property>
  </bean>
   <bean id="oracleJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">  
        <property name="dataSource" ref="xaOracleDataSource" />  
   </bean>  
   
   
   <!-- 配置jta事務  userTransaction必須將所有的資料來源(RM)註冊到txCurrent中-->
   <bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">  
        <property name="userTransaction" ref="txCurrent" />  
   </bean>  
   <tx:annotation-driven transaction-manager="springTransactionManager"/>  
</beans>


新增zs的資料處理類

package cn.et.dao;
 
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@Transactional
@Repository
public class UserZsDao {
    @Autowired
    private UserLsDao userLsDao;
    @Resource(name="mysqlJdbcTemplate")
    JdbcTemplate mysqlJdbcTemplate;
    public void zsMinus(int money){
        //zs的mysql扣款
        String sql="update MYMONEY set lostedmoney=lostedmoney-"+money+" where id=1";
        mysqlJdbcTemplate.execute(sql);
        //呼叫ls的oracle加錢
        userLsDao.lsAdd(money);
        //模擬出現異常 檢視資料兩個操作資料是否都回滾
        String a=null;
        a.toCharArray();
    }
}


處理ls的資料處理類

package cn.et.dao;
 
import javax.annotation.Resource;
 
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@Transactional
@Repository
public class UserLsDao {
    
    @Resource(name="oracleJdbcTemplate")
    JdbcTemplate oracleJdbcTemplate;
    
    public void lsAdd(int money){
        String sql="update MYMONEY set lostedmoney=lostedmoney+"+money+" where id=2";
        oracleJdbcTemplate.execute(sql);
    }
}


新增測試類

package cn.et.dao;
 
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class Test {
 
    public static void main(String[] args) {
        ClassPathXmlApplicationContext cpxac=new ClassPathXmlApplicationContext("spring.xml");
        UserZsDao userZsDao=(UserZsDao)cpxac.getBean("userZsDao");
        userZsDao.zsMinus(100);
        cpxac.close();
    }
 
}


執行後 發現出現異常正常回滾