1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(16)

資料庫路由中介軟體MyCat - 原始碼篇(16)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。

5. 路由模組

真正取得RouteResultset的步驟:AbstractRouteStrategy的route方法:這裡寫圖片描述對應原始碼:

public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
            String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {    /**
     * 處理一些路由之前的邏輯
     * 全域性序列號,父子表插入
     */
    if ( beforeRouteProcess(schema, sqlType, origSQL, sc) )        return null;    /**
     * SQL 語句攔截
     */
    String stmt = MycatServer.getInstance().getSqlInterceptor().interceptSQL(origSQL, sqlType);    if (origSQL != stmt && LOGGER.isDebugEnabled()) {
        LOGGER.debug("sql intercepted to " + stmt + " from " + origSQL);
    }    //對應schema標籤checkSQLschema屬性,把表示schema的字元去掉
    if (schema.isCheckSQLSchema()) {
        stmt = RouterUtil.removeSchema(stmt, schema.getName());
    }

    RouteResultset rrs = new RouteResultset(stmt, sqlType);    /**
     * 優化debug loaddata輸出cache的日誌會極大降低效能
     */
    if (LOGGER.isDebugEnabled() && origSQL.startsWith(LoadData.loadDataHint)) {
        rrs.setCacheAble(false);
    }       /**
        * rrs攜帶ServerConnection的autocommit狀態用於在sql解析的時候遇到
        * select ... for update的時候動態設定RouteResultsetNode的canRunInReadDB屬性
        */
    if (sc != null ) {
        rrs.setAutocommit(sc.isAutocommit());
    }    /**
     * DDL 語句的路由
     */
    if (ServerParse.DDL == sqlType) {        return RouterUtil.routeToDDLNode(rrs, sqlType, stmt, schema);
    }    /**
     * 檢查是否有分片
     */
    if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
        rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
    } else {
        RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);        if (returnedSet == null) {
            rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool);
        }
    }    return rrs;
}


5.3 路由之前的邏輯 - 判斷子表插入以及全域性序列號的生成:


AbstractRouteStrategy.java

/**
 * 路由之前必要的處理
 * 主要是全域性序列號插入,還有子表插入
 */private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)
        throws SQLNonTransientException {    return RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)
            || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))
            || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));
}


這裡利用了Java的一個特性,||表示式,前半部分如果為真,則後半部分不會被執行。首先執行RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc),這個方法是判斷是否是顯示使用全域性序列號的sql語句,比如像:insert into table1(id,name) values(next value for MYCATSEQ_GLOBAL,‘test’);對於這樣的語句處理是先將改寫next value for MYCATSEQ_GLOBAL 為呼叫全域性ID生成的ID,之後進入AST語句解析路由

如果不是,則執行(sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc)),這個方法判斷是否是子表插入:部分程式碼:

String tableName = StringUtil.getTableName(origSQL).toUpperCase();final TableConfig tc = schema.getTables().get(tableName);//判斷是否為子表,如果不是,只會返回falseif (null != tc && tc.isChildTable()) {final RouteResultset rrs = new RouteResultset(origSQL, ServerParse.INSERT);
String joinKey = tc.getJoinKey();//因為是Insert語句,用MySqlInsertStatement進行parseMySqlInsertStatement insertStmt = (MySqlInsertStatement) (new MySqlStatementParser(origSQL)).parseInsert();
......


這裡注意,所有型別的SQL語句都有druid對應的SQLparser,比如說這裡的插入語句就用MySqlInsertStatement解析。druidparser在這節先不講,會在 AST語義解析路由中詳細講述。

接上面程式碼:

//判斷條件完整性,取得解析後語句列中的joinkey列的index
    int joinKeyIndex = getJoinKeyIndex(insertStmt.getColumns(), joinKey);    if (joinKeyIndex == -1) {
        String inf = "joinKey not provided :" + tc.getJoinKey() + "," + insertStmt;
        LOGGER.warn(inf);        throw new SQLNonTransientException(inf);
    }    //子表不支援批量插入
    if (isMultiInsert(insertStmt)) {
        String msg = "ChildTable multi insert not provided";
        LOGGER.warn(msg);        throw new SQLNonTransientException(msg);
    }    //取得joinkey的值
    String joinKeyVal = insertStmt.getValues().getValues().get(joinKeyIndex).toString();

    String sql = insertStmt.toString();    // try to route by ER parent partion key
    //如果是二級子表(父表不再有父表),並且分片欄位正好是joinkey欄位,呼叫routeByERParentKey
    RouteResultset theRrs = RouterUtil.routeByERParentKey(sc, schema, ServerParse.INSERT, sql, rrs, tc, joinKeyVal);    if (theRrs != null) {        boolean processedInsert=false;        //判斷是否需要全域性序列號
              if ( sc!=null && tc.isAutoIncrement()) {
                  String primaryKey = tc.getPrimaryKey();
                  processedInsert=processInsert(sc,schema,ServerParse.INSERT,sql,tc.getName(),primaryKey);
              }              if(processedInsert==false){
                  rrs.setFinishedRoute(true);
                  sc.getSession2().execute(rrs, ServerParse.INSERT);
              }        return true;
    }    // route by sql query root parent's datanode
    //如果不是二級子表或者分片欄位不是joinKey欄位結果為空,則啟動非同步執行緒去後臺分片查詢出datanode
    //只要查詢出上一級表的parentkey欄位的對應值在哪個分片即可
    final String findRootTBSql = tc.getLocateRTableKeySql().toLowerCase() + joinKeyVal;    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("find root parent's node sql " + findRootTBSql);
    }

    ListenableFuture<String> listenableFuture = MycatServer.getInstance().
            getListeningExecutorService().submit(new Callable<String>() {        @Override
        public String call() throws Exception {
            FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler();            return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes());
        }
    });


    Futures.addCallback(listenableFuture, new FutureCallback<String>() {        @Override
        public void onSuccess(String result) {            //結果為空,證明上一級表中不存在那條記錄,失敗
            if (Strings.isNullOrEmpty(result)) {
                StringBuilder s = new StringBuilder();
                LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() +                        " err:" + "can't find (root) parent sharding node for sql:" + origSQL);
                sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, "can't find (root) parent sharding node for sql:" + origSQL);                return;
            }            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("found partion node for child table to insert " + result + " sql :" + origSQL);
            }            //找到分片,進行插入(和其他的一樣,需要判斷是否需要全域性自增ID)
            boolean processedInsert=false;                  if ( sc!=null && tc.isAutoIncrement()) {                      try {
                          String primaryKey = tc.getPrimaryKey();
                    processedInsert=processInsert(sc,schema,ServerParse.INSERT,origSQL,tc.getName(),primaryKey);
                } catch (SQLNonTransientException e) {
                    LOGGER.warn("sequence processInsert error,",e);
                    sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR , "sequence processInsert error," + e.getMessage());
                }
                  }                  if(processedInsert==false){
                      RouteResultset executeRrs = RouterUtil.routeToSingleNode(rrs, result, origSQL);
                      sc.getSession2().execute(executeRrs, ServerParse.INSERT);
                  }

        }        @Override
        public void onFailure(Throwable t) {
            StringBuilder s = new StringBuilder();
            LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() +                    " err:" + t.getMessage());
            sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, t.getMessage() + " " + s.toString());
        }
    }, MycatServer.getInstance().
            getListeningExecutorService());    return true;
}return false;

如果返回false,則繼續執行(sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc)) 這個是處理一般的SQL插入語句,將其中的自增主鍵欄位的值改寫成內建的全域性ID生成器生成的id。RouterUtil.java:

public static boolean processInsert(SchemaConfig schema, int sqlType,
                                        String origSQL, ServerConnection sc) throws SQLNonTransientException {
    String tableName = StringUtil.getTableName(origSQL).toUpperCase();
    TableConfig tableConfig = schema.getTables().get(tableName);    boolean processedInsert=false;    //判斷是有自增欄位
    if (null != tableConfig && tableConfig.isAutoIncrement()) {
        String primaryKey = tableConfig.getPrimaryKey();
        processedInsert=processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey);
    }    return processedInsert;
}


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選




相關文章:
【推薦】 為何要在網站上設定的驗證碼
【推薦】 如何玩轉基於風險的測試