1. 程式人生 > >SpringBoot使用jta+atomikos解決分散式事務

SpringBoot使用jta+atomikos解決分散式事務

jta:Java Transaction API,即是java中對事務處理的api 即 api即是介面的意思

atomikos:Atomikos TransactionsEssentials 是一個為Java平臺提供增值服務的並且開源類事務管理器

1、pom依賴

2、application.properties---->注意字首

# Mysql 1
mysql.datasource.test1.url = jdbc:mysql://192.168.25.11:3306/18-08-12-manyDatasource1?useUnicode=true&characterEncoding=utf-8
mysql.datasource.test1.username = root
mysql.datasource.test1.password = @sS19980713
mysql.datasource.test1.minPoolSize = 3
mysql.datasource.test1.maxPoolSize = 25
mysql.datasource.test1.maxLifetime = 20000
mysql.datasource.test1.borrowConnectionTimeout = 30
mysql.datasource.test1.loginTimeout = 30
mysql.datasource.test1.maintenanceInterval = 60
mysql.datasource.test1.maxIdleTime = 60

# Mysql 2
mysql.datasource.test2.url =jdbc:mysql://192.168.25.11:3306/18-08-12-manyDatasource2?useUnicode=true&characterEncoding=utf-8
mysql.datasource.test2.username =root
mysql.datasource.test2.password 
[email protected]
mysql.datasource.test2.minPoolSize = 3 mysql.datasource.test2.maxPoolSize = 25 mysql.datasource.test2.maxLifetime = 20000 mysql.datasource.test2.borrowConnectionTimeout = 30 mysql.datasource.test2.loginTimeout = 30 mysql.datasource.test2.maintenanceInterval = 60 mysql.datasource.test2.maxIdleTime = 60

3、書寫讀取配置類1和2

package czs.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

import lombok.Data;

@Data
/**
 * 將application.properties配置檔案中配置自動封裝到實體類欄位中
 * @author Administrator
 */
@ConfigurationProperties(prefix = "mysql.datasource.test1") // 注意這個字首要和application.properties檔案的字首一樣
public class DBConfig1 {

	private String url;
	// 比如這個url在properties中是這樣子的mysql.datasource.test1.username = root
	private String username;
	private String password;
	private int minPoolSize;
	private int maxPoolSize;
	private int maxLifetime;
	private int borrowConnectionTimeout;
	private int loginTimeout;
	private int maintenanceInterval;
	private int maxIdleTime;
	private String testQuery;
}
package czs.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

import lombok.Data;

@Data
/**
 * 將application.properties配置檔案中配置自動封裝到實體類欄位中
 * @author Administrator
 */
@ConfigurationProperties(prefix = "mysql.datasource.test2")// 注意這個字首要和application.properties檔案的字首一樣
public class DBConfig2 {

	private String url;
	// 比如這個url在properties中是這樣子的mysql.datasource.test1.username = root
	private String username;
	private String password;
	private int minPoolSize;
	private int maxPoolSize;
	private int maxLifetime;
	private int borrowConnectionTimeout;
	private int loginTimeout;
	private int maintenanceInterval;
	private int maxIdleTime;
	private String testQuery;
}

4、開啟掃描註冊上面的兩個配置檔案類

5、兩個資料來源管理Bean

package czs.datasource;

import java.sql.SQLException;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;

import czs.config.DBConfig1;

@Configuration
// basePackages 最好分開配置 如果放在同一個資料夾可能會報錯
@MapperScan(basePackages = "czs.mapper1", sqlSessionTemplateRef = "testSqlSessionTemplate")
public class MyBatisConfig1 {

	// 配置資料來源
	@Bean(name = "testDataSource")
	public DataSource testDataSource(DBConfig1 testConfig) throws SQLException {
		MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
		mysqlXaDataSource.setUrl(testConfig.getUrl());
		mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
		mysqlXaDataSource.setPassword(testConfig.getPassword());
		mysqlXaDataSource.setUser(testConfig.getUsername());
		mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

		// 將本地事務註冊到創 Atomikos全域性事務
		AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
		xaDataSource.setXaDataSource(mysqlXaDataSource);
		xaDataSource.setUniqueResourceName("testDataSource");

		xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
		xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
		xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
		xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
		xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
		xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
		xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
		xaDataSource.setTestQuery(testConfig.getTestQuery());
		return xaDataSource;
	}

	@Bean(name = "testSqlSessionFactory")
	public SqlSessionFactory testSqlSessionFactory(@Qualifier("testDataSource") DataSource dataSource)
			throws Exception {
		SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
		bean.setDataSource(dataSource);
		return bean.getObject();
	}

	@Bean(name = "testSqlSessionTemplate")
	public SqlSessionTemplate testSqlSessionTemplate(
			@Qualifier("testSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
		return new SqlSessionTemplate(sqlSessionFactory);
	}
}
package czs.datasource;

import java.sql.SQLException;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;

import czs.config.DBConfig2;

@Configuration
@MapperScan(basePackages = "czs.mapper2", sqlSessionTemplateRef = "test2SqlSessionTemplate")
public class MyBatisConfig2 {

	// 配置資料來源
	@Bean(name = "test2DataSource")
	public DataSource testDataSource(DBConfig2 testConfig) throws SQLException {
		MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
		mysqlXaDataSource.setUrl(testConfig.getUrl());
		mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
		mysqlXaDataSource.setPassword(testConfig.getPassword());
		mysqlXaDataSource.setUser(testConfig.getUsername());
		mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

		AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
		xaDataSource.setXaDataSource(mysqlXaDataSource);
		xaDataSource.setUniqueResourceName("test2DataSource");

		xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
		xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
		xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
		xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
		xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
		xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
		xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
		xaDataSource.setTestQuery(testConfig.getTestQuery());
		return xaDataSource;
	}

	@Bean(name = "test2SqlSessionFactory")
	public SqlSessionFactory testSqlSessionFactory(@Qualifier("test2DataSource") DataSource dataSource)
			throws Exception {
		SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
		bean.setDataSource(dataSource);
		return bean.getObject();
	}

	@Bean(name = "test2SqlSessionTemplate")
	public SqlSessionTemplate testSqlSessionTemplate(
			@Qualifier("test2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
		return new SqlSessionTemplate(sqlSessionFactory);
	}
}

有兩個不同的地方要注意

           1、@MapperScan(basePackages = "czs.mapper2", sqlSessionTemplateRef = "test2SqlSessionTemplate")

                          basePackages掃描的mapper類包不要掃錯了

           2、testDataSource()的引數為第三步的DBConfig1和DBConfig2

6、Service

package czs.mapper1;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import czs.mapper2.UserMapper2;

/**
* @author czs
* @version 建立時間:2018年8月12日 下午7:28:07 
*/
@Service
public class ManyService1 {

	@Autowired
	private UserMapper1 userMapper1;

	@Autowired
	private UserMapper2 userMapper2;

	// 開啟事務,由於使用jta+atomikos解決分散式事務,所以此處不必再指定事務
	@Transactional
	public int insert(String name, Integer age) {
		int insert = userMapper1.insert(name, age);
		int i = 1 / age;// 賦值age為0故意引發事務
		return insert;
	}

	// 開啟事務,由於使用jta+atomikos解決分散式事務,所以此處不必再指定事務
	@Transactional
	public int insertDb1AndDb2(String name, Integer age) {
		int insert = userMapper1.insert(name, age);
		int insert2 = userMapper2.insert(name, age);
		int i = 1 / age;// 賦值age為0故意引發事務
		return insert + insert2;
	}

}

7、Controller

	@RequestMapping(value = "insertDb1AndDb2")
	public int insertDb1AndDb2(String name, Integer age) {
		return manyService1.insertDb1AndDb2(name, age);
	}

9、檢視資料庫,如果沒有一條資料插入,代表分散式事務管理成功。

相關推薦

SpringBoot使用jta+atomikos解決分散式事務

jta:Java Transaction API,即是java中對事務處理的api 即 api即是介面的意思 atomikos:Atomikos TransactionsEssentials 是一個為Java平臺提供增值服務的並且開源類事務管理器 1、pom依賴 2

分散式事務】使用atomikos+jta解決分散式事務問題

一、前言 分散式事務,這個問題困惑了小編很久,在3個月之前,就間斷性的研究分散式事務。從MQ方面,資料庫事務方面,jta方面。近期終於成功了,使用JTA解決了分散式事務問題。先寫一下心得,後面的二級提交也會在研究。 二、介紹 分散式事務 說到分散式事務,可以理解為,由於分散式而引起的事務不一致的問題。

Spring+JTA+Atomikos+mybatis分散式事務管理

  背景描述:我們平時的工作中用到的Spring事務管理是管理一個數據源的。但是如果對多個數據源進行事務管理該怎麼辦呢?我們可以用JTA和Atomikos結合Spring來實現一個分散式事務管理的功能。 事務(官方解釋):是由一組sql語句組成的“邏輯處理單元”。 事務具有

使用kafka訊息佇列解決分散式事務(可靠訊息最終一致性方案-本地訊息服務)

微服務框架Spring Cloud介紹 Part1: 使用事件和訊息佇列實現分散式事務 本文轉自:http://skaka.me/blog/2016/04/21/springcloud1/ 不同於單一架構應用(Monolith), 分散式環境下, 進行事務操作將變得困難,

# 訊息佇列解決分散式事務

訊息佇列解決分散式事務 本地訊息表:通常處於同一張資料表,通過事務觸發器就能實現,但無法解決兩張表處於不同的資料庫問題 begin transaction: update User set account = account - 100 where userId = 'A' i

搞懂分散式技術19:使用RocketMQ事務訊息解決分散式事務

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性?一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。但這裡面有個問題:A是先update DB,

分散式訊息佇列RocketMQ--事務訊息--解決分散式事務

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。 但這裡面有個問題:A是先update D

藉助訊息佇列解決分散式事務

先介紹一下RabbitMQ的基本概念 核心概念 Queue:真正儲存資料的地方 Exchange接受請求,轉存資料 Bind:收到請求後儲存到哪裡 訊息生產者:傳送資料的應用 訊息消費者:取出資料處理的應用 Bind的幾種分發規則:Direct、Topic、Fanout Fano

利用mq的最終一致性,解決分散式事務

 我們的訂單系統和商品(庫存)是兩個系統,當我們下訂單後,我們就要去修改庫存。分散式系統要滿足cap定理。一致性,可靠性,可用性。我們沒法都滿足,只能滿足兩個。因為我們是電商專案。我們要滿足服務的高可用。所以我們滿足可靠性,和可用性。但是,我們又得滿足一致性,這個時候,這個矛

ActiveMQ解決分散式事務方案以及程式碼實現(一)

1.場景描述 可以設想一個比較常見的分散式事務場景,商品上架操作,該操作涉及到商品模組的Service服務中的上架操作,同時必須要滿足在solr中建立商品的索引方便前臺搜尋以及生成商品的靜態化頁面,在上架操作中傳送了一條訊息,訊息接收方搜尋工程以及靜態化工程分

解決分散式事務問題

利用.Net 2.0中提供的事務新特性大大方便了分散式事務的開發,但是卻需要系統MSDTC的支援。由於系統的設定問題等各種原因,總是會使事情變得不太順利。下面說明一下我解決這個問題的步驟,希望對大家有所幫助: 第一步: -------------------- win2

分散式訊息佇列RocketMQ--事務訊息--解決分散式事務的最佳實踐

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。

分析redis訊息佇列和kafka來解決分散式事務場景

1、系統A(扣減托盤)【訊息生產者】 2、系統B(扣減押金)【訊息消費者】 業務描述: 兩套系統,A中扣減托盤,B中對應的要扣減押金;A中托盤歸還,B中押金返還 利用訊息佇列來解決分散式事務過程: 傳送方【生產者】:(不關心接收方狀態,只需要確定本地OK,訊息推送即可)

Spring Boot 整合 Seata 解決分散式事務問題

seata 簡介 Seata 是 阿里巴巴2019年開源的分散式事務解決方案,致力於在微服務架構下提供高效能和簡單易用的分散式事務服務。在 Seata 開源之前,Seata 對應的內部版本在阿里內部一直扮演著分散式一致性中介軟體的角色,幫助阿里度過歷年的雙11,對各業務進行了有力的支撐。經過多年沉澱與積累,2

分散式事務有兩種解決方式

1.優先使用非同步訊息。 上文已經說過,使用非同步訊息 Consumer 端需要實現冪等。冪等有兩種方式,一種方式是業務邏輯保證冪等。比如接到支付成功的訊息訂單狀態變成支付完成,如果當前狀態是支付完成,則再收到一個支付成功的訊息則說明訊息重複了,直接作為訊息成功處理。 另外一種方式如果業務邏輯無

分散式事務 解決資料一致性(一)事務原則與實現:事務、sql事務

事務: 定義:       是以一種可靠、一致的方式,訪問和操作資料庫中資料的程式單元。 原則:      *a、 原子性   * b、一致性  * c、隔離性 &nbs

[轉載]使用訊息佇列實現分散式事務-公認較為理想的分散式事務解決方案

前陣子從支付寶轉賬1萬塊錢到餘額寶,這是日常生活的一件普通小事,但作為網際網路研發人員的職業病,我就思考支付寶扣除1萬之後,如果系統掛掉怎麼辦,這時餘額寶賬戶並沒有增加1萬,資料就會出現不一致狀況了。 上述場景在各個型別的系統中都能找到相似影子,比如在電商系統中,當有使用者下單後,除了在訂單表插

分散式事務解決方案

分散式事務:分散式事務是指事務的參與者、支援事務的伺服器、資源伺服器以及事務管理器分別位於不同的分散式系統的不同節點之上。 先來理解幾個概念:事物具有四大特性ACID,分散式系統無法同時滿足CAP中的三種特性,所以我們一般使用最終一致性BASE。 ACID: 原子性(At

分散式事務原理及解決方案

1 引言 分散式事務是企業整合中的一個技術難點,也是每一個分散式系統架構中都會涉及到的一個東西,特別是在這幾年越來越火的微服務架構中,幾乎可以說是無法避免,本文就圍繞單機事務,分散式事務以及分散式事務的處理方式來展開。 2 事務 事務提供一種“要麼什麼都不做,要麼做全套(All or Nothing)”

分散式事務常見解決方案

分散式一致性協議 XA介面  XA是由X/Open組織提出的分散式事務的規範。XA規範主要定義了(全域性)事務管理器(Transaction Manager)和(區域性)資源管理器(Resource Manager)之間的介面。XA介面是雙向的系統介面,在事務管理器(Transaction Ma