1. 程式人生 > >Spring事務實現原始碼之事務實現以及Connection的繫結與獲取

Spring事務實現原始碼之事務實現以及Connection的繫結與獲取

PlatformTransactionManager是spring事務的高階抽象,事務的實現需要藉助PlatformTransactionManager完成,該管理主要方法如下:

當我們在使用事務的時候,需要呼叫如下方法獲取一個事務狀態物件。

TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
接下來就看一下AbstractPlatformTransactionManager#getTransaction原始碼:
  @Override
    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        
        //transaction型別:DataSourceTransactionManager.DataSourceTransactionObject
        //注意 DataSourceTransactionObject的資料結構,DataSourceTransactionObject包含一個ConnectionHolder
        // 此時會
        Object transaction = doGetTransaction();

        if (definition == null) {
            // 如果傳入的事務定義例項為null的話則建立一個預設的事務定義例項
            definition = new DefaultTransactionDefinition();
        }

        if (isExistingTransaction(transaction)) {
            // 事務傳播行為有關的的處理
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //重重之重的程式碼,完成建立資料庫連線繫結以及設定自動提交為False以及
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            } catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            } catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        } else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }
doGetTransaction方法會嘗試獲取連線,如果當前執行緒沒有繫結Connection的話則返回null,org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction原始碼如下:
	@Override
	protected Object doGetTransaction() {
		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
		txObject.setSavepointAllowed(isNestedTransactionAllowed());
		// 重點程式碼,使用TransactionSynchronizationManager獲取上下文是否有Connection
        //TransactionSynchronizationManager獲取連線是藉助ThreadLocal實現的
        ConnectionHolder conHolder =
				(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
		txObject.setConnectionHolder(conHolder, false);
		return txObject;
	}

在doGetTransaction如果當前沒有繫結連線的話,則需要給當前執行緒繫結一個連線;org.springframework.transaction.support.AbstractPlatformTransactionManager#doBegin方法是事務的基礎,接下里以DataSourceTransactionManager實現為例,檢視一下原始碼:

@Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        // DataSourceTransactionObject持有資料庫連線ConnectionHolder成員
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            if (txObject.getConnectionHolder() == null ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                // 上下文沒有獲取到Connection,因此需要在資料庫連線池中獲取一個Connection 並設定到事務物件中
                // 但是由於該連線並沒有和執行緒繫結因此需要跟執行緒繫結,下文TransactionSynchronizationManager.bindResourc方法繫結到執行緒
                Connection newCon = this.dataSource.getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            // 將連線跟當前執行緒進行繫結
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        } catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

到此為止就完成了當前執行緒的Connection繫結,繫結資源時候呼叫TransactionSynchronizationManager#bindResource方法,傳入兩個引數:分別是DataSource與ConnechtionHolder,這是為了避免一個執行緒處理多個連線池的Connection時候出錯而設定,這樣獲取連線時候會根據執行緒與連線池共同為key獲取對應的唯一Connection。在Connection繫結時候TransactionSynchronizationManager是一個重要的工具。

TransactionSynchronizationManager 該類是一個重要的工具類,用於Spring事務中的資源繫結,每一個執行緒繫結一個屬於自己的Connection,這樣保證事務有序不亂。底層使用ThreadLocal實現!!!!

當某一執行緒需要獲取連線時候會呼叫doGetResource方法獲取連線: org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource; 當doGetResource方法獲取不到連線時候呼叫bindResource方法繫結連線: org.springframework.transaction.support.TransactionSynchronizationManager#bindResource

接下來檢視一下資料查詢時候如何獲取執行緒繫結的Connection,為了便於debug使用的程式設計式事務管理,原始碼如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- 掃描註解-->
    <context:component-scan base-package="com.javartisan.jdbc.spring"/>

    <bean id="datasource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="username" value="root"/>
        <property name="password" value="root"/>
        <property name="minIdle" value="2"/>
        <property name="maxIdle" value="5"/>
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="maxTotal" value="10"/>
        <property name="initialSize" value="5"/>
        <property name="url"
                  value="jdbc:mysql://127.0.0.1:3306/databasebook?serverTimezone=UTC&amp;characterEncoding=utf-8&amp;useSSL=true"/>
    </bean>
    <bean id="platformTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="datasource"/>
    </bean>
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="datasource"/>
    </bean>
</beans>

Service原始碼:

@Service
public class StudentService {
    
    @Resource
    private PlatformTransactionManager platformTransactionManager;
    @Resource
    private StudentDao studentDao;
    public List<Student> getStudents() {
        TransactionDefinition td = new DefaultTransactionDefinition();
        TransactionStatus status = platformTransactionManager.getTransaction(td);
        List<Student> res = null;
        try {
            res = studentDao.getStudents();
            studentDao.insert();
            // int intRes = 1 / 0;
            platformTransactionManager.commit(status);
        } catch (Exception e) {
            platformTransactionManager.rollback(status);
        }
        return res;
    }

}

Dao原始碼:

@Repository
public class StudentDao {

    @Resource
    private JdbcTemplate jdbcTemplate;

    public List<Student> getStudents() {
        return jdbcTemplate.query("select * from student", new StudentMapper());
    }

    public void insert() {
        Object[] args = {System.currentTimeMillis() + "", "0907", "1", 18, "0907"};
        int[] types = {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.VARCHAR};
        jdbcTemplate.update("INSERT INTO databasebook.STUDENT (sno, sname, ssex, sage, sdept) VALUES (?,?,?, ?, ?);", args, types);
    }
}

此處以getStudents方法為例跟進分析如何獲取當前繫結的Connection,方法呼叫流程如下:

jdbcTemplate.query呼叫JdbcTemplate.execute方法,重點看一下JdbcTemplate.execute原始碼:
@Override
	public <T> T execute(StatementCallback<T> action) throws DataAccessException {
		Assert.notNull(action, "Callback object must not be null");
        // 重點程式碼,使用DataSourceUtils工具獲取繫結資源
		Connection con = DataSourceUtils.getConnection(getDataSource());
		Statement stmt = null;
		try {
			Connection conToUse = con;
			if (this.nativeJdbcExtractor != null &&
					this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativeStatements()) {
				conToUse = this.nativeJdbcExtractor.getNativeConnection(con);
			}
			stmt = conToUse.createStatement();
			applyStatementSettings(stmt);
			Statement stmtToUse = stmt;
			if (this.nativeJdbcExtractor != null) {
				stmtToUse = this.nativeJdbcExtractor.getNativeStatement(stmt);
			}
			T result = action.doInStatement(stmtToUse);
			handleWarnings(stmt);
			return result;
		}
		catch (SQLException ex) {
			// Release Connection early, to avoid potential connection pool deadlock
			// in the case when the exception translator hasn't been initialized yet.
			JdbcUtils.closeStatement(stmt);
			stmt = null;
			DataSourceUtils.releaseConnection(con, getDataSource());
			con = null;
			throw getExceptionTranslator().translate("StatementCallback", getSql(action), ex);
		}
		finally {
			JdbcUtils.closeStatement(stmt);
			DataSourceUtils.releaseConnection(con, getDataSource());
		}
	}

org.springframework.jdbc.datasource.DataSourceUtils#getConnection原始碼如下:

	public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
		try {
			return doGetConnection(dataSource);
		}
		catch (SQLException ex) {
			throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
		}
	}

org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection原始碼:

public static Connection doGetConnection(DataSource dataSource) throws SQLException {
        Assert.notNull(dataSource, "No DataSource specified");
        // 重點:還是TransactionSynchronizationManager獲取繫結資源
        ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
        if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
            conHolder.requested();
            if (!conHolder.hasConnection()) {
                logger.debug("Fetching resumed JDBC Connection from DataSource");
                conHolder.setConnection(dataSource.getConnection());
            }
            //返回繫結資源
            return conHolder.getConnection();
        }
        // Else we either got no holder or an empty thread-bound holder here.
        // 如果沒有獲取到的話則進行建立並繫結資源資源返回
        logger.debug("Fetching JDBC Connection from DataSource");
        Connection con = dataSource.getConnection();

        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            logger.debug("Registering transaction synchronization for JDBC Connection");
            // Use same Connection for further JDBC actions within the transaction.
            // Thread-bound object will get removed by synchronization at transaction completion.
            ConnectionHolder holderToUse = conHolder;
            if (holderToUse == null) {
                holderToUse = new ConnectionHolder(con);
            } else {
                holderToUse.setConnection(con);
            }
            holderToUse.requested();
            TransactionSynchronizationManager.registerSynchronization(
                    new ConnectionSynchronization(holderToUse, dataSource));
            holderToUse.setSynchronizedWithTransaction(true);
            if (holderToUse != conHolder) {
                TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
            }
        }

        return con;
    }

到此就即可獲取到繫結的Connection資源!!!

重重重要:

TransactionSynchronizationManager是繫結資源與獲取繫結資源的核心工具!!!!