資料庫路由中介軟體MyCat - 原始碼篇(12)
阿新 • • 發佈:2018-11-07
此文已由作者張鎬薪授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
NodeList ruleNodes = e.getElementsByTagName("rule"); int length = ruleNodes.getLength(); if (length > 1) { throw new ConfigException("only one rule can defined :" + name); } //目前只處理第一個,未來可能有多列複合邏輯需求 //RuleConfig是儲存著rule與function對應關係的物件 RuleConfig rule = loadRule((Element) ruleNodes.item(0)); String funName = rule.getFunctionName(); //判斷function是否存在,獲取function AbstractPartitionAlgorithm func = functions.get(funName); if (func == null) { throw new ConfigException("can't find function of name :" + funName); } rule.setRuleAlgorithm(func); //儲存到tableRules tableRules.put(name, new TableRuleConfig(name, rule)); } } }
這樣,所有的tableRule和function就載入完畢,並儲存在XMLRuleLoader的一個成員變數tableRules中。XMLRuleLoader.java:
// Loaded table rules. Entries are added with tableRules.put(name, new TableRuleConfig(name, rule));
// the key is presumably the tableRule name -- confirm against the full loader method.
private final Map<String, TableRuleConfig> tableRules;
4.2 schema.xml:
public XMLSchemaLoader(String schemaFile, String ruleFile) { //先讀取rule.xml XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile); //將tableRules拿出,用於這裡載入Schema做rule有效判斷,以及之後的分片路由計算 this.tableRules = ruleLoader.getTableRules(); //釋放ruleLoader ruleLoader = null; this.dataHosts = new HashMap<String, DataHostConfig>(); this.dataNodes = new HashMap<String, DataNodeConfig>(); this.schemas = new HashMap<String, SchemaConfig>(); //讀取載入schema配置 this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile); }private void load(String dtdFile, String xmlFile) { InputStream dtd = null; InputStream xml = null; try { dtd = XMLSchemaLoader.class.getResourceAsStream(dtdFile); xml = XMLSchemaLoader.class.getResourceAsStream(xmlFile); Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement(); //先載入所有的DataHost loadDataHosts(root); //再載入所有的DataNode loadDataNodes(root); //最後載入所有的Schema loadSchemas(root); } catch (ConfigException e) { throw e; } catch (Exception e) { throw new ConfigException(e); } finally { if (dtd != null) { try { dtd.close(); } catch (IOException e) { } } if (xml != null) { try { xml.close(); } catch (IOException e) { } } } }
先看載入DataHost的過程,每個dataHost標籤會被解析成一個DataHostConfig物件: XMLSchemaLoader.java:
private void loadDataHosts(Element root) { NodeList list = root.getElementsByTagName("dataHost"); for (int i = 0, n = list.getLength(); i < n; ++i) { Element element = (Element) list.item(i); String name = element.getAttribute("name"); //判斷是否重複 if (dataHosts.containsKey(name)) { throw new ConfigException("dataHost name " + name + "duplicated!"); } //讀取最大連線數 int maxCon = Integer.valueOf(element.getAttribute("maxCon")); //讀取最小連線數 int minCon = Integer.valueOf(element.getAttribute("minCon")); /** * 讀取負載均衡配置 * 1. balance="0", 不開啟分離機制,所有讀操作都發送到當前可用的 writeHost 上。 * 2. balance="1",全部的 readHost 和 stand by writeHost 參不 select 的負載均衡 * 3. balance="2",所有讀操作都隨機的在 writeHost、readhost 上分發。 * 4. balance="3",所有讀請求隨機的分發到 wiriterHost 對應的 readhost 執行,writerHost 不負擔讀壓力 */ int balance = Integer.valueOf(element.getAttribute("balance")); /** * 讀取切換型別 * -1 表示不自動切換 * 1 預設值,自動切換 * 2 基於MySQL主從同步的狀態決定是否切換 * 心跳詢句為 show slave status * 3 基於 MySQL galary cluster 的切換機制 */ String switchTypeStr = element.getAttribute("switchType"); int switchType = switchTypeStr.equals("") ? -1 : Integer.valueOf(switchTypeStr); //讀取從延遲界限 String slaveThresholdStr = element.getAttribute("slaveThreshold"); int slaveThreshold = slaveThresholdStr.equals("") ? -1 : Integer.valueOf(slaveThresholdStr); //如果 tempReadHostAvailable 設定大於 0 則表示寫主機如果掛掉, 臨時的讀服務依然可用 String tempReadHostAvailableStr = element.getAttribute("tempReadHostAvailable"); boolean tempReadHostAvailable = tempReadHostAvailableStr.equals("") ? false : Integer.valueOf(tempReadHostAvailableStr) > 0; /** * 讀取 寫型別 * 這裡只支援 0 - 所有寫操作僅配置的第一個 writeHost */ String writeTypStr = element.getAttribute("writeType"); int writeType = "".equals(writeTypStr) ? PhysicalDBPool.WRITE_ONLYONE_NODE : Integer.valueOf(writeTypStr); String dbDriver = element.getAttribute("dbDriver"); String dbType = element.getAttribute("dbType"); String filters = element.getAttribute("filters"); String logTimeStr = element.getAttribute("logTime"); long logTime = "".equals(logTimeStr) ? 
PhysicalDBPool.LONG_TIME : Long.valueOf(logTimeStr) ; //讀取心跳語句 String heartbeatSQL = element.getElementsByTagName("heartbeat").item(0).getTextContent(); //讀取 初始化sql配置,用於oracle NodeList connectionInitSqlList = element.getElementsByTagName("connectionInitSql"); String initConSQL = null; if (connectionInitSqlList.getLength() > 0) { initConSQL = connectionInitSqlList.item(0).getTextContent(); } //讀取writeHost NodeList writeNodes = element.getElementsByTagName("writeHost"); DBHostConfig[] writeDbConfs = new DBHostConfig[writeNodes.getLength()]; Map<Integer, DBHostConfig[]> readHostsMap = new HashMap<Integer, DBHostConfig[]>(2); for (int w = 0; w < writeDbConfs.length; w++) { Element writeNode = (Element) writeNodes.item(w); writeDbConfs[w] = createDBHostConf(name, writeNode, dbType, dbDriver, maxCon, minCon,filters,logTime); NodeList readNodes = writeNode.getElementsByTagName("readHost"); //讀取對應的每一個readHost if (readNodes.getLength() != 0) { DBHostConfig[] readDbConfs = new DBHostConfig[readNodes.getLength()]; for (int r = 0; r < readDbConfs.length; r++) { Element readNode = (Element) readNodes.item(r); readDbConfs[r] = createDBHostConf(name, readNode, dbType, dbDriver, maxCon, minCon,filters, logTime); } readHostsMap.put(w, readDbConfs); } } DataHostConfig hostConf = new DataHostConfig(name, dbType, dbDriver, writeDbConfs, readHostsMap, switchType, slaveThreshold, tempReadHostAvailable); hostConf.setMaxCon(maxCon); hostConf.setMinCon(minCon); hostConf.setBalance(balance); hostConf.setWriteType(writeType); hostConf.setHearbeatSQL(heartbeatSQL); hostConf.setConnectionInitSql(initConSQL); hostConf.setFilters(filters); hostConf.setLogTime(logTime); dataHosts.put(hostConf.getName(), hostConf); } }
先讀取每個DataHost的通用配置,之後讀取每個DataHost對應的writeHost以及每個writeHost對應的readHost。配置好後,儲存在:
// Parsed dataHost configurations, stored under hostConf.getName() by loadDataHosts.
private final Map<String, DataHostConfig> dataHosts;
之後讀取載入DataNode: XMLSchemaLoader.java:
private void loadDataNodes(Element root) { //讀取DataNode分支 NodeList list = root.getElementsByTagName("dataNode"); for (int i = 0, n = list.getLength(); i < n; i++) { Element element = (Element) list.item(i); String dnNamePre = element.getAttribute("name"); String databaseStr = element.getAttribute("database"); String host = element.getAttribute("dataHost"); //字串不為空 if (empty(dnNamePre) || empty(databaseStr) || empty(host)) { throw new ConfigException("dataNode " + dnNamePre + " define error ,attribute can't be empty"); } //dnNames(name),databases(database),hostStrings(dataHost)都可以配置多個,以',', '$', '-'區分,但是需要保證database的個數*dataHost的個數=name的個數 //多個dataHost與多個database如果寫在一個標籤,則每個dataHost擁有所有database //例如:<dataNode name="dn1$0-75" dataHost="localhost$1-10" database="db$0-759" /> //則為:localhost1擁有dn1$0-75,localhost2也擁有dn1$0-75(對應db$76-151) String[] dnNames = io.mycat.util.SplitUtil.split(dnNamePre, ',', '$', '-'); String[] databases = io.mycat.util.SplitUtil.split(databaseStr, ',', '$', '-'); String[] hostStrings = io.mycat.util.SplitUtil.split(host, ',', '$', '-'); if (dnNames.length > 1 && dnNames.length != databases.length * hostStrings.length) { throw new ConfigException("dataNode " + dnNamePre + " define error ,dnNames.length must be=databases.length*hostStrings.length"); } if (dnNames.length > 1) { List<String[]> mhdList = mergerHostDatabase(hostStrings, databases); for (int k = 0; k < dnNames.length; k++) { String[] hd = mhdList.get(k); String dnName = dnNames[k]; String databaseName = hd[1]; String hostName = hd[0]; createDataNode(dnName, databaseName, hostName); } } else { createDataNode(dnNamePre, databaseStr, host); } } }private void createDataNode(String dnName, String database, String host) { DataNodeConfig conf = new DataNodeConfig(dnName, database, host); if (dataNodes.containsKey(conf.getName())) { throw new ConfigException("dataNode " + conf.getName() + " duplicated!"); } if (!dataHosts.containsKey(host)) { throw new ConfigException("dataNode " + dnName + " 
reference dataHost:" + host + " not exists!"); } dataNodes.put(conf.getName(), conf); }
生成的是DataNode類,放入:
// Parsed dataNode configurations, stored under conf.getName() by createDataNode.
private final Map<String, DataNodeConfig> dataNodes;
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 react-native自定義原生元件