數據庫路由中間件MyCat - 源代碼篇(17)
此文已由作者張鎬薪授權網易雲社區發布。
歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。
調用processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey):
public static boolean processInsert(ServerConnection sc,SchemaConfig schema, int sqlType,String origSQL,String tableName,String primaryKey) throws SQLNonTransientException { int firstLeftBracketIndex = origSQL.indexOf("("); int firstRightBracketIndex = origSQL.indexOf(")"); String upperSql = origSQL.toUpperCase(); int valuesIndex = upperSql.indexOf("VALUES"); int selectIndex = upperSql.indexOf("SELECT"); int fromIndex = upperSql.indexOf("FROM"); //屏蔽insert into table1 select * from table2語句 if(firstLeftBracketIndex < 0) { String msg = "invalid sql:" + origSQL; LOGGER.warn(msg); throw new SQLNonTransientException(msg); } //屏蔽批量插入 if(selectIndex > 0 &&fromIndex>0&&selectIndex>firstRightBracketIndex&&valuesIndex<0) { String msg = "multi insert not provided" ; LOGGER.warn(msg); throw new SQLNonTransientException(msg); } //插入語句必須提供列結構,因為MyCat默認對於表結構無感知 if(valuesIndex + "VALUES".length() <= firstLeftBracketIndex) { throw new SQLSyntaxErrorException("insert must provide ColumnList"); } //如果主鍵不在插入語句的fields中,則需要進一步處理 boolean processedInsert=!isPKInFields(origSQL,primaryKey,firstLeftBracketIndex,firstRightBracketIndex); if(processedInsert){ processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey,firstLeftBracketIndex+1,origSQL.indexOf(‘(‘,firstRightBracketIndex)+1); } return processedInsert; }
對於主鍵不在插入語句的fields中的SQL,需要改寫。比如hotnews主鍵為id,插入語句為:
insert into hotnews(title) values(‘aaa‘);
需要改寫成:
insert into hotnews(id, title) values(next value for MYCATSEQ_hotnews,‘aaa‘);
這個在下面這個函數實現:
private static void processInsert(ServerConnection sc, SchemaConfig schema, int sqlType, String origSQL, String tableName, String primaryKey, int afterFirstLeftBracketIndex, int afterLastLeftBracketIndex) { int primaryKeyLength = primaryKey.length(); int insertSegOffset = afterFirstLeftBracketIndex; String mycatSeqPrefix = "next value for MYCATSEQ_"; int mycatSeqPrefixLength = mycatSeqPrefix.length(); int tableNameLength = tableName.length(); char[] newSQLBuf = new char[origSQL.length() + primaryKeyLength + mycatSeqPrefixLength + tableNameLength + 2]; origSQL.getChars(0, afterFirstLeftBracketIndex, newSQLBuf, 0); primaryKey.getChars(0, primaryKeyLength, newSQLBuf, insertSegOffset); insertSegOffset += primaryKeyLength; newSQLBuf[insertSegOffset] = ‘,‘; insertSegOffset++; origSQL.getChars(afterFirstLeftBracketIndex, afterLastLeftBracketIndex, newSQLBuf, insertSegOffset); insertSegOffset += afterLastLeftBracketIndex - afterFirstLeftBracketIndex; mycatSeqPrefix.getChars(0, mycatSeqPrefixLength, newSQLBuf, insertSegOffset); insertSegOffset += mycatSeqPrefixLength; tableName.getChars(0, tableNameLength, newSQLBuf, insertSegOffset); insertSegOffset += tableNameLength; newSQLBuf[insertSegOffset] = ‘,‘; insertSegOffset++; origSQL.getChars(afterLastLeftBracketIndex, origSQL.length(), newSQLBuf, insertSegOffset); processSQL(sc, schema, new String(newSQLBuf), sqlType); }
最後的processSQL(sc, schema, new String(newSQLBuf), sqlType);是將語句放入執行隊列:
這裏MyCat考慮NIO線程吞吐量以及全局ID生成線程安全的問題,使用如下流程執行需要全局ID的SQL insert語句。
processSQL(sc, schema, new String(newSQLBuf), sqlType):
SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType); MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);
5.4 DDL語句路由
可以分為兩步,整體源代碼:
public static RouteResultset routeToDDLNode(RouteResultset rrs, int sqlType, String stmt,SchemaConfig schema) throws SQLSyntaxErrorException { stmt = getFixedSql(stmt); String tablename = ""; final String upStmt = stmt.toUpperCase(); if(upStmt.startsWith("CREATE")){ if (upStmt.contains("CREATE INDEX ")){ tablename = RouterUtil.getTableName(stmt, RouterUtil.getCreateIndexPos(upStmt, 0)); }else tablename = RouterUtil.getTableName(stmt, RouterUtil.getCreateTablePos(upStmt, 0)); }else if(upStmt.startsWith("DROP")){ if (upStmt.contains("DROP INDEX ")){ tablename = RouterUtil.getTableName(stmt, RouterUtil.getDropIndexPos(upStmt, 0)); }else tablename = RouterUtil.getTableName(stmt, RouterUtil.getDropTablePos(upStmt, 0)); }else if(upStmt.startsWith("ALTER")){ tablename = RouterUtil.getTableName(stmt, RouterUtil.getAlterTablePos(upStmt, 0)); }else if (upStmt.startsWith("TRUNCATE")){ tablename = RouterUtil.getTableName(stmt, RouterUtil.getTruncateTablePos(upStmt, 0)); } tablename = tablename.toUpperCase(); if (schema.getTables().containsKey(tablename)){ if(ServerParse.DDL==sqlType){ List<String> dataNodes = new ArrayList<>(); Map<String, TableConfig> tables = schema.getTables(); TableConfig tc; if (tables != null && (tc = tables.get(tablename)) != null) { dataNodes = tc.getDataNodes(); } Iterator<String> iterator1 = dataNodes.iterator(); int nodeSize = dataNodes.size(); RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSize]; for(int i=0;i<nodeSize;i++){ String name = iterator1.next(); nodes[i] = new RouteResultsetNode(name, sqlType, stmt); } rrs.setNodes(nodes); } return rrs; }else if(schema.getDataNode()!=null){ //默認節點ddl RouteResultsetNode[] nodes = new RouteResultsetNode[1]; nodes[0] = new RouteResultsetNode(schema.getDataNode(), sqlType, stmt); rrs.setNodes(nodes); return rrs; } //both tablename and defaultnode null LOGGER.error("table not in schema----"+tablename); throw new SQLSyntaxErrorException("op table not in schema----"+tablename); }
首先,獲取表名,步驟如下:
拿一個獲取表名的函數舉例:
/** * 獲取語句中前關鍵字位置和占位個數表名位置 * * @param upStmt * 執行語句 * @param start * 開始位置 * @return int[]關鍵字位置和占位個數 * @author aStoneGod */public static int[] getCreateIndexPos(String upStmt, int start) { String token1 = "CREATE "; String token2 = " INDEX "; String token3 = " ON "; int createInd = upStmt.indexOf(token1, start); int idxInd = upStmt.indexOf(token2, start); int onInd = upStmt.indexOf(token3, start); // 既包含CREATE又包含INDEX,且CREATE關鍵字在INDEX關鍵字之前, 且包含ON... if (createInd >= 0 && idxInd > 0 && idxInd > createInd && onInd > 0 && onInd > idxInd) { return new int[] {onInd , token3.length() }; } else { return new int[] { -1, token2.length() };// 不滿足條件時,只關註第一個返回值為-1,第二個任意 } }
然後,根據表名獲取配置進行路由:
默認語句路由
對於有默認節點的schema,且不是show, describe, select @@之類的語句,則路由到默認的節點上。 對於show, describe, select @@之類的語句,利用查詢信息路由方法算出路由。接下來,取一個舉例,對於Show語句:analyseShowSQL(schema, rrs, stmt)方法
5.5 AST語義解析路由
首先我們看一下MySQL的SQL解析步驟(硬解析和軟解析):MyCat的機制,仿照MySQL的,可以總結為:這裏我們可以總結一個優化思路,就是通過仿照MySQL物理優化原理(定時更新表配置,報表信息),來做進一步MyCat查詢的優化。語義解析基本過程:
1.詞法分析(一般抽象都叫Lexer):不同的關鍵詞有不同的含義
select concat(id,‘_‘,name),value from student where value>60 order by value
詞法分析的輸出,就是一句帶上詞義的語句:
(select: Keyword) (concat: Keyword)((: LB)…… (from: keyword) (student: identifier)
2.語法分析:
分析關鍵詞之間的聯系,生成表達式(expression)
基本語法正確性判斷(比如from這個keyword之後必須緊跟一個表名(就是一個identifier))
3.生成AST語意樹(完整解析的statement)根據MyCat權威指南,DruidParser比其他Parser快很多很多。 快在哪裏呢?主要是抽象靜態化的粒度,拿jsqlparser和druidparser對比。 這兩個parser都遵從了上面的步驟,對於詞(lexer),表達式(expression)和語句AST(statement)都有抽象。 但是對於語句AST(statement)的抽象, DruidParser做的粒度更細。如下圖對於Alter語句的對比:所以,不難推測為啥DruidParser快了
免費體驗雲安全(易盾)內容安全、驗證碼等服務
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】 windows系統下npm升級的正確姿勢以及原理
數據庫路由中間件MyCat - 源代碼篇(17)