1. 程式人生 > >Spring-Hibernate分庫事務實現

Spring-Hibernate分庫事務實現

    忙活了一個星期,看了N多Spring、Hibernate原始碼,查了N多資料,走了N多彎路,總算實現了分散式事務。回頭看看,實現這個功能對自己的提升確實是巨大的。

    進入正題,官網之前的選庫方案是在DataSource層面進行選庫的,繼承org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource來進行的選庫,用SpringAOP做切面,在進入DAO方法之前進行選庫(比如某個引數或者註解等)。事務用的是org.springframework.orm.hibernate3.HibernateTransactionManager。在一次操作單個數據庫的情況下是沒有問題的,但是在一次操作多個數據庫時,比如有A、B兩個庫裡面有一個相同的表,在A插入一條資料a,在B插入一條資料b,事務便會出問題,在提交的時候會全部提交到A庫去,究其原因,應該是在事務開啟時就打開了session,而一般開啟事務時是在service層,沒有進入DAO層。也就是說在開啟事務時根本沒有進行選庫操作,而是進入的預設的資料庫,導致事務提交時提交錯了資料庫。

    為了解決這個問題,在網上瀏覽了一番後,找到一個CobarClient的阿里出的分散式資料庫訪問層,找到其中的事務管理類檢視原始碼 com.alibaba.cobar.client.transaction.MultipleDataSourcesTransactionManager ,研究了一番後,按照自己的思路實現,發現根本沒有進入事務控制,資料直接就到資料庫了。發現事情沒有這麼簡單,CobarClient是根據DataSource進行的事務開啟和關閉操作,和官網環境不同,是用的Spring-Hibernate,事務管理也是Spring的 tx:annotation-driven 標籤,而且CobarClient幾年前就不更新了,還不知道用的是那個Spring-Hibernate版本,所以只能另尋解決辦法。

    在經歷過上次失敗後,突然想到官網用的最原始的org.springframework.orm.hibernate3.HibernateTransactionManager這個事務管理器是有效果的,只是最後commit的時候資料放錯庫了而已。那我寫個類繼承這個,重寫相關方法不就行了,然後我就根據CobarClient的思想進行改造,在doBegin方法中把配置的全部的資料庫連線開啟了事務(事實上這樣做很蠢,如果有100個配置,不就有100個事務了。。。),doCommit和doRollback方法迴圈呼叫的父類方法按照FILO規則來,然後又出問題了。因為SessionFactory是同一個,Spring會報一個同步錯誤,大意就是說你的session開啟了一個以上事務,便會報錯。

    這是才意識到,選庫和分散式事務是要相互配合的。

    於是進行了一次比較大的修改,拋棄org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource選庫方案,在初始化時進行SessionFactory初始化(為此又研究了Spring動態註冊,以及org.springframework.beans.factory.FactoryBean.getObject(),一路艱辛啊。。),在選庫時由選擇DataSource改為選擇SessionFactory,DAO層在呼叫時根據執行緒引數(java.lang.ThreadLocal)中的選庫結果呼叫SessionFactory。事務方面研究了org.springframework.transaction.support.AbstractPlatformTransactionManager的原始碼,重點在於getTransaction方法發現doGetTransaction返回的結果並不重要(返回型別是Object),在框架中並沒有其他使用它的地方,只有在必須實現的方法doBegin、doCommit、doRollback,以及掛起doSuspend,恢復掛起doResume方法isExistingTransaction是否已開啟事務方法,以及doCleanupAfterCompletion事務結束後的銷燬方法中用到。而這些方法都需要我們來實現,那麼就有了一個想法。

    在doGetTransaction方法中標識事務是否開啟:

    private ThreadLocal<Object> isTransaction = new ThreadLocal<Object>();   
  
    @Override
    protected Object doGetTransaction() throws TransactionException {
        Object o = new Object();
        isTransaction.set(o);
        return o;
    }

    事實上這裡返回的object沒有任何用處。而isTransaction只是標識事務開啟而已,如果get出來為空則是沒有開啟事務。

    在doBegin中將TransactionDefinition存起來(根據原始碼呼叫,這個東西從頭到尾都是一個)

 private ThreadLocal<TransactionDefinition> localTransactionDefinition = new ThreadLocal<TransactionDefinition>();
  @Override
  protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
      //將TransactionDefinition儲存起來
      try {
          localTransactionDefinition.set(ObjectUtil.<TransactionDefinition>deepClone(definition));
      } catch (IOException e) {
          e.printStackTrace();
      } catch (ClassNotFoundException e) {
          e.printStackTrace();
      }
  }

    重點在於選庫的時候新增事務(插一句 Assert 這個用來判斷引數挺不錯的):
    /**
     * DataSourceAspect中選庫時開啟事務操作,如之前(執行緒中)就以開啟事務,則不做操作
     * @param sessionFactory 選中的庫
     * @return 是否成功開啟事務
     * @throws Exception
     */
    public boolean chooseDataSource(SessionFactory sessionFactory) throws Exception {
        Assert.notNull(sessionFactory, "Empty SessionFactory!");
        //check sessionFactory is correct
        if (!checkSessionFactory(sessionFactory)) return false;
        //check transaction is start
        if (isTransaction.get() == null) return false;
        //check sessionFactory is started transaction
        if (isInTransaction(sessionFactory)) return false;
        List<HibernateTransactionObject> localObj = getLocalObj();
        HibernateTransactionObject txObject = getHibernateTransactionObject(sessionFactory);
        doBegin(txObject);
        localObj.add(txObject);
        setLocalObj(localObj);
        return true;
    }
  
    private HibernateTransactionObject getHibernateTransactionObject(SessionFactory sessionFactory) {
        Assert.notNull(sessionFactory, "Empty SessionFactory!");
        HibernateTransactionObject txObject = new HibernateTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        txObject.setSessionFactory(sessionFactory);
        SessionHolder sessionHolder =
                (SessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
        if (sessionHolder != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Found thread-bound Session [" +
                        SessionFactoryUtils.toString(sessionHolder.getSession()) + "] for Hibernate transaction");
            }
            txObject.setSessionHolder(sessionHolder);
        }
        DataSource dataSource = SessionFactoryUtils.getDataSource(sessionFactory);
        if (dataSource != null) {
            txObject.setDataSource(dataSource);
            ConnectionHolder conHolder = (ConnectionHolder)
                    TransactionSynchronizationManager.getResource(dataSource);
            txObject.setConnectionHolder(conHolder);
        }
        return txObject;
    }
 
    /**
     * 開啟一個事務
     * @param txObject
     * @throws Exception
     */
    private void doBegin(HibernateTransactionObject txObject) throws Exception {
        TransactionDefinition definition = localTransactionDefinition.get();
        if (definition == null) throw new Exception("Transaction is not begin!");
        if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            throw new IllegalTransactionStateException(
                    "Pre-bound JDBC Connection found! HibernateTransactionManager does not support " +
                            "running within DataSourceTransactionManager if told to manage the DataSource itself. " +
                            "It is recommended to use a single HibernateTransactionManager for all transactions " +
                            "on a single DataSource, no matter whether Hibernate or JDBC access.");
        }
        Session session = null;
        try {
            if (txObject.getSessionHolder() == null || txObject.getSessionHolder().isSynchronizedWithTransaction()) {
                Session newSession = txObject.getSessionFactory().openSession();
                if (logger.isDebugEnabled()) {
                    logger.debug("Opened new Session [" + SessionFactoryUtils.toString(newSession) +
                            "] for Hibernate transaction");
                }
                txObject.setSession(newSession);
            }
            session = txObject.getSessionHolder().getSession();
            if (isSameConnectionForEntireSession(session)) {
                // We're allowed to change the transaction settings of the JDBC Connection.
                if (logger.isDebugEnabled()) {
                    logger.debug(
                            "Preparing JDBC Connection of Hibernate Session [" + SessionFactoryUtils.toString(session) + "]");
                }
                Connection con = session.connection();
                Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
                txObject.setPreviousIsolationLevel(previousIsolationLevel);
            }
            else {
                // Not allowed to change the transaction settings of the JDBC Connection.
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                    // We should set a specific isolation level but are not allowed to...
                    throw new InvalidIsolationLevelException(
                            "HibernateTransactionManager is not allowed to support custom isolation levels: " +
                                    "make sure that its 'prepareConnection' flag is on (the default) and that the " +
                                    "Hibernate connection release mode is set to 'on_close' (SpringTransactionFactory's default). " +
                                    "Make sure that your LocalSessionFactoryBean actually uses SpringTransactionFactory: Your " +
                                    "Hibernate properties should *not* include a 'hibernate.transaction.factory_class' property!");
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(
                            "Not preparing JDBC Connection of Hibernate Session [" + SessionFactoryUtils.toString(session) + "]");
                }
            }
            if (definition.isReadOnly() && txObject.isNewSession()) {
                // Just set to MANUAL in case of a new Session for this transaction.
                session.setFlushMode(FlushMode.MANUAL);
            }
            if (!definition.isReadOnly() && !txObject.isNewSession()) {
                // We need AUTO or COMMIT for a non-read-only transaction.
                FlushMode flushMode = session.getFlushMode();
                if (flushMode.lessThan(FlushMode.COMMIT)) {
                    session.setFlushMode(FlushMode.AUTO);
                    txObject.getSessionHolder().setPreviousFlushMode(flushMode);
                }
            }
            Transaction hibTx;
            // Register transaction timeout.
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                // Use Hibernate's own transaction timeout mechanism on Hibernate 3.1+
                // Applies to all statements, also to inserts, updates and deletes!
                hibTx = session.getTransaction();
                hibTx.setTimeout(timeout);
                hibTx.begin();
            }
            else {
                // Open a plain Hibernate transaction without specified timeout.
                hibTx = session.beginTransaction();
            }
            // Add the Hibernate transaction to the session holder.
            txObject.getSessionHolder().setTransaction(hibTx);
            // Register the Hibernate Session's JDBC Connection for the DataSource, if set.
            if (txObject.getDataSource() != null) {
                Connection con = session.connection();
                ConnectionHolder conHolder = new ConnectionHolder(con);
                if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                    conHolder.setTimeoutInSeconds(timeout);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Exposing Hibernate transaction as JDBC transaction [" + con + "]");
                }
                TransactionSynchronizationManager.bindResource(txObject.getDataSource(), conHolder);
                txObject.setConnectionHolder(conHolder);
            }
            // Bind the session holder to the thread.
            if (txObject.isNewSessionHolder()) {
                TransactionSynchronizationManager.bindResource(txObject.getSessionFactory(), txObject.getSessionHolder());
            }
            txObject.getSessionHolder().setSynchronizedWithTransaction(true);
        }
        catch (Exception ex) {
            if (txObject.isNewSession()) {
                try {
                    if (session.getTransaction().isActive()) {
                        session.getTransaction().rollback();
                    }
                }
                catch (Throwable ex2) {
                    logger.debug("Could not rollback Session after failed transaction begin", ex);
                }
                finally {
                    SessionFactoryUtils.closeSession(session);
                }
            }
            throw new CannotCreateTransactionException("Could not open Hibernate Session for transaction", ex);
        }
    }
 
    /**
     * Hibernate transaction object, representing a SessionHolder.
     * Used as transaction object by HibernateTransactionManager.
     */
    public class HibernateTransactionObject extends JdbcTransactionObjectSupport {
        private SessionHolder sessionHolder;
        private boolean newSessionHolder;
        private boolean newSession;
        private DataSource dataSource;
        private SessionFactory sessionFactory;
        public void setSession(Session session) {
            this.sessionHolder = new SessionHolder(session);
            this.newSessionHolder = true;
            this.newSession = true;
        }
        public void setExistingSession(Session session) {
            this.sessionHolder = new SessionHolder(session);
            this.newSessionHolder = true;
            this.newSession = false;
        }
        public void setSessionHolder(SessionHolder sessionHolder) {
            this.sessionHolder = sessionHolder;
            this.newSessionHolder = false;
            this.newSession = false;
        }
        public SessionHolder getSessionHolder() {
            return this.sessionHolder;
        }
        public boolean isNewSessionHolder() {
            return this.newSessionHolder;
        }
        public boolean isNewSession() {
            return this.newSession;
        }
        public DataSource getDataSource() {
            return dataSource;
        }
        public void setDataSource(DataSource dataSource) {
            this.dataSource = dataSource;
        }
        public SessionFactory getSessionFactory() {
            return sessionFactory;
        }
        public void setSessionFactory(SessionFactory sessionFactory) {
            this.sessionFactory = sessionFactory;
        }
        public boolean hasSpringManagedTransaction() {
            return (this.sessionHolder != null && this.sessionHolder.getTransaction() != null);
        }
        public boolean hasHibernateManagedTransaction() {
            return (this.sessionHolder != null && this.sessionHolder.getSession().getTransaction().isActive());
        }
        public void setRollbackOnly() {
            this.sessionHolder.setRollbackOnly();
            if (hasConnectionHolder()) {
                getConnectionHolder().setRollbackOnly();
            }
        }
        public boolean isRollbackOnly() {
            return this.sessionHolder.isRollbackOnly() ||
                    (hasConnectionHolder() && getConnectionHolder().isRollbackOnly());
        }
        @Override
        public void flush() {
            try {
                this.sessionHolder.getSession().flush();
            }
            catch (HibernateException ex) {
                throw SessionFactoryUtils.convertHibernateAccessException(ex);
            }
        }
    }


    好吧,除了chooseDataSource方法之外  基本copy的org.springframework.orm.hibernate3.HibernateTransactionManager。HibernateTransactionObject 中添加了SessionFactory和DataSource兩個屬性。

至於doCommit和doRollback基本就是按照FILO迴圈操作了

    完成程式碼後試驗,發現成功了。成功提交回滾,不會提交到一個庫裡面了。事務掛起沒有實驗成功,因為並沒有進掛起方法。原因暫時不明,需要繼續查資料。


相關推薦

Spring-Hibernate分庫事務實現

    忙活了一個星期,看了N多Spring、Hibernate原始碼,查了N多資料,走了N多彎路,總算實現了分散式事務。回頭看看,實現這個功能對自己的提升確實是巨大的。     進入正題,官網之前的選庫方案是在DataSource層面進行選庫的,繼承org.sprin

atitit.spring hibernate事務機制 spring不能保存對象的解決

pda 程序 oca roman 配置 轉載 post 本地事務 對象 atitit.spring hibernate的事務機制 spring不能保存對象的解決 sessionFactory.openSession() 不能。。log黑頭馬sql語言..

Struts2+Spring+Hibernate+Jbpm技術實現Oa(Office Automation)辦公系統第一天框架搭建

chan gdi ssi 4.5 war javassist can eve 配置文件 =============編碼規範,所有文健,所有頁面,所有數據庫的數據表都采用UTF-8編碼格式,避免亂碼;===========開發環境:jdk1.7+tomcat8.0+mys

spring系列】之14:spring宣告式事務實現原理剖析

通過上一節事務環境搭建,我們知道,在搭建的5個步驟中,有兩個是spring為我們提供底層去稍作配置,然後使用的, 這兩個操作涉及的便是: @EnableTransactionManagement PlatformTransactionManager 其中,Platfor

spring-hibernate整合 事務不起作用

當spring和hibernate 整合後,事務不起作用 解決辦法:通過Spring的SessionFactory的getCurrentSession的方法建立Session 一、首先說一下hibernate中建立用來連線資料庫的Session,有兩種方式。 1.通過Sp

用Maven整合SpringMVC+Spring+Hibernate 框架,實現簡單的插入資料庫資料功能

一、搭建開始前的準備 1、我用的MyEclipse2014版,大家也可以用IDEA。 2、下載Tomcat(免安裝解壓包)、MySQL(zip包下載地址 免安裝解壓包,好處就是雙擊啟動,最後我會把bat的啟動發給大家)、用的Navicat for MySQL的MySQL的圖

Spring + Atomikos 分散式事務實現方式

             前段時間發現對分散式事務瞭解的不夠清晰,最近又重新看了一下分散式事務,簡單做個記錄,以後方便檢視 Java規範對分散式事務定義了標準的規範Java事務API和Java事務服務,分別是JTA和JTS 一個分散式事務必須包括一個事務管理器和多個資源管

關於spring hibernate事務管理

期望結果 進service包的類事務開始,出service事務結束,如果在進行中丟擲任何BusinessException,所有修改的資料回滾 <bean id="transactionManager"class="${hibernate.transactionMa

Spring的JDBC事務實現

之前專案中有大量資料提交的需求,考慮了幾個解決方案後還是覺得使用事務提交效率更高、資料插入也更方便。 一、首先,讓我們來看看什麼是事務 事務(Transaction)是併發控制的單元,是使用者定義的一個操作序列。這些操作要麼都做,要麼都不做,是一個不可分割的工作單位。通過事

用Maven整合SpringMVC+Spring+Hibernate 框架,實現簡單的插入資料庫資料功能(二)

前一篇寫的有些多,大家先看前一篇,傳送門 具體的資源已將上傳到資源了。 上文我們直接搭建前的準備和資源配置都寫好了,下面進入具體程式碼編寫。承接上文的小3 3、我習慣建立介面,這樣對整個專案感覺更合理。 (1.)建立IBaseService(業務邏輯層,有的習慣寫成Ba

Spring程式設計式事務實現

程式設計式事務概述        所謂程式設計式事務指的是通過編碼方式實現事務,即類似於JDBC程式設計實現事務管理。        Spring框架提供一致的事務抽象,因此對於JDBC還是JTA事務都是採用相同的API進行程式設計。 java程式碼

事務spring+hibernate實現事務回滾及其他

程式碼控制的事務管理2. 引數化配置的事務管理下面就這兩種方式進行介紹。u 程式碼控制的事務管理首先,進行以下配置,假設配置檔案為(Application-Context.xml):<beans><bean id="dataSource" class="org.apache.commons.

spring+hibernate實現分庫分表操作

分庫的實現: 寫一個動態的資料來源類 public class DynamicDataSource extends AbstractRoutingDataSource {@Overrideprotected Object determineCurrentLookupKey

Spring+Hibernate+Struts2整合之實現登錄功能

else ber mit generate public rac err web field 軟件152 劉安民 前端代碼: <form id="loginForm" action="${ pageContext.request.contextPath }/us

Struts2+Spring+Hibernate實現員工管理增刪改查功能(一)之ssh框架整合

pri support scrip ext ack efault ring src 兩張 前言 轉載請標明出處:http://www.cnblogs.com/smfx1314/p/7795837.html 本項目是我寫的一個練習,目的是回顧ssh框架的整合以及

CXF+Spring+Hibernate實現RESTful webservice服務端實例

fast anti vax apach sql xsd txadvice component path 1.RESTful API接口定義 /* * Copyright 2016-2017 WitPool.org All Rights Reserved. * *

基於spring+mybatis+atomikos+jta實現分布式事務(1)

dto action 綁定 adapter lns url tab source private 本文介紹基於spring+mybatis+atomikos+jta實現分布式事務,分布式事務的實現方式基於配置文件,不同的mybatis mapper綁定在不同的數據源上,通過

spring+mybatis+tkmapper+atomikos實現分布式事務(3)-動態切換數據源

springmvc mybatis tkmapper atomiks 動態數據源 本文介紹基於spring+mybatis+tkmapper+atomikos+jta實現分布式事務,由程序動態切換數據源,通過atomikos可實現分布式事務一致性。通過繼承MapperScannerConf

Java程式設計師從笨鳥到菜鳥之(八十)細談Spring(九)spring+hibernate宣告式事務管理詳解

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

JDK動態代理和CGLIB動態代理,實現Spring註解管理事務區別。

註解式事務配置 1.JDK動態代理 <tx:annotation-driven transaction-manager="txManager"/>  預設啟用JDK動態代理,JDK只能代理介面不能代理類。 @Transactional註解可以標註在