1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(12)

資料庫路由中介軟體MyCat - 原始碼篇(12)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。


NodeList ruleNodes = e.getElementsByTagName("rule");
            int length = ruleNodes.getLength();
            if (length > 1) {
                throw new ConfigException("only one rule can defined :"
                        + name);
            }
            //目前只處理第一個,未來可能有多列複合邏輯需求
            //RuleConfig是儲存著rule與function對應關係的物件
            RuleConfig rule = loadRule((Element) ruleNodes.item(0));
            String funName = rule.getFunctionName();
            //判斷function是否存在,獲取function
            AbstractPartitionAlgorithm func = functions.get(funName);
            if (func == null) {
                throw new ConfigException("can't find function of name :"
                        + funName);
            }
            rule.setRuleAlgorithm(func);
            //儲存到tableRules
            tableRules.put(name, new TableRuleConfig(name, rule));
        }
    }
}

這樣,所有的tableRule和function就載入完畢。儲存在一個變數中,就是tableRules:XMLRuleLoader.java:

private final Map<String, TableRuleConfig> tableRules;


4.2 schema.xml:

public XMLSchemaLoader(String schemaFile, String ruleFile) {    //先讀取rule.xml
    XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);    //將tableRules拿出,用於這裡載入Schema做rule有效判斷,以及之後的分片路由計算
    this.tableRules = ruleLoader.getTableRules();    //釋放ruleLoader
    ruleLoader = null;    this.dataHosts = new HashMap<String, DataHostConfig>();    this.dataNodes = new HashMap<String, DataNodeConfig>();    this.schemas = new HashMap<String, SchemaConfig>();    //讀取載入schema配置
    this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
}private void load(String dtdFile, String xmlFile) {
    InputStream dtd = null;
    InputStream xml = null;    try {
        dtd = XMLSchemaLoader.class.getResourceAsStream(dtdFile);
        xml = XMLSchemaLoader.class.getResourceAsStream(xmlFile);
        Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement();        //先載入所有的DataHost
        loadDataHosts(root);        //再載入所有的DataNode
        loadDataNodes(root);        //最後載入所有的Schema
        loadSchemas(root);
    } catch (ConfigException e) {        throw e;
    } catch (Exception e) {        throw new ConfigException(e);
    } finally {        if (dtd != null) {            try {
                dtd.close();
            } catch (IOException e) {
            }
        }        if (xml != null) {            try {
                xml.close();
            } catch (IOException e) {
            }
        }
    }
}

先看下DataHostConfig這個類的結構: 這裡寫圖片描述XMLSchemaLoader.java:

private void loadDataHosts(Element root) {
    NodeList list = root.getElementsByTagName("dataHost");    for (int i = 0, n = list.getLength(); i < n; ++i) {

        Element element = (Element) list.item(i);
        String name = element.getAttribute("name");        //判斷是否重複
        if (dataHosts.containsKey(name)) {            throw new ConfigException("dataHost name " + name + "duplicated!");
        }        //讀取最大連線數
        int maxCon = Integer.valueOf(element.getAttribute("maxCon"));        //讀取最小連線數
        int minCon = Integer.valueOf(element.getAttribute("minCon"));        /**
         * 讀取負載均衡配置
         * 1. balance="0", 不開啟分離機制,所有讀操作都發送到當前可用的 writeHost 上。
         * 2. balance="1",全部的 readHost 和 stand by writeHost 參不 select 的負載均衡
         * 3. balance="2",所有讀操作都隨機的在 writeHost、readhost 上分發。
         * 4. balance="3",所有讀請求隨機的分發到 wiriterHost 對應的 readhost 執行,writerHost 不負擔讀壓力
         */
        int balance = Integer.valueOf(element.getAttribute("balance"));        /**
         * 讀取切換型別
         * -1 表示不自動切換
         * 1 預設值,自動切換
         * 2 基於MySQL主從同步的狀態決定是否切換
         * 心跳詢句為 show slave status
         * 3 基於 MySQL galary cluster 的切換機制
         */
        String switchTypeStr = element.getAttribute("switchType");        int switchType = switchTypeStr.equals("") ? -1 : Integer.valueOf(switchTypeStr);        //讀取從延遲界限
        String slaveThresholdStr = element.getAttribute("slaveThreshold");        int slaveThreshold = slaveThresholdStr.equals("") ? -1 : Integer.valueOf(slaveThresholdStr);        //如果 tempReadHostAvailable 設定大於 0 則表示寫主機如果掛掉, 臨時的讀服務依然可用
        String tempReadHostAvailableStr = element.getAttribute("tempReadHostAvailable");
        boolean tempReadHostAvailable = tempReadHostAvailableStr.equals("") ? false : Integer.valueOf(tempReadHostAvailableStr) > 0;        /**
         * 讀取 寫型別
         * 這裡只支援 0 - 所有寫操作僅配置的第一個 writeHost
         */
        String writeTypStr = element.getAttribute("writeType");        int writeType = "".equals(writeTypStr) ? PhysicalDBPool.WRITE_ONLYONE_NODE : Integer.valueOf(writeTypStr);


        String dbDriver = element.getAttribute("dbDriver");
        String dbType = element.getAttribute("dbType");
        String filters = element.getAttribute("filters");
        String logTimeStr = element.getAttribute("logTime");        long logTime = "".equals(logTimeStr) ? PhysicalDBPool.LONG_TIME : Long.valueOf(logTimeStr) ;        //讀取心跳語句
        String heartbeatSQL = element.getElementsByTagName("heartbeat").item(0).getTextContent();        //讀取 初始化sql配置,用於oracle
        NodeList connectionInitSqlList = element.getElementsByTagName("connectionInitSql");
        String initConSQL = null;        if (connectionInitSqlList.getLength() > 0) {
            initConSQL = connectionInitSqlList.item(0).getTextContent();
        }        //讀取writeHost
        NodeList writeNodes = element.getElementsByTagName("writeHost");
        DBHostConfig[] writeDbConfs = new DBHostConfig[writeNodes.getLength()];
        Map<Integer, DBHostConfig[]> readHostsMap = new HashMap<Integer, DBHostConfig[]>(2);        for (int w = 0; w < writeDbConfs.length; w++) {
            Element writeNode = (Element) writeNodes.item(w);
            writeDbConfs[w] = createDBHostConf(name, writeNode, dbType, dbDriver, maxCon, minCon,filters,logTime);
            NodeList readNodes = writeNode.getElementsByTagName("readHost");            //讀取對應的每一個readHost
            if (readNodes.getLength() != 0) {
                DBHostConfig[] readDbConfs = new DBHostConfig[readNodes.getLength()];                for (int r = 0; r < readDbConfs.length; r++) {
                    Element readNode = (Element) readNodes.item(r);
                    readDbConfs[r] = createDBHostConf(name, readNode, dbType, dbDriver, maxCon, minCon,filters, logTime);
                }
                readHostsMap.put(w, readDbConfs);
            }
        }

        DataHostConfig hostConf = new DataHostConfig(name, dbType, dbDriver, 
                writeDbConfs, readHostsMap, switchType, slaveThreshold, tempReadHostAvailable);        

        hostConf.setMaxCon(maxCon);
        hostConf.setMinCon(minCon);
        hostConf.setBalance(balance);
        hostConf.setWriteType(writeType);
        hostConf.setHearbeatSQL(heartbeatSQL);
        hostConf.setConnectionInitSql(initConSQL);
        hostConf.setFilters(filters);
        hostConf.setLogTime(logTime);
        dataHosts.put(hostConf.getName(), hostConf);
    }
}

先讀取每個DataHost的通用配置,之後讀取每個DataHost對應的writeHost以及每個writeHost對應的readHost。配置好後,儲存在:

private final Map<String, DataHostConfig> dataHosts;

之後讀取載入DataHost: XMLSchemaLoader.java:

private void loadDataNodes(Element root) {    //讀取DataNode分支
    NodeList list = root.getElementsByTagName("dataNode");    for (int i = 0, n = list.getLength(); i < n; i++) {
        Element element = (Element) list.item(i);
        String dnNamePre = element.getAttribute("name");

        String databaseStr = element.getAttribute("database");
        String host = element.getAttribute("dataHost");        //字串不為空
        if (empty(dnNamePre) || empty(databaseStr) || empty(host)) {            throw new ConfigException("dataNode " + dnNamePre + " define error ,attribute can't be empty");
        }        //dnNames(name),databases(database),hostStrings(dataHost)都可以配置多個,以',', '$', '-'區分,但是需要保證database的個數*dataHost的個數=name的個數
        //多個dataHost與多個database如果寫在一個標籤,則每個dataHost擁有所有database
        //例如:<dataNode name="dn1$0-75" dataHost="localhost$1-10" database="db$0-759" />
        //則為:localhost1擁有dn1$0-75,localhost2也擁有dn1$0-75(對應db$76-151)
        String[] dnNames = io.mycat.util.SplitUtil.split(dnNamePre, ',', '$', '-');
        String[] databases = io.mycat.util.SplitUtil.split(databaseStr, ',', '$', '-');
        String[] hostStrings = io.mycat.util.SplitUtil.split(host, ',', '$', '-');        if (dnNames.length > 1 && dnNames.length != databases.length * hostStrings.length) {            throw new ConfigException("dataNode " + dnNamePre
                            + " define error ,dnNames.length must be=databases.length*hostStrings.length");
        }        if (dnNames.length > 1) {

            List<String[]> mhdList = mergerHostDatabase(hostStrings, databases);            for (int k = 0; k < dnNames.length; k++) {
                String[] hd = mhdList.get(k);
                String dnName = dnNames[k];
                String databaseName = hd[1];
                String hostName = hd[0];
                createDataNode(dnName, databaseName, hostName);
            }

        } else {
            createDataNode(dnNamePre, databaseStr, host);
        }

    }
}private void createDataNode(String dnName, String database, String host) {

    DataNodeConfig conf = new DataNodeConfig(dnName, database, host);        
    if (dataNodes.containsKey(conf.getName())) {        throw new ConfigException("dataNode " + conf.getName() + " duplicated!");
    }    if (!dataHosts.containsKey(host)) {        throw new ConfigException("dataNode " + dnName + " reference dataHost:" + host + " not exists!");
    }
    dataNodes.put(conf.getName(), conf);
}

生成的是DataNode類,放入:

private final Map<String, DataNodeConfig> dataNodes;



免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選



相關文章:
【推薦】 react-native自定義原生元件