1. 程式人生 > >數據庫路由中間件MyCat - 源代碼篇(17)

數據庫路由中間件MyCat - 源代碼篇(17)

實現 -i val sel etc null 輸出 批量插入 位置

此文已由作者張鎬薪授權網易雲社區發布。

歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。


調用processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey):


public static boolean processInsert(ServerConnection sc,SchemaConfig schema,            int sqlType,String origSQL,String tableName,String primaryKey) throws SQLNonTransientException {    int firstLeftBracketIndex = origSQL.indexOf("(");    int firstRightBracketIndex = origSQL.indexOf(")");
    String upperSql = origSQL.toUpperCase();    int valuesIndex = upperSql.indexOf("VALUES");    int selectIndex = upperSql.indexOf("SELECT");    int fromIndex = upperSql.indexOf("FROM");    //屏蔽insert into table1 select * from table2語句
    if(firstLeftBracketIndex < 0) {
        String msg = "invalid sql:" + origSQL;
        LOGGER.warn(msg);        throw new SQLNonTransientException(msg);
    }    //屏蔽批量插入
    if(selectIndex > 0 &&fromIndex>0&&selectIndex>firstRightBracketIndex&&valuesIndex<0) {
        String msg = "multi insert not provided" ;
        LOGGER.warn(msg);        throw new SQLNonTransientException(msg);
    }    //插入語句必須提供列結構,因為MyCat默認對於表結構無感知
    if(valuesIndex + "VALUES".length() <= firstLeftBracketIndex) {        throw new SQLSyntaxErrorException("insert must provide ColumnList");
    }    //如果主鍵不在插入語句的fields中,則需要進一步處理
    boolean processedInsert=!isPKInFields(origSQL,primaryKey,firstLeftBracketIndex,firstRightBracketIndex);    if(processedInsert){
        processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey,firstLeftBracketIndex+1,origSQL.indexOf(‘(‘,firstRightBracketIndex)+1);
    }    return processedInsert;
}


對於主鍵不在插入語句的fields中的SQL,需要改寫。比如hotnews主鍵為id,插入語句為:

insert into hotnews(title) values(‘aaa‘);

需要改寫成:

insert into hotnews(id, title) values(next value for MYCATSEQ_hotnews,‘aaa‘);

這個在下面這個函數實現:

private static void processInsert(ServerConnection sc, SchemaConfig schema, int sqlType, String origSQL,
            String tableName, String primaryKey, int afterFirstLeftBracketIndex, int afterLastLeftBracketIndex) {    int primaryKeyLength = primaryKey.length();    int insertSegOffset = afterFirstLeftBracketIndex;
    String mycatSeqPrefix = "next value for MYCATSEQ_";    int mycatSeqPrefixLength = mycatSeqPrefix.length();    int tableNameLength = tableName.length();    char[] newSQLBuf = new char[origSQL.length() + primaryKeyLength + mycatSeqPrefixLength + tableNameLength + 2];
    origSQL.getChars(0, afterFirstLeftBracketIndex, newSQLBuf, 0);
    primaryKey.getChars(0, primaryKeyLength, newSQLBuf, insertSegOffset);
    insertSegOffset += primaryKeyLength;
    newSQLBuf[insertSegOffset] = ‘,‘;
    insertSegOffset++;
    origSQL.getChars(afterFirstLeftBracketIndex, afterLastLeftBracketIndex, newSQLBuf, insertSegOffset);
    insertSegOffset += afterLastLeftBracketIndex - afterFirstLeftBracketIndex;
    mycatSeqPrefix.getChars(0, mycatSeqPrefixLength, newSQLBuf, insertSegOffset);
    insertSegOffset += mycatSeqPrefixLength;
    tableName.getChars(0, tableNameLength, newSQLBuf, insertSegOffset);
    insertSegOffset += tableNameLength;
    newSQLBuf[insertSegOffset] = ‘,‘;
    insertSegOffset++;
    origSQL.getChars(afterLastLeftBracketIndex, origSQL.length(), newSQLBuf, insertSegOffset);
    processSQL(sc, schema, new String(newSQLBuf), sqlType);
}

最後的processSQL(sc, schema, new String(newSQLBuf), sqlType);是將語句放入執行隊列: 這裏MyCat考慮NIO線程吞吐量以及全局ID生成線程安全的問題,使用如下流程執行需要全局ID的SQL insert語句。 processSQL(sc, schema, new String(newSQLBuf), sqlType):

SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);
MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);


5.4 DDL語句路由

可以分為兩步,整體源代碼:

public static RouteResultset routeToDDLNode(RouteResultset rrs, int sqlType, String stmt,SchemaConfig schema) throws SQLSyntaxErrorException {
    stmt = getFixedSql(stmt);
    String tablename = "";        
    final String upStmt = stmt.toUpperCase();    if(upStmt.startsWith("CREATE")){        if (upStmt.contains("CREATE INDEX ")){
            tablename = RouterUtil.getTableName(stmt, RouterUtil.getCreateIndexPos(upStmt, 0));
        }else tablename = RouterUtil.getTableName(stmt, RouterUtil.getCreateTablePos(upStmt, 0));
    }else if(upStmt.startsWith("DROP")){        if (upStmt.contains("DROP INDEX ")){
            tablename = RouterUtil.getTableName(stmt, RouterUtil.getDropIndexPos(upStmt, 0));
        }else tablename = RouterUtil.getTableName(stmt, RouterUtil.getDropTablePos(upStmt, 0));
    }else if(upStmt.startsWith("ALTER")){
        tablename = RouterUtil.getTableName(stmt, RouterUtil.getAlterTablePos(upStmt, 0));
    }else if (upStmt.startsWith("TRUNCATE")){
        tablename = RouterUtil.getTableName(stmt, RouterUtil.getTruncateTablePos(upStmt, 0));
    }
    tablename = tablename.toUpperCase();    if (schema.getTables().containsKey(tablename)){        if(ServerParse.DDL==sqlType){
            List<String> dataNodes = new ArrayList<>();
            Map<String, TableConfig> tables = schema.getTables();
            TableConfig tc;            if (tables != null && (tc = tables.get(tablename)) != null) {
                dataNodes = tc.getDataNodes();
            }
            Iterator<String> iterator1 = dataNodes.iterator();            int nodeSize = dataNodes.size();
            RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSize];            for(int i=0;i<nodeSize;i++){
                String name = iterator1.next();
                nodes[i] = new RouteResultsetNode(name, sqlType, stmt);
            }
            rrs.setNodes(nodes);
        }        return rrs;
    }else if(schema.getDataNode()!=null){        //默認節點ddl
        RouteResultsetNode[] nodes = new RouteResultsetNode[1];
        nodes[0] = new RouteResultsetNode(schema.getDataNode(), sqlType, stmt);
        rrs.setNodes(nodes);        return rrs;
    }    //both tablename and defaultnode null
    LOGGER.error("table not in schema----"+tablename);    throw new SQLSyntaxErrorException("op table not in schema----"+tablename);
}

首先,獲取表名,步驟如下:

拿一個獲取表名的函數舉例:

/**
 * 獲取語句中前關鍵字位置和占位個數表名位置
 *
 * @param upStmt
 *            執行語句
 * @param start
 *            開始位置
 * @return int[]關鍵字位置和占位個數
 * @author aStoneGod
 */public static int[] getCreateIndexPos(String upStmt, int start) {
    String token1 = "CREATE ";
    String token2 = " INDEX ";
    String token3 = " ON ";    int createInd = upStmt.indexOf(token1, start);    int idxInd = upStmt.indexOf(token2, start);    int onInd = upStmt.indexOf(token3, start);    // 既包含CREATE又包含INDEX,且CREATE關鍵字在INDEX關鍵字之前, 且包含ON...
    if (createInd >= 0 && idxInd > 0 && idxInd > createInd && onInd > 0 && onInd > idxInd) {        return new int[] {onInd , token3.length() };
    } else {        return new int[] { -1, token2.length() };// 不滿足條件時,只關註第一個返回值為-1,第二個任意
    }
}

然後,根據表名獲取配置進行路由:

默認語句路由

對於有默認節點的schema,且不是show, describe, select @@之類的語句,則路由到默認的節點上。 對於show, describe, select @@之類的語句,利用查詢信息路由方法算出路由。接下來,取一個舉例,對於Show語句:analyseShowSQL(schema, rrs, stmt)方法技術分享圖片

5.5 AST語義解析路由

首先我們看一下MySQL的SQL解析步驟(硬解析和軟解析):MyCat的機制,仿照MySQL的,可以總結為:這裏我們可以總結一個優化思路,就是通過仿照MySQL物理優化原理(定時更新表配置,報表信息),來做進一步MyCat查詢的優化。語義解析基本過程:

1.詞法分析(一般抽象都叫Lexer):不同的關鍵詞有不同的含義

select concat(id,‘_‘,name),value from student where value>60 order by value

技術分享圖片詞法分析的輸出,就是一句帶上詞義的語句:

(select: Keyword)  (concat: Keyword)((: LB)…… (from: keyword) (student: identifier)

2.語法分析:

  • 分析關鍵詞之間的聯系,生成表達式(expression)

  • 基本語法正確性判斷(比如from這個keyword之後必須緊跟一個表名(就是一個identifier))

3.生成AST語意樹(完整解析的statement)根據MyCat權威指南,DruidParser比其他Parser快很多很多。 快在哪裏呢?主要是抽象靜態化的粒度,拿jsqlparser和druidparser對比。 這兩個parser都遵從了上面的步驟,對於詞(lexer),表達式(expression)和語句AST(statement)都有抽象。 但是對於語句AST(statement)的抽象, DruidParser做的粒度更細。如下圖對於Alter語句的對比:所以,不難推測為啥DruidParser快了


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點擊。




相關文章:
【推薦】 windows系統下npm升級的正確姿勢以及原理

數據庫路由中間件MyCat - 源代碼篇(17)