1. 程式人生 > >Spring-Boot + Atomikos 實現跨庫的分散式事務管理

Spring-Boot + Atomikos 實現跨庫的分散式事務管理

一、何為事務

定義:事務是指多個操作單元組成的合集,多個單元操作是整體不可分割的,要麼都操作成功,要麼都不成功。

其必須遵循的四個原則(ACID):

原子性(Atomicity -- 美 [ˌætəˈmɪsɪti]):事務是不可分割的最小單位,事務內的操作要麼全成功(commit),要麼一個失敗全失敗(rollback)

一致性(Consistency --  美 [kənˈsɪstənsi]):在事務執行前,資料庫的資料處於正確的狀態,而事務執行完成後,資料庫的資料還應該是處於正確的狀態,即資料完整性約束沒有被破壞; 比如A向B轉了10元錢,涉及的操作有,B+10,A-10,在轉錢這個操作(兩個action)最終成功的進行事務的提交後,必須保證A的賬戶金額確實-10,而不是B+10拿到錢後,就不管A-10了(萬一A沒扣錢,豈不是賺了一筆),否則資料的完整性無法達到兩邊甚至N邊一致。


隔離性(Isolation -- 美 [ˌaɪsəˈleʃən]併發事務執行之間互不影響,在一個事務內部的操作對其他事務是不會產生影響的,這需要事務隔離級別來指定隔離性;

       |

       |

      V

    --   五大隔離級別(不算預設的話,就是四大):

① Isolation.DEFAULT使用資料庫設定的隔離級別 ( 預設 ) ,由 DBA 預設的設定來決定隔離級別 .

② Isolation.READ_UNCOMMITTED這是事務最低的隔離級別,它充許別外一個事務可以拿到這個事務未提交的資料。這種隔離級別會產生髒讀,不可重複讀和幻讀

③ Isolation.READ_COMMITTED

保證一個事務修改的資料提交後才能被另外一個事務讀取。另外一個事務不能讀取該事務未提交的資料。這種事務隔離級別可以避免髒讀出現,但是可能會出現不可重複讀和幻讀

④ Isolation.REPEATABLE_READ 這種事務隔離級別可以防止髒讀,不可重複讀。但是可能出現幻讀。它除了保證一個事務不能讀取另一個事務未提交的資料外,還保證了避免下面的情況產生(不可重複讀)。

⑤ Isolation.SERIALIZABLE :級別最高,花費的代價最高,但卻是最可靠的事務隔離級別。事務被處理為順序執行。除了防止髒讀,不可重複讀外,還避免了幻讀。

       |

       |

      V

髒讀: 一個事務讀取了另一個事務未提交的資料     ---> 危險、可怕

A 向 B 轉100元錢 ==   B+100 and A-100  兩個操作

              如果在上述的操作中,B對應的事務還未提交(此時資料已經update進去了),假設另一端B正在查詢自己的賬戶,會發現:"咦,昨天卡里就100,怎麼今天突然成了200",還沒等B高興過來,A那邊的ATM機壞掉了,於是所有事務回滾,回到起點,假設B這時候又查了一次(懷疑中),會發現:"咦,奇了怪了,剛才200,怎麼現在....." ,再來個大膽的假設,假設A轉賬的過程非常慢,而B查到了自己賬戶多出100元,於是乎就興沖沖的取出來200,就去嗨皮了;結果A那邊在漫長的等待後,ATM機子還是壞掉了,所有事務回滾,完了,那100塊錢怎麼處理?

不可重複讀:一個事務讀取表中的某條資料,多次讀取發現結果不一樣 (讀取了另一個事務已提交的資料)

                假設A的賬戶裡面有100元,A的朋友B想找A借錢,於是就去銀行櫃檯想通過工作人員查詢A的賬戶裡有多少錢,在工作人員多次查詢的情況下(假設這在一個事務內),巧的是,在工作人員第一次查詢的時候,A賬戶金額還是顯示的100,這個時候,另一端的A在ATM機上成功的給自己的賬戶裡面存了100元,於是乎,工作人員第二次查詢的時候,電腦螢幕上顯示的A的賬戶金額為200元,這個時候,工作人員在沒有確認A是通過ATM機給自己存了100元的前提下,是無法確認這兩次查詢到底哪一次是正確的,也有可能第一次查詢系統出錯了,也有可能是第二次查詢的時候系統出錯了,哈哈,說到這,我感覺很有意思了,總之,不可重複讀區別於髒讀,髒讀讀的是未提交的資料,而這個讀的是提交的資料。

幻讀:一個事務在插入資料時先檢測到記錄不存在,於是乎準備進行插入,這時候卻驚奇的發現剛才檢測的不存在的記錄居然存在了,這時候第一個事務肯定插不進去了,我們猜測一種情況就是主鍵衝突,怎麼回事呢?原因就在於,事務在插入的時候,另一個事務已經將資料更新,造成了前一個事務有一種見了鬼的感覺。

永續性(Durability -- 美 [ˌdjʊrəˈbɪlətɪ]):事務一旦執行成功,它對資料庫的資料的改變必須是永久的,不會因比如遇到系統故障或斷電造成資料不一致或丟失。

二、事務分類

1. 資料庫事務分為   -- 本地事務   -- 全域性事務

       本地事務:普通事務,獨立一個數據庫(Connection),能保證在該資料庫上操作的ACID

       全域性事務(分散式事務):涉及兩個或多個數據庫源的事務,即跨越多臺同類或異類資料庫的事務(由每臺數據庫的本地事務組成的),分散式事務旨在保證這些本地事務的所有操作的ACID,使事務可以跨越多臺資料庫;

2. Java事務型別分為 -- JDBC事務 跟 -- JTA事務

       JDBC事務:即為上面說的資料庫事務中的本地事務,通過connection物件控制管理

       JTA(Java Transaction API)事務:Java事務API,是Java EE資料庫事務規範, JTA只提供了事物管理介面,由應用程式伺服器廠商(如WebSphere Application Server)提供實現,JTA事務比JDBC更強大,支援分散式事務

3. 程式設計式事務和宣告式事務

       程式設計式事務:通過程式碼在業務執行時根據需要自行實現事務的commit和rollback,粒度更小,可作用在程式碼塊上,缺點:不可複用,重複的程式碼太多

       宣告式事務:繁瑣的有XML配置,簡單粗暴的直接使用@Transactional註解實現

三、什麼是Atomikos(以下摘自搜狗百科)

全稱:Atomikos TransactionsEssentials 是一個為Java平臺提供增值服務的並且開源類事務管理器,以下是包括在這個開源版本中的一些功能:

l 全面崩潰 / 重啟恢復

l 相容標準的SUN公司JTA API

l 巢狀事務

l 為XA和非XA提供內建的JDBC介面卡

註釋:XA:XA協議由Tuxedo首先提出的,並交給X/Open組織,作為資源管理器(資料庫)與事務管理器的介面標準。目前,Oracle、Informix、DB2、Sybase、MySql、免費開源的Postgresql等各大資料庫廠家都提供對XA的支援。XA協議採用兩階段提交方式來管理分散式事務。XA介面提供資源管理器與事務管理器之間進行通訊的標準介面。XA協議包括兩套函式,以xa_開頭的及以ax_開頭的。

有人說 XA 是 eXtended Architecture擴充體系結構 的縮寫, 其實我覺得這僅僅是一種巧合. eXtended Architecture 是一種CD ROM的驅動架構.

以下的函式使事務管理器可以對資源管理器進行的操作:

1)xa_open,xa_close:建立和關閉與資源管理器的連線。

2)xa_start,xa_end:開始和結束一個本地事務。

3)xa_prepare,xa_commit,xa_rollback:預提交、提交和回滾一個本地事務。

4)xa_recover:回滾一個已進行預提交的事務。

5)ax_開頭的函式使資源管理器可以動態地在事務管理器中進行註冊,並可以對XID(TRANSACTION IDS)進行操作。

6)ax_reg,ax_unreg;允許一個資源管理器在一個TMS(TRANSACTION MANAGER SERVER)中動態註冊或撤消註冊。


mysql資料庫驅動實現 XADataSource介面


postgresql資料庫驅動實現 XADataSource介面


四、Spring-Boot+Atomikos+MySql實現多庫的分散式事務管理

(1)專案目錄結構圖


(2)Pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.appleyk</groupId>
	<artifactId>spring-boot-atomikos</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>war</packaging>
	<name>atomikos</name>
	<description>跨庫的分散式事務統一管理</description>
	<!-- 繼承官網最新父POM【假設當前專案不再繼承其他POM】 -->
	<!-- http://projects.spring.io/spring-boot/#quick-start -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.9.RELEASE</version>
	</parent>

	<!-- 使用Java8,嘗試使用新特新【stream和lambda】 -->
	<properties>
		<java.version>1.8</java.version>
	</properties>

	<!-- Starter POMs是可以包含到應用中的一個方便的依賴關係描述符集合 -->
	<!-- 該Starters包含很多你搭建專案, 快速執行所需的依賴, 並提供一致的, 管理的傳遞依賴集。 -->
	<!-- 大多數的web應用都使用spring-boot-starter-web模組進行快速搭建和執行。 -->
	<!-- spring-boot-starter-web -->
	<!-- 對全棧web開發的支援, 包括Tomcat和 spring-webmvc -->
	<dependencies>


		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<!-- 新增Mybatis、Spring-Mybatis依賴 -->
		<!-- mybatis-spring-boot-starter繼承樹那是相當全面 -->
		<dependency>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>1.1.1</version>
		</dependency>

		<!-- MySql驅動依賴 -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
		<!-- PostGresQl驅動依賴 -->
		<dependency>
			<groupId>org.postgresql</groupId>
			<artifactId>postgresql</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jta-atomikos</artifactId>
		</dependency>

		<!-- 新增熱部署 devtools:監聽檔案變動 -->
		<!-- 當Java檔案改動時,Spring-boo會快速重新啟動 -->
		<!-- 最簡單的測試,就是隨便找一個檔案Ctrl+S一下,就可以看到效果 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<!-- optional=true,依賴不會傳遞 -->
			<!-- 本專案依賴devtools;若依賴本專案的其他專案想要使用devtools,需要重新引入 -->
			<optional>true</optional>
		</dependency>

		<!-- Spring 單元測試 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/tk.mybatis/mapper-spring-boot-starter -->
		<!-- mybatis通用mapper -->
		<dependency>
			<groupId>tk.mybatis</groupId>
			<artifactId>mapper-spring-boot-starter</artifactId>
			<version>1.1.5</version>
		</dependency>

	</dependencies>
</project>

(3)application.properties配置多資料

server.port=8088
server.session.timeout=10
server.tomcat.uri-encoding=utf8

#主資料來源 -- Master 
mysql.datasource.master.url = jdbc\:mysql\://localhost\:3306/master?useUnicode\=true&characterEncoding\=utf-8
mysql.datasource.master.username = root
mysql.datasource.master.password = root
  
mysql.datasource.master.minPoolSize = 3  
mysql.datasource.master.maxPoolSize = 25  
mysql.datasource.master.maxLifetime = 20000  
mysql.datasource.master.borrowConnectionTimeout = 30  
mysql.datasource.master.loginTimeout = 30  
mysql.datasource.master.maintenanceInterval = 60  
mysql.datasource.master.maxIdleTime = 60  
mysql.datasource.master.testQuery = select 1  
  
  
#從資料來源 -- Slave 
mysql.datasource.slave.url =jdbc\:mysql\://localhost\:3306/slave?useUnicode\=true&characterEncoding\=utf-8
mysql.datasource.slave.username =root
mysql.datasource.slave.password =root
  
mysql.datasource.slave.minPoolSize = 3  
mysql.datasource.slave.maxPoolSize = 25  
mysql.datasource.slave.maxLifetime = 20000  
mysql.datasource.slave.borrowConnectionTimeout = 30  
mysql.datasource.slave.loginTimeout = 30  
mysql.datasource.slave.maintenanceInterval = 60  
mysql.datasource.slave.maxIdleTime = 60  
mysql.datasource.slave.testQuery = select 1  



#在application.properties檔案中引入日誌配置檔案
#=====================================  log  =============================
logging.config=classpath:logback-boot.xml

(4)日誌檔案logback-boot.xml配置 (設定日誌級別為error,方便輸出檢視)

<configuration>    
    <!-- %m輸出的資訊,%p日誌級別,%t執行緒名,%d日期,%c類的全名,%i索引【從數字0開始遞增】,,, -->    
    <!-- appender是configuration的子節點,是負責寫日誌的元件。 -->
    <!-- ConsoleAppender:把日誌輸出到控制檯 -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">    
        <encoder>    
            <pattern>%d %p (%file:%line\)- %m%n</pattern>  
            <!-- 控制檯也要使用UTF-8,不要使用GBK,否則會中文亂碼 -->
            <charset>UTF-8</charset>   
        </encoder>    
    </appender>    
    <!-- RollingFileAppender:滾動記錄檔案,先將日誌記錄到指定檔案,當符合某個條件時,將日誌記錄到其他檔案 -->
    <!-- 以下的大概意思是:1.先按日期存日誌,日期變了,將前一天的日誌檔名重新命名為XXX%日期%索引,新的日誌仍然是sys.log -->
    <!--             2.如果日期沒有發生變化,但是當前日誌的檔案大小超過1KB時,對當前日誌進行分割 重新命名-->
    <appender name="syslog"    
        class="ch.qos.logback.core.rolling.RollingFileAppender">    
        <!-- <File>log/sys.log</File>  --> 
        <File>opt/spring-boot-web/logs/sys.log</File>
        <!-- rollingPolicy:當發生滾動時,決定 RollingFileAppender 的行為,涉及檔案移動和重新命名。 -->
        <!-- TimeBasedRollingPolicy: 最常用的滾動策略,它根據時間來制定滾動策略,既負責滾動也負責出發滾動 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">    
            <!-- 活動檔案的名字會根據fileNamePattern的值,每隔一段時間改變一次 -->
            <!-- 檔名:log/sys.2017-12-05.0.log -->
            <fileNamePattern>log/sys.%d.%i.log</fileNamePattern> 
            <!-- 每產生一個日誌檔案,該日誌檔案的儲存期限為30天 --> 
            <maxHistory>30</maxHistory>   
            <timeBasedFileNamingAndTriggeringPolicy  class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">    
                <!-- maxFileSize:這是活動檔案的大小,預設值是10MB,本篇設定為1KB,只是為了演示 -->  
                <maxFileSize>10MB</maxFileSize>    
            </timeBasedFileNamingAndTriggeringPolicy>    
        </rollingPolicy>    
        <encoder>    
            <!-- pattern節點,用來設定日誌的輸入格式 -->
            <pattern>    
                %d %p (%file:%line\)- %m%n  
            </pattern>    
            <!-- 記錄日誌的編碼 -->
            <charset>UTF-8</charset> <!-- 此處設定字符集 -->   
        </encoder>    
    </appender>    
    
    <!-- 控制檯輸出日誌級別 -->
    <root  level="error">
          <appender-ref ref="STDOUT" />
     </root> 
    <!-- 指定專案中某個包,當有日誌操作行為時的日誌記錄級別 -->
    <!-- com.appley為根包,也就是隻要是發生在這個根包下面的所有日誌操作行為的許可權都是DEBUG -->
    <!-- 級別依次為【從高到低】:FATAL > ERROR > WARN > INFO > DEBUG > TRACE  --> 
     <logger name="com.appleyk" level="error">    
        <appender-ref ref="syslog" />    
    </logger> 
   
</configuration>  

(5)mysql資料庫

A. 結構 (資料庫引擎 InnoDB)


B. sql指令碼

master_a.sql

--
-- Table structure for table `a`
--

DROP TABLE IF EXISTS `a`;
CREATE TABLE `a` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(45) DEFAULT NULL,
  `sex` char(2) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

slave_b.sql

--
-- Table structure for table `b`
--

DROP TABLE IF EXISTS `b`;
CREATE TABLE `b` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(45) DEFAULT NULL,
  `sex` char(2) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

為了簡單,特意將從庫的b表設計的和主庫的a表一樣,不同的只是資料庫不一樣和表名不一樣

(6)載入主庫資料來源的屬性(引數)


MasterConfig.java

package com.appleyk.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix="mysql.datasource.master")
public class MasterConfig {
	
	private String url;
	private String username;
	private String password;

	private int minPoolSize;

	private int maxPoolSize;

	private int maxLifetime;

	private int borrowConnectionTimeout;

	private int loginTimeout;

	private int maintenanceInterval;

	private int maxIdleTime;

	private String testQuery;

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	public String getUsername() {
		return username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public String getPassword() {
		return password;
	}

	public void setPassword(String password) {
		this.password = password;
	}

	public int getMinPoolSize() {
		return minPoolSize;
	}

	public void setMinPoolSize(int minPoolSize) {
		this.minPoolSize = minPoolSize;
	}

	public int getMaxPoolSize() {
		return maxPoolSize;
	}

	public void setMaxPoolSize(int maxPoolSize) {
		this.maxPoolSize = maxPoolSize;
	}

	public int getMaxLifetime() {
		return maxLifetime;
	}

	public void setMaxLifetime(int maxLifetime) {
		this.maxLifetime = maxLifetime;
	}

	public int getBorrowConnectionTimeout() {
		return borrowConnectionTimeout;
	}

	public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
		this.borrowConnectionTimeout = borrowConnectionTimeout;
	}

	public int getLoginTimeout() {
		return loginTimeout;
	}

	public void setLoginTimeout(int loginTimeout) {
		this.loginTimeout = loginTimeout;
	}

	public int getMaintenanceInterval() {
		return maintenanceInterval;
	}

	public void setMaintenanceInterval(int maintenanceInterval) {
		this.maintenanceInterval = maintenanceInterval;
	}

	public int getMaxIdleTime() {
		return maxIdleTime;
	}

	public void setMaxIdleTime(int maxIdleTime) {
		this.maxIdleTime = maxIdleTime;
	}

	public String getTestQuery() {
		return testQuery;
	}

	public void setTestQuery(String testQuery) {
		this.testQuery = testQuery;
	}
}

(7)載入從庫資料來源的屬性(引數)


SlaveConfig.java

package com.appleyk.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix="mysql.datasource.slave")
public class SlaveConfig {

	private String url;
	private String username;
	private String password;

	private int minPoolSize;

	private int maxPoolSize;

	private int maxLifetime;

	private int borrowConnectionTimeout;

	private int loginTimeout;

	private int maintenanceInterval;

	private int maxIdleTime;

	private String testQuery;

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	public String getUsername() {
		return username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public String getPassword() {
		return password;
	}

	public void setPassword(String password) {
		this.password = password;
	}

	public int getMinPoolSize() {
		return minPoolSize;
	}

	public void setMinPoolSize(int minPoolSize) {
		this.minPoolSize = minPoolSize;
	}

	public int getMaxPoolSize() {
		return maxPoolSize;
	}

	public void setMaxPoolSize(int maxPoolSize) {
		this.maxPoolSize = maxPoolSize;
	}

	public int getMaxLifetime() {
		return maxLifetime;
	}

	public void setMaxLifetime(int maxLifetime) {
		this.maxLifetime = maxLifetime;
	}

	public int getBorrowConnectionTimeout() {
		return borrowConnectionTimeout;
	}

	public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
		this.borrowConnectionTimeout = borrowConnectionTimeout;
	}

	public int getLoginTimeout() {
		return loginTimeout;
	}

	public void setLoginTimeout(int loginTimeout) {
		this.loginTimeout = loginTimeout;
	}

	public int getMaintenanceInterval() {
		return maintenanceInterval;
	}

	public void setMaintenanceInterval(int maintenanceInterval) {
		this.maintenanceInterval = maintenanceInterval;
	}

	public int getMaxIdleTime() {
		return maxIdleTime;
	}

	public void setMaxIdleTime(int maxIdleTime) {
		this.maxIdleTime = maxIdleTime;
	}

	public String getTestQuery() {
		return testQuery;
	}

	public void setTestQuery(String testQuery) {
		this.testQuery = testQuery;
	}
}

(8)配置主資料來源


MasterDBSource.java

package com.appleyk.datasource;

import java.sql.SQLException;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.appleyk.config.MasterConfig;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;


/**
 * SqlSessionFactoryBuilder:build方法建立SqlSessionFactory例項。
 * SqlSessionFactory:建立SqlSession例項的工廠。
 * SqlSession:用於執行持久化操作的物件,類似於jdbc中的Connection。
 * SqlSessionTemplate:MyBatis提供的持久層訪問模板化的工具,執行緒安全,可通過構造引數或依賴注入SqlSessionFactory例項
 * 
 * 主庫的資料來源模板,應用在主庫所對應的Dao層上(掃描對應的mapper),實現主資料來源的指定+增刪改查
 * @author [email protected]
 * @blob   http://blog.csdn.net/appleyk
 * @date   2018年3月16日-下午1:08:53
 */
@Configuration   // ---> 標註此註解,Spring—Boot啟動時,會自動進行相應的主資料來源配置 -->注入Bean
@MapperScan(basePackages = "com.appleyk.mapper.master", sqlSessionTemplateRef = "masterSqlSessionTemplate")  
public class MasterDBSource {
	
	// 配置主資料來源
	@Primary
	@Bean(name = "MasterDB")
	public DataSource testDataSource(MasterConfig masterConfig) throws SQLException {
		
		/**
		 * MySql資料庫驅動 實現 XADataSource介面
		 */
		MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
		mysqlXaDataSource.setUrl(masterConfig.getUrl());
		mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
		mysqlXaDataSource.setPassword(masterConfig.getPassword());
		mysqlXaDataSource.setUser(masterConfig.getUsername());
		mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

//		/**
//		 * Postgresql資料庫驅動 實現 XADataSource
//		 * 包 --> org.postgresql.xa.PGXADataSource;
//		 */
//		PGXADataSource pgxaDataSource = new PGXADataSource();
//		pgxaDataSource.setUrl(masterConfig.getUrl());
//		
		/**
		 * 設定分散式-- 主資料來源
		 */
		AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
		xaDataSource.setXaDataSource(mysqlXaDataSource);
		xaDataSource.setUniqueResourceName("MasterDB");

		xaDataSource.setMinPoolSize(masterConfig.getMinPoolSize());
		xaDataSource.setMaxPoolSize(masterConfig.getMaxPoolSize());
		xaDataSource.setMaxLifetime(masterConfig.getMaxLifetime());
		xaDataSource.setBorrowConnectionTimeout(masterConfig.getBorrowConnectionTimeout());
		xaDataSource.setLoginTimeout(masterConfig.getLoginTimeout());
		xaDataSource.setMaintenanceInterval(masterConfig.getMaintenanceInterval());
		xaDataSource.setMaxIdleTime(masterConfig.getMaxIdleTime());
		xaDataSource.setTestQuery(masterConfig.getTestQuery());
		
		System.err.println("主資料來源注入成功.....");
		return xaDataSource;
	}

	@Bean(name = "masterSqlSessionFactory")
	public SqlSessionFactory masterSqlSessionFactory(@Qualifier("MasterDB") DataSource dataSource) throws Exception {
		SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
		bean.setDataSource(dataSource);
		return bean.getObject();
	}

	@Bean(name = "masterSqlSessionTemplate")
	public SqlSessionTemplate masterSqlSessionTemplate(
			@Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
		return new SqlSessionTemplate(sqlSessionFactory);
	}
}

(9)配置從資料來源 (同上,只是改了個名字)

SlaveDBSource.java

package com.appleyk.datasource;

import java.sql.SQLException;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.appleyk.config.SlaveConfig;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;

/**
 * SqlSessionFactoryBuilder:build方法建立SqlSessionFactory例項。
 * SqlSessionFactory:建立SqlSession例項的工廠。
 * SqlSession:用於執行持久化操作的物件,類似於jdbc中的Connection。
 * SqlSessionTemplate:MyBatis提供的持久層訪問模板化的工具,執行緒安全,可通過構造引數或依賴注入SqlSessionFactory例項
 * 
 * 從庫的資料來源模板,應用在從庫所對應的Dao層上(掃描對應的mapper),實現從資料來源的指定+增刪改查
 * 
 * @author [email protected]
 * @blob http://blog.csdn.net/appleyk
 * @date 2018年3月16日-下午1:08:53
 */
@Configuration // ---> 標註此註解,Spring—Boot啟動時,會自動進行相應的從資料來源配置 -->注入Bean
@MapperScan(basePackages = "com.appleyk.mapper.slave", sqlSessionTemplateRef = "slaveSqlSessionTemplate")
public class SlaveDBSource {

	// 配置從資料來源
	@Bean(name = "SlaveDB")
	public DataSource testDataSource(SlaveConfig slaveConfig) throws SQLException {
		MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
		mysqlXaDataSource.setUrl(slaveConfig.getUrl());
		mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
		mysqlXaDataSource.setPassword(slaveConfig.getPassword());
		mysqlXaDataSource.setUser(slaveConfig.getUsername());
		mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

		/**
		 * 設定分散式 -- 從資料來源
		 */
		AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
		xaDataSource.setXaDataSource(mysqlXaDataSource);
		xaDataSource.setUniqueResourceName("SlaveDB");

		/**
		 * 連線池配置
		 */
		xaDataSource.setMinPoolSize(slaveConfig.getMinPoolSize());
		xaDataSource.setMaxPoolSize(slaveConfig.getMaxPoolSize());
		xaDataSource.setMaxLifetime(slaveConfig.getMaxLifetime());
		xaDataSource.setBorrowConnectionTimeout(slaveConfig.getBorrowConnectionTimeout());
		xaDataSource.setLoginTimeout(slaveConfig.getLoginTimeout());
		xaDataSource.setMaintenanceInterval(slaveConfig.getMaintenanceInterval());
		xaDataSource.setMaxIdleTime(slaveConfig.getMaxIdleTime());
		xaDataSource.setTestQuery(slaveConfig.getTestQuery());

		System.err.println("從資料來源注入成功.....");
		return xaDataSource;
	}

	@Bean(name = "slaveSqlSessionFactory")
	public SqlSessionFactory masterSqlSessionFactory(@Qualifier("SlaveDB") DataSource dataSource) throws Exception {
		SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
		bean.setDataSource(dataSource);
		return bean.getObject();
	}

	@Bean(name = "slaveSqlSessionTemplate")
	public SqlSessionTemplate slaveSqlSessionTemplate(
			@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
		return new SqlSessionTemplate(sqlSessionFactory);
	}
}

看似沒有配置事務管理器,其實atomikos已經在暗處給我們提供了一個全域性性的分散式事務管理器,無需擔心,好吧

(10)Spring-Boot全域性啟動入口

Application.java

package com.appleyk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

import com.appleyk.config.MasterConfig;
import com.appleyk.config.SlaveConfig;

@SpringBootApplication // same as @Configuration @EnableAutoConfiguration  @ComponentScan
@EnableConfigurationProperties(value = { MasterConfig.class, SlaveConfig.class })
public class Application {
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
}

啟動Spring-Boot


(11)準備DAO層

利用mybatis的通用mapper,先為主庫master和從庫slave的表a和表b分別進行Java實體對映


A.java

package com.appleyk.entity;

import javax.persistence.Table;

@Table(name = "a")
public class A {

	private Integer id;
	private String name;
	private String sex;
	private Integer age;

	public A(){
		
	}
	
	public Integer getId() {
		return id;
	}

	public void setId(Integer id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getSex() {
		return sex;
	}

	public void setSex(String sex) {
		this.sex = sex;
	}

	public Integer getAge() {
		return age;
	}

	public void setAge(Integer age) {
		this.age = age;
	}

}

這種對映很簡單,就是和表a的欄位一一對應


B.java 的內容和 A.java的內容一模一樣,不同的是下面這個地方


利用mybatis的通用mapper,再為主庫master和從庫slave的表a和表b分別進行mapper介面的增刪改查實現


AMapper.java

package com.appleyk.mapper.master;

import com.appleyk.entity.A;

import tk.mybatis.mapper.common.Mapper;

public interface AMapepr extends Mapper<A>{

}

是的,你沒看錯,裡面一句增刪改查的程式碼都沒有,就是這麼通用好使,B的mapper和A的一樣,如下


Dao層佈置完,準備Service層,走業務邏輯

(12)準備Service層



分散式事務應用場景:

有一個數據,格式為json串,序列化後實則為一個物件,假設是A,現master庫需要儲存A,而slave庫由於業務需要也要儲存這個物件A(通過轉化A物件為B物件),於是乎,我們定義一個ObjectService,作為整個儲存操作的入口服務

ObjectService.java
package com.appleyk.service;

import com.appleyk.entity.A;

public interface ObjectService {

	boolean Save(A a) throws Exception;
}

master庫儲存A物件的介面為

AService.java

package com.appleyk.service;

import com.appleyk.entity.A;

public interface AService {
   boolean SaveA(A a);
}

其實現為

AServiceImpl.java

package com.appleyk.service.Impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;

import com.appleyk.entity.A;
import com.appleyk.mapper.master.AMapepr;
import com.appleyk.service.AService;

@Service
@Primary
public class AServiceImpl implements AService {

	@Autowired
	private AMapepr aMapper;

	@Override
	public boolean SaveA(A a) {

		return aMapper.insert(a) > 0;
	}

}

slave庫儲存B物件的介面為

BService.java

package com.appleyk.service;

import com.appleyk.entity.B;

public interface BService {
	boolean SaveB(B b) throws Exception;
}

其實現為

BServiceImpl.java

package com.appleyk.service.Impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;

import com.appleyk.entity.B;
import com.appleyk.mapper.slave.BMapepr;
import com.appleyk.service.BService;

@Service
@Primary
public class BServiceImpl implements BService {

	@Autowired
	private BMapepr bMapper;

	@Override
	public boolean SaveB(B b) throws Exception{

		int count = bMapper.insert(b);
		
		if(b.getName().length()>5){
			System.err.println("B事務回滾");
			throw new Exception("名稱超過5");
		}		
		System.err.println("B事務提交");
		return  count >0;
		
	}

}

放大招了,放大招了,我們看ObjectService的實現

ObjectServiceImpl.java

package com.appleyk.service.Impl;

import java.sql.SQLException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.appleyk.entity.A;
import com.appleyk.entity.B;
import com.appleyk.service.AService;
import com.appleyk.service.BService;
import com.appleyk.service.ObjectService;

@Service
@Primary
public class ObjectServiceImpl implements ObjectService {

	@Autowired
	private AService aService;

	@Autowired
	private BService bService;

	@Override
	@Transactional(rollbackFor = { Exception.class, SQLException.class })
	public boolean Save(A a) throws Exception {

		if (!aService.SaveA(a)) {
			return false;
		}

		//int i = 1 / 0;

		B b = new B(a);

		try {
			if (!bService.SaveB(b)) {
				return false;
			}
		} catch (Exception e) {
			System.err.println("A事務回滾");
			throw new Exception("我的錯,儲存B異常");
		}

		System.err.println("A事務提交");
		return true;

	}

}


(13)Controller層對外提供Restful風格的API介面

ObjectController.java

package com.appleyk.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.appleyk.entity.A;
import com.appleyk.result.ResponseMessage;
import com.appleyk.result.ResponseResult;
import com.appleyk.service.ObjectService;

@RestController
@RequestMapping("/rest/v1.0.1/object")
public class ObjectController {

	@Autowired
	private ObjectService objService;

	@PostMapping("/save")
	public ResponseResult SaveObject(@RequestBody A a) throws Exception {

		if (objService.Save(a)) {
			return new ResponseResult(ResponseMessage.OK);
		}
		return new ResponseResult(ResponseMessage.INTERNAL_SERVER_ERROR);
	}
}

(14)測試前,看一眼mysql


主庫的a表資料集空


從庫的b表資料集也空


(15)利用Insomnia進行API測試

json資料

{
	"name": "appleyk",
	"sex": "F",
	"age":27
}

啟動專案



測試(異常的)


儲存物件的時候提示了異常,別慌,我們看一下後臺輸出的內容是不是按照我們假定的方式走的


mysql控制檯驗證一把


測試(正常的)  --  我們傳入name的時候,長度設定小點,比如 name = kobe


後臺輸出


mysql視覺化工具驗證走一把



掉個頭,我們來讓A物件儲存的時候,發生異常,而且丟擲的異常還是未做檢查的


測試(異常的)


後臺輸出


由於ArithmeticException繼承Exception(異常的基類),而我們又設定了

所以,整個分散式事務會進行回滾,A物件和B物件都將無法正確的進行儲存

mysql控制檯進行驗證


相關推薦

Spring-Boot + Atomikos 實現分散式事務管理

一、何為事務定義:事務是指多個操作單元組成的合集,多個單元操作是整體不可分割的,要麼都操作成功,要麼都不成功。其必須遵循的四個原則(ACID):原子性(Atomicity -- 美 [ˌætəˈmɪsɪti]):事務是不可分割的最小單位,事務內的操作要麼全成功(commit)

【十九】Spring Boot 之多資料來源和分散式事務(JTA、Atomikos、Druid、Mybatis)

1.事務開始 2.A資料來源insert 3.B資料來源insert 4.報錯 5.事務回滾(A、B資料來源都回滾) 專案目錄 pom.xml <?xml version="1.0" encoding="UTF-8"?> <projec

spring-boot+Redis實現簡單的分散式叢集session共享

  寫在前面:      首先宣告,筆者是一名Java程式設計屆的小學生。前面一直在幾家公司裡面做開發,其實都是一些傳統的專案,對於像分散式啦,叢集啦一些大型的專案接觸的很少,所以一直沒有自己整合和實現過。由於最近幾天專案不是很忙,自己又有點時間

Spring Boot 中使用 @Transactional 註解配置事務管理

all arc obj 資料 ror 科學 部分 直接 true 事務管理是應用系統開發中必不可少的一部分。Spring 為事務管理提供了豐富的功能支持。Spring 事務管理分為編程式和聲明式的兩種方式。編程式事務指的是通過編碼方式實現事務;聲明式事務基於 AOP,將具體

spring boot入門(四) springboot事務管理。最完整、簡單易懂、詳細的spring boot教程。

本文緊接spring boot入門(三)。 事務管理是對於一系列資料庫操作進行管理,一個事務包含一個或多個SQL語句,是邏輯管理的工作單元(原子單元)。通俗的講,事務管理是指的“一次操作”要不就全做,要不就全不做。例如,在一個訂單系統中,包括生成訂單,扣除商品存庫等操作,如

spring多資料來源的配置(分散式事務管理

<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close"> <property name="uniqueResource

Spring Boot 中使用 @Transactional 註解配置事務管理(轉載)

事務管理是應用系統開發中必不可少的一部分。Spring 為事務管理提供了豐富的功能支援。Spring 事務管理分為程式設計式和宣告式的兩種方式。程式設計式事務指的是通過編碼方式實現事務;宣告式事務基於 AOP,將具體業務邏輯與事務處理解耦。宣告式事務管理使業務程式碼邏輯不受汙

Spring Boot 2.x基礎教程:事務管理入門

## 什麼是事務? 我們在開發企業應用時,通常業務人員的一個操作實際上是對資料庫讀寫的多步操作的結合。由於資料操作在順序執行的過程中,任何一步操作都有可能發生異常,異常會導致後續操作無法完成,此時由於業務邏輯並未正確的完成,之前成功操作的資料並不可靠,如果要讓這個業務正確的執行下去,通常有實現方式: 1.

spring boot+mybatis+druid 多資料來源多分散式事務

廢話不多說,首先貼配置檔案,需要引入pomxml <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter&l

spring mybatis atomikos分散式事務demo

        最近有點時間 , 就準備搭個多庫事務的例子 , 不過中間碰到一些問題 , 這裡記錄下來 .         我的atomikos   版本是 3.7.0 ; Spring4 mybatis3 ;         碰到問題主要有兩類  :  

spring+mybatis+tkmapper+atomikos實現分布式事務(3)-動態切換數據源

springmvc mybatis tkmapper atomiks 動態數據源 本文介紹基於spring+mybatis+tkmapper+atomikos+jta實現分布式事務,由程序動態切換數據源,通過atomikos可實現分布式事務一致性。通過繼承MapperScannerConf

Spring+JTA+Atomikos+mybatis分散式事務管理

  背景描述:我們平時的工作中用到的Spring事務管理是管理一個數據源的。但是如果對多個數據源進行事務管理該怎麼辦呢?我們可以用JTA和Atomikos結合Spring來實現一個分散式事務管理的功能。 事務(官方解釋):是由一組sql語句組成的“邏輯處理單元”。 事務具有

Spring boot + Quartz實現分散式定時任務

在實際專案中經常會用到定時任務,且有些定時任務同時只能執行一個例項,下面介紹一下通過Spring boot + Quartz框架實現分散式定時任務。 1. 定時任務持久化到Mysql 2. 名稱為JobA的定時任務每10秒執行一次@ScheduledJo

Spring Boot 2實現分散式鎖——這才是實現分散式鎖的正確姿勢!

今年企業對Java開發的市場需求,你看懂了嗎? >>>   

利用consul在spring boot實現最簡單的分散式

因為在專案實際過程中所採用的是微服務架構,考慮到承載量基本每個相同業務的服務都是多節點部署,所以針對某些資源的訪問就不得不用到用到分散式鎖了。 這裡列舉一個最簡單的場景,假如有一個智慧售貨機,由於機器本身的原因不能同一臺機器不能同時出兩個商品,這就要求在在出貨流程前針對同一臺機器在同一時刻出現併發 建立訂

Spring Boot Redis 實現分散式鎖,真香!!

之前看很多人手寫分散式鎖,其實 Spring Boot 現在已經做的足夠好了,開箱即用,支援主流的 Redis、Zookeeper 中介軟體,另外還支援 JDBC。 本篇棧長以 Redis 為例(這也是用得最多的方案),教大家如何利用 Spring Boot 整合 Redis 實現快取,如何簡單、快速實現

Spring Boot 2.x基礎教程:使用JTA實現多資料來源的事務管理

在一個Spring Boot專案中,連線多個數據源還是比較常見的。之前也介紹瞭如何在幾種常用框架的場景下配置多資料來源,具體可見: - [Spring Boot 2.x基礎教程:JdbcTemplate的多資料來源配置](http://blog.didispace.com/spring-boot-learn

rabbitMq與spring boot搭配實現監聽

address app caching prefix 前段時間 ever 不用 理解 its   在我前面有一篇博客說到了rabbitMq實現與zk類似的watch功能,但是那一篇博客沒有代碼實例,後面自己補了一個demo,便於理解。demo中主要利用spring boot

spring boot實現響應圖片的方法以及改進

spring-bootController響應,噴出圖片,是一個很常見的功能,代碼如下@RequestMapping(value = { "/img/{filename:.+}" }, method = RequestMethod.GET, produces = { MediaType.I

Spring Boot實現logback多環境日誌配置

cati feature gprof 配置 color app config 現在 ng- 在Spring Boot中,可以在logback.xml中的springProfile標簽中定義多個環境logback.xml: <springProfile name=