1. 程式人生 > >Spring整合mybatis中的sqlSession是如何做到執行緒隔離的?

Spring整合mybatis中的sqlSession是如何做到執行緒隔離的?

專案中常常使用mybatis配合spring進行資料庫操作,但是我們知道,資料的操作是要求做到執行緒安全的,而且按照原來的jdbc的使用方式,每次操作完成之後都要將連線關閉,但是實際使用中我們並沒有這麼幹。

更讓人疑惑的點是,spring中預設使用單例形式來載入bean,而往往我們也不會改變這種預設,所以,是所有執行緒共享資料連線?

讓我們來看看真相!

自然是要個例子的:

我們來看下spring中配置mybatis資料庫操作bean(使用 druid 連線池):

    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
        <property name="url" value="${jdbc.url}" />
        <property name="driverClassName" value="${jdbc.driver}" />
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>

    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="configLocation" value="classpath:mybatis-config.xml" />
    </bean>

    <!-- scope="prototype" 另說,另討論,我們先以mapper形式看一下 -->
    <bean id="sqlSession" class="org.mybatis.spring.SqlSessionTemplate">
        <constructor-arg index="0" ref="sqlSessionFactory" />
    </bean>

    <!-- 事務 -->
    <bean name="transactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"></property>
    </bean>

而在java程式碼中使用則是使用依賴注入直接使用 @resource sqlSession, 如下:

    @Resource
    private SqlSessionTemplate sqlSession;

    @Override
    public User getUser(Map<String, String> cond) {
        // 此句執行db查詢
        User result = sqlSession.selectOne(NAME_SPACE
                + ".getUser", cond);
        return result;
    }

這個sqlSession就是直接去操作資料庫了看起來是這樣,是在bean初始化的時候依賴注入的!

所以,難道每次進入該操作的時候,sqlSession 的例項都會變化嗎?答案是否定的。

那麼,肯定就是往下使用的時候才發生的變化唄!

再往下走,可以看到,呼叫了一個代理來進行具體的查詢!

  // org/mybatis/spring/SqlSessionTemplate.selectOne()
  public <T> T selectOne(String statement, Object parameter) {
    return this.sqlSessionProxy.<T> selectOne(statement, parameter);
  }

為啥要用代理呢?自己直接查不就行了嗎?其實,用代理是有好處的,那就可以可以進行另外的包裝!

代理是怎麼生成的呢?其實只要看一下 SqlSessionTemplate 的構造方法就知道了!

/**
   * Constructs a Spring managed {@code SqlSession} with the given
   * {@code SqlSessionFactory} and {@code ExecutorType}.
   * A custom {@code SQLExceptionTranslator} can be provided as an
   * argument so any {@code PersistenceException} thrown by MyBatis
   * can be custom translated to a {@code RuntimeException}
   * The {@code SQLExceptionTranslator} can also be null and thus no
   * exception translation will be done and MyBatis exceptions will be
   * thrown
   *
   * @param sqlSessionFactory
   * @param executorType
   * @param exceptionTranslator
   */
  public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator) {

    notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required");
    notNull(executorType, "Property 'executorType' is required");

    this.sqlSessionFactory = sqlSessionFactory;
    this.executorType = executorType;
    this.exceptionTranslator = exceptionTranslator;
    // 生成代理 SqlSessionInterceptor 為 InvocationHandler
    this.sqlSessionProxy = (SqlSession) newProxyInstance(
        SqlSessionFactory.class.getClassLoader(),
        new Class[] { SqlSession.class },
        new SqlSessionInterceptor());
  }

從上面的程式碼,看不到細節,但是,大致還是知道代理的具體實現了!即使用 SqlSessionInterceptor 去處理具體查詢邏輯!

我們來看下 SqlSessionInterceptor 的實現!

/**
   * Proxy needed to route MyBatis method calls to the proper SqlSession got
   * from Spring's Transaction Manager
   * It also unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to
   * pass a {@code PersistenceException} to the {@code PersistenceExceptionTranslator}.
   */
  private class SqlSessionInterceptor implements InvocationHandler {
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      SqlSession sqlSession = getSqlSession(
          SqlSessionTemplate.this.sqlSessionFactory,
          SqlSessionTemplate.this.executorType,
          SqlSessionTemplate.this.exceptionTranslator);
      try {
        Object result = method.invoke(sqlSession, args);
        if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
          // force commit even on non-dirty sessions because some databases require
          // a commit/rollback before calling close()
          sqlSession.commit(true);
        }
        return result;
      } catch (Throwable t) {
        Throwable unwrapped = unwrapThrowable(t);
        if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
          // release the connection to avoid a deadlock if the translator is no loaded. See issue #22
          closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
          sqlSession = null;
          Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
          if (translated != null) {
            unwrapped = translated;
          }
        }
        throw unwrapped;
      } finally {
        if (sqlSession != null) {
          closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
        }
      }
    }
  }

SqlSessionInterceptor 是 SqlSessionTemplate 的內部類,目的只有一個,就是處理多個 session 的db操作!

所有請求都被 invoke() 攔截,從而做相應處理:

  • 進入請求,先生成一個新的sqlSession,為本次db操作做準備;

  • 通過反射呼叫請求進來的方法,將 sqlSession 回撥,進行復雜查詢及結果對映;

  • 如果需要立即提交事務,do it;

  • 如果出現異常,包裝異常資訊,重新丟擲;

  • 操作完成後,關閉本次session;

到這裡,其實我們好像已經明白了,其實外面的 sqlSession 單例,並不會影響具體的db操作控制,所以不用擔心session的執行緒安全問題!

不過,還有個點值得考慮下,如果我一次請求裡有多次資料庫操作,難道我真的要建立多個sqlSession或者說資料庫連線?不會吧!

如果這個問題得不到解決,可能你並不真正瞭解session的定義了!

所以我們需要繼續看一下 session 到底是怎麼獲取的?

getSqlSession() 方法是在 SqlSessionUtils 中實現的!如下:

/**
   * Gets an SqlSession from Spring Transaction Manager or creates a new one if needed.
   * Tries to get a SqlSession out of current transaction. If there is not any, it creates a new one.
   * Then, it synchronizes the SqlSession with the transaction if Spring TX is active and
   * <code>SpringManagedTransactionFactory</code> is configured as a transaction manager.
   *
   * @param sessionFactory a MyBatis {@code SqlSessionFactory} to create new sessions
   * @param executorType The executor type of the SqlSession to create
   * @param exceptionTranslator Optional. Translates SqlSession.commit() exceptions to Spring exceptions.
   * @throws TransientDataAccessResourceException if a transaction is active and the
   *             {@code SqlSessionFactory} is not using a {@code SpringManagedTransactionFactory}
   * @see SpringManagedTransactionFactory
   */
  public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {

    notNull(sessionFactory, "No SqlSessionFactory specified");
    notNull(executorType, "No ExecutorType specified");

    SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);

    // 如果已經有holder,則直接返回,複用連線
    if (holder != null && holder.isSynchronizedWithTransaction()) {
      if (holder.getExecutorType() != executorType) {
        throw new TransientDataAccessResourceException("Cannot change the ExecutorType when there is an existing transaction");
      }

      holder.requested();

      if (logger.isDebugEnabled()) {
        logger.debug("Fetched SqlSession [" + holder.getSqlSession() + "] from current transaction");
      }

      return holder.getSqlSession();
    }

    if (logger.isDebugEnabled()) {
      logger.debug("Creating a new SqlSession");
    }

    SqlSession session = sessionFactory.openSession(executorType);

    // Register session holder if synchronization is active (i.e. a Spring TX is active)
    //
    // Note: The DataSource used by the Environment should be synchronized with the
    // transaction either through DataSourceTxMgr or another tx synchronization.
    // Further assume that if an exception is thrown, whatever started the transaction will
    // handle closing / rolling back the Connection associated with the SqlSession.
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
      Environment environment = sessionFactory.getConfiguration().getEnvironment();

      if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
        if (logger.isDebugEnabled()) {
          logger.debug("Registering transaction synchronization for SqlSession [" + session + "]");
        }

        holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
        TransactionSynchronizationManager.bindResource(sessionFactory, holder);
        TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
        holder.setSynchronizedWithTransaction(true);
        holder.requested();
      } else {
        if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
          if (logger.isDebugEnabled()) {
            logger.debug("SqlSession [" + session + "] was not registered for synchronization because DataSource is not transactional");
          }
        } else {
          throw new TransientDataAccessResourceException(
              "SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
        }
      }
    } else {
      if (logger.isDebugEnabled()) {
        logger.debug("SqlSession [" + session + "] was not registered for synchronization because synchronization is not active");
      }
    }

    return session;
  }

如上獲取 sqlSession 邏輯,主要分兩種情況!

歡迎大家進我的私人技術交流群【Java高階網際網路架構:964357187】點選進入

  1. 如果存在holder,則返回原有的sqlSession,到於這個holder我們稍後再說;

  2. 如果沒有,則建立一個新連線!

所以,看起來情況還不是太糟,至少有複用的概念了!

那麼問題來了,複用?如何做到執行緒安全?所以我們要看下 SqlSessionHolder 的實現了!

獲取holder是通過 TransactionSynchronizationManager.getResource(sessionFactory); 獲取的:

public static Object getResource(Object key) {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        // 實際獲取
        Object value = doGetResource(actualKey);
        if (value != null && logger.isTraceEnabled()) {
            logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
                    Thread.currentThread().getName() + "]");
        }
        return value;
    }

    private static Object doGetResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        Object value = map.get(actualKey);
        // Transparently remove ResourceHolder that was marked as void...
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            map.remove(actualKey);
            // Remove entire ThreadLocal if empty...
            if (map.isEmpty()) {
                resources.remove();
            }
            value = null;
        }
        return value;
    }

咱們忽略對 key 的處理,實際是直接呼叫 doGetResource() 獲取holder。而 doGetResource() 中,則使用了 resources 來儲存具體的 kv。 resources 明顯是個共享變數,但是看起來這裡沒有任何的加鎖操作!這是為何?

只要看一下 resources 的定義就知道了,其實現為 ThreadLocal, 所以是執行緒安全了!

private static final ThreadLocal<Map<Object, Object>> resources =
     new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

在新的請求進來時,自然是沒有值的,所以直接返回null.而後續進入,則獲取快取返回!

而對於沒有獲取到 holder 的情況,則需要重新建立一個 session 了!

這裡獲取session由DefaultSqlSessionFactory 進行建立!如下:

// org.apache.ibatis.session.defaults.DefaultSqlSessionFactory.openSession()
  public SqlSession openSession(ExecutorType execType) {
    return openSessionFromDataSource(execType, null, false);
  }

  private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
    Transaction tx = null;
    try {
      final Environment environment = configuration.getEnvironment();
      // SpringManagedTransactionFactory
      final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
      tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
      final Executor executor = configuration.newExecutor(tx, execType);
      return new DefaultSqlSession(configuration, executor, autoCommit);
    } catch (Exception e) {
      closeTransaction(tx); // may have fetched a connection so lets call close()
      throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }

建立 session 幾件事:

  • 根據環境配置,開啟一個新事務,該事務管理器會負責後續jdbc連線管理工作;

  • 根據事務建立一個 Executor,備用;

  • 用DefaultSqlSession 將 executor 包裝後返回,用於後續真正的db操作;

至此,真正的 sqlSession 已經建立成功!返回後,就可以真正使用了!

等等,建立的session好像並沒有儲存,那麼還是那個問題,每個sql都會建立一個 sqlSession ? 好吧,是這樣的!前面的holder,只是用於存在事務操作的連線!(holder的理解出了偏差哦)

但是有一點,這裡雖然建立了多個 sqlSession 例項,但是並不意味著有多個db連線,具體使用db連線時,則一般會會使用連線池來進行優化!如前面提到的 druid 就是個不錯的選擇!

真實的jdbc連接獲取,是在進行真正的 query 時,才進行呼叫 getConnection() 進行接入!

具體則是在 doQuery() 時,進行st的組裝時呼叫的 ,如下:

// SimpleExecutor.prepareStatement()
  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    // 獲取 jdbc 連線,返回 java.sql.Connection
    Connection connection = getConnection(statementLog);
    stmt = handler.prepare(connection);
    handler.parameterize(stmt);
    return stmt;
  }

  // 呼叫 BaseExecutor.getConnection()
  protected Connection getConnection(Log statementLog) throws SQLException {
    // SpringManagedTransaction 管理 connection
    Connection connection = transaction.getConnection();
    if (statementLog.isDebugEnabled()) {
      return ConnectionLogger.newInstance(connection, statementLog, queryStack);
    } else {
      return connection;
    }
  }

通過前面通過事務管理工廠建立的 SpringManagedTransaction 進行 connection 獲取!一個事務管理器只會存在一次獲取資料庫連線的操作!

public Connection getConnection() throws SQLException {
    if (this.connection == null) {
      openConnection();
    }
    return this.connection;
  }

  // 而 SpringManagedTransaction 又將connection交由 DataSourceUtils 進行管理!
  // org/springframework/jdbc/datasource/DataSourceUtils
    public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
        try {
            // 真正的連接獲取
            return doGetConnection(dataSource);
        }
        catch (SQLException ex) {
            throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
        }
    }

    /**
     * Actually obtain a JDBC Connection from the given DataSource.
     * Same as {@link #getConnection}, but throwing the original SQLException.
     * <p>Is aware of a corresponding Connection bound to the current thread, for example
     * when using {@link DataSourceTransactionManager}. Will bind a Connection to the thread
     * if transaction synchronization is active (e.g. if in a JTA transaction).
     * <p>Directly accessed by {@link TransactionAwareDataSourceProxy}.
     * @param dataSource the DataSource to obtain Connections from
     * @return a JDBC Connection from the given DataSource
     * @throws SQLException if thrown by JDBC methods
     * @see #doReleaseConnection
     */
    public static Connection doGetConnection(DataSource dataSource) throws SQLException {
        Assert.notNull(dataSource, "No DataSource specified");

        ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
        if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
            conHolder.requested();
            if (!conHolder.hasConnection()) {
                logger.debug("Fetching resumed JDBC Connection from DataSource");
                conHolder.setConnection(dataSource.getConnection());
            }
            return conHolder.getConnection();
        }
        // Else we either got no holder or an empty thread-bound holder here.

        logger.debug("Fetching JDBC Connection from DataSource");
        // 通過接入的dataSource進行連接獲取,這裡將會是最終的jdbc連線
        Connection con = dataSource.getConnection();

        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            logger.debug("Registering transaction synchronization for JDBC Connection");
            // Use same Connection for further JDBC actions within the transaction.
            // Thread-bound object will get removed by synchronization at transaction completion.
            ConnectionHolder holderToUse = conHolder;
            if (holderToUse == null) {
                holderToUse = new ConnectionHolder(con);
            }
            else {
                holderToUse.setConnection(con);
            }
            holderToUse.requested();
            TransactionSynchronizationManager.registerSynchronization(
                    new ConnectionSynchronization(holderToUse, dataSource));
            holderToUse.setSynchronizedWithTransaction(true);
            if (holderToUse != conHolder) {
                TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
            }
        }

        return con;
    }

上面的實現主要做三件事:

  1. 再次確認,是否存在事務處理,holder是否存在,如果有則複用;

  2. 如果沒有,那再從資料來源處獲取連線;

  3. 獲取新連線成功後,檢查如果存在事務,則將新獲取的連線放入holder中儲存起來,以備下次使用;

獲取jdbc連線後,就可以真正發起execute()查詢了。

資料庫連線的疑問算是解答了!我們發現,外部的框架並沒有多少為我們節省db連線的動作!而是把最終 getConnection() 交給 datasource 資料來源!

而真正解決我們連線複用的問題的,是像 Druid 這樣的連線池元件!所以,咱們可以單獨來看這些中介軟體了!

歡迎大家進我的私人技術交流群【Java高階網際網路架構:964357187】點選進入

有任何問題都可以在群裡提出哦,群內有就職阿里、京東等一線網際網路公司大佬給大家解答技術問題,歡迎大家進群