1. 程式人生 > >sharding-jdbc系列之SQL執行(六)

sharding-jdbc系列之SQL執行(六)

前言

在前面我們介紹,通過SQL路由找到具體的執行表,通過SQL改寫生成具體的執行SQL, 拿到具體的結果之後,sharding-jdbc下一步是幹嘛呢,

下一步當然是SQL執行了。

route

程式碼入口: com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement

private Collection<PreparedStatementUnit> route() throws SQLException {
        Collection<PreparedStatementUnit> result = new LinkedList<>();
          // SQL路由,SQL改寫都在這個route方法裡面
        routeResult = routingEngine.route(getParameters());
          // 迴圈執行單位(路由結果中,每個表的執行單位)
        for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
              // 獲取SQL型別
            SQLType sqlType = routeResult.getSqlStatement().getType();
            Collection<PreparedStatement> preparedStatements;
            if (SQLType.DDL == sqlType) {
                  //CREATE , ALTER ,DROP ,TRUNCATE操作 都走這裡
                preparedStatements = generatePreparedStatementForDDL(each);
            } else {
                  // 增刪查改的SQL都走這裡 , 主要看這個方法 generatePreparedStatement
                preparedStatements = Collections.singletonList(generatePreparedStatement(each));
            }
              // 將preparedStatements 放入集合,後續preparedStatement執行完成之後,會從routedStatements獲取結果
            routedStatements.addAll(preparedStatements); 
            for (PreparedStatement preparedStatement : preparedStatements) {
                  // 更新PreparedStatement中的引數值
                replaySetParameter(preparedStatement);
                  // 構建一個PreparedStatementUnit執行單位,放入返回結果
                result.add(new PreparedStatementUnit(each, preparedStatement));
            }
        }
        return result;
    }

routedStatements.addAll(preparedStatements) :

將preparedStatements 放入集合,後續preparedStatement執行完成之後,會從routedStatements獲取結果 , 這行程式碼比較重要

private PreparedStatement generatePreparedStatement(final SQLExecutionUnit sqlExecutionUnit) throws SQLException {
          // 獲取資料庫連線
        Connection connection = getConnection().getConnection(sqlExecutionUnit.getDataSource(), routeResult.getSqlStatement().getType());
          // 構建 PreparedStatement
        return returnGeneratedKeys ? connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS)
                : connection.prepareStatement(sqlExecutionUnit.getSql(), resultSetType, resultSetConcurrency, resultSetHoldability);
    }

步驟說明:

1.拿到SQL路由,SQL改寫的結果,迴圈 結果集

2.判斷SQL型別

3.獲取資料庫連線,構建PreparedStatement

4.將PreparedStatement集合放入routedStatements , 方便後續獲取執行結果。

4.返回PreparedStatement結果

SQL執行

回到原始碼最開始的路口: com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement

@Override
    public boolean execute() throws SQLException {
        try {
              // 這個地方的route方法,就是我們上面講的,獲取到了PreparedStatementUnit集合
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
              // 呼叫PreparedStatementExecutor的exceute方法進行SQL執行
            return new PreparedStatementExecutor(
                    getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).execute();
        } finally {
            clearBatch();
        }
    }
public boolean execute() {
          // 構建ExecuteCallback物件,然後通過executePreparedStatement方法進行執行
        List<Boolean> result = executorEngine.executePreparedStatement(sqlType, preparedStatementUnits, parameters, new ExecuteCallback<Boolean>() {

            @Override
            public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return ((PreparedStatement) baseStatementUnit.getStatement()).execute();
            }
        });
          // 執行結果為空,說明執行失敗,則返回false
        if (null == result || result.isEmpty() || null == result.get(0)) {
            return false;
        }
          // 執行成功,預設返回第一個PreparedStatement執行的結果。
        return result.get(0);
    }

至於為什麼取第一個的結果,下面會詳細講的,這個涉及到同步執行和非同步執行的問題

/**
* 執行PreparedStatement.
* @param sqlType SQL型別
* @param preparedStatementUnits 語句物件執行單元集合
* @param parameters 引數列表
* @param executeCallback 執行回撥函式
* @param <T> 返回值型別
* @return 執行結果
*/
public <T> List<T> executePreparedStatement(
       final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
   return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);
}

execute

private  <T> List<T> execute(
       final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
   if (baseStatementUnits.isEmpty()) {
       return Collections.emptyList();
   }
   Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
   BaseStatementUnit firstInput = iterator.next();
   // 已經把第一個取出來了firstInput ,第二個任務開始所有 SQL任務 提交執行緒池【非同步】執行任務
   ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
   T firstOutput;
   List<T> restOutputs;
   try {
       // 第一個任務【同步】執行任務
       firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
       // 等待第二個任務開始所有 SQL任務完成 , ListenableFuture是繼承了Future的,他的get方法,只有在執行結果返回之後,才能get到值,否則一直阻塞
       restOutputs = restFutures.get();
       //CHECKSTYLE:OFF
   } catch (final Exception ex) {
       //CHECKSTYLE:ON
       ExecutorExceptionHandler.handleException(ex);
       return null;
   }
   // 返回結果
   List<T> result = Lists.newLinkedList(restOutputs);
   result.add(0, firstOutput);
   return result;
}

非同步執行

private <T> ListenableFuture<List<T>> asyncExecute(
            final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
          // 構建一個ListenableFuture集合
        List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
          //將實現了callable的任務放入到執行緒池中,得到一個帶有回撥機制的ListenableFuture例項,
        //通過Futures.addCallback方法對得到的ListenableFuture例項進行監聽,一旦得到結果就進入到onSuccess方法中,
        //在onSuccess方法中將查詢的結果存入到集合中
        for (final BaseStatementUnit each : baseStatementUnits) {
              // 呼叫executorService執行緒池,提交任務至執行緒池非同步化處理,submit返回一個ListenableFuture物件
            result.add(executorService.submit(new Callable<T>() {

                @Override
                public T call() throws Exception {
                      // 執行SQL
                    return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
                }
            }));
        }
          //這裡將集合中的若干ListenableFuture形成一個新的ListenableFuture
        //目的是為了當呼叫新的ListenableFuture.get()方法是非同步阻塞,
          //直到所有的ListenableFuture都得到結果才繼續當前執行緒
        //阻塞的時間取的是所有任務中用時最長的一個
        return Futures.allAsList(result);
    }

總結:

1.第一個任務是當前執行緒直接同步執行, 從第二個開始,其他任務都交給非同步執行緒來執行

2.restFutures.get()通過Future機制阻塞執行緒,等待所有執行緒執行完畢之後,才會獲取到結果,也就是說只有所有的任務執行成功之後,才會返回結果

ListenableFuture是guava的一個內部實現,後面會找相關的文章給大家分享一下。

SQL執行完成之後,結果會封裝在PreparedStatement物件裡面,通過getResultSet()方法,可以獲取到執行結果。

最終執行

private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, 
                              private AbstractExecutionEvent getExecutionEvent(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<Object> parameters) {
        AbstractExecutionEvent result;
        if (SQLType.DQL == sqlType) {
            result = new DQLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.getSqlExecutionUnit().getSql(), parameters);
        } else {
            result = new DMLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.getSqlExecutionUnit().getSql(), parameters);
        }
        return result;
    }final ExecuteCallback<T> executeCallback, 
                          final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
        synchronized (baseStatementUnit.getStatement().getConnection()) {
            T result;
            ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
            ExecutorDataMap.setDataMap(dataMap);
              // 設定事件集合
            List<AbstractExecutionEvent> events = new LinkedList<>();
            if (parameterSets.isEmpty()) {
                  // 如果引數為空則放入一個空的引數集合,並且獲取執行事件
                events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
            }
            for (List<Object> each : parameterSets) {
                  // 將有引數的構建一個執行事件,放入集合
                events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
            }
            for (AbstractExecutionEvent event : events) {
                  // 釋出執行事件,用於監聽事件的執行結果
                EventBusInstance.getInstance().post(event);
            }
            try {
                  // SQL 執行
                result = executeCallback.execute(baseStatementUnit);
            } catch (final SQLException ex) {
                  // SQL執行 出現異常
                for (AbstractExecutionEvent each : events) {
                      // 迴圈事件,將事件的執行結果設定為失敗
                    each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                    each.setException(Optional.of(ex));
                      // 釋出事件
                    EventBusInstance.getInstance().post(each);
                    // 收集異常
                    ExecutorExceptionHandler.handleException(ex);
                }
                return null;
            }
            for (AbstractExecutionEvent each : events) {
                  // 執行的很成功,修改事件的執行狀態為SUCCESS
                each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
                  // 釋出
                EventBusInstance.getInstance().post(each);
            }
            return result;
        }
    }
private AbstractExecutionEvent getExecutionEvent(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<Object> parameters) {
        AbstractExecutionEvent result;
        if (SQLType.DQL == sqlType) { //查詢語句
            result = new DQLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.
                                           getSqlExecutionUnit().getSql(), parameters);
        } else { //其他操作
            result = new DMLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.
                                           getSqlExecutionUnit().getSql(), parameters);
        }
        return result;
    }

SQLType

public enum SQLType {

    /**
     * 查詢語句
     * 
     * <p>Such as {@code SELECT}.</p>
     */
    DQL,

    /**
     * 增加,修改,刪除語句
     *
     * <p>Such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.</p>
     */
    DML,

    /**
     * CREATE , ALTER ,DROP ,TRUNCATE操作
     *
     * <p>Such as {@code CREATE}, {@code ALTER}, {@code DROP}, {@code TRUNCATE}.</p>
     */
    DDL
}