1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(11)

資料庫路由中介軟體MyCat - 原始碼篇(11)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。

4.配置模組

每個MyCatServer初始化時,會初始化: MyCatServer.java:

public static final String NAME = "MyCat";private static final long LOG_WATCH_DELAY = 60000L;private static final long TIME_UPDATE_PERIOD = 20L;private static final MycatServer INSTANCE = new MycatServer();private static final Logger LOGGER = LoggerFactory.getLogger("MycatServer");private final RouteService routerService;private final CacheService cacheService;private Properties dnIndexProperties;//AIO連線群組private AsynchronousChannelGroup[] asyncChannelGroups;private volatile int channelIndex = 0;//全域性序列號private final MyCATSequnceProcessor sequnceProcessor = new MyCATSequnceProcessor();private final DynaClassLoader catletClassLoader;private final SQLInterceptor sqlInterceptor;private volatile int nextProcessor;private BufferPool bufferPool;private boolean aio = false;//XA事務全域性ID生成private final AtomicLong xaIDInc = new AtomicLong();private MycatServer() {    //讀取檔案配置
    this.config = new MycatConfig();    //定時執行緒池,單執行緒執行緒池
    scheduler = Executors.newSingleThreadScheduledExecutor();    //SQL記錄器
    this.sqlRecorder = new SQLRecorder(config.getSystem()
            .getSqlRecordCount());    /**
     * 是否線上,MyCat manager中有命令控制
     * | offline | Change MyCat status to OFF |
     * | online | Change MyCat status to ON |
     */
    this.isOnline = new AtomicBoolean(true);    //快取服務初始化
    cacheService = new CacheService();    //路由計算初始化
    routerService = new RouteService(cacheService);    // load datanode active index from properties
    dnIndexProperties = loadDnIndexProps();    try {        //SQL解析器
        sqlInterceptor = (SQLInterceptor) Class.forName(
                config.getSystem().getSqlInterceptor()).newInstance();
    } catch (Exception e) {        throw new RuntimeException(e);
    }    //catlet載入器
    catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath()
            + File.separator + "catlet", config.getSystem()
            .getCatletClassCheckSeconds());    //記錄啟動時間
    this.startupTime = TimeUtil.currentTimeMillis();
}

第一步是讀取檔案配置,主要是三個檔案:schema.xml,rule.xml和server.xml. 讀取後的配置會載入到MyCatConfig中。 MyCatConfig.java:

public MycatConfig() {//讀取schema.xml,rule.xml和server.xmlConfigInitializer confInit = new ConfigInitializer(true);this.system = confInit.getSystem();this.users = confInit.getUsers();this.schemas = confInit.getSchemas();this.dataHosts = confInit.getDataHosts();this.dataNodes = confInit.getDataNodes();for (PhysicalDBPool dbPool : dataHosts.values()) {
    dbPool.setSchemas(getDataNodeSchemasOfDataHost(dbPool.getHostName()));
}this.quarantine = confInit.getQuarantine();this.cluster = confInit.getCluster();//初始化重載入配置時間this.reloadTime = TimeUtil.currentTimeMillis();this.rollbackTime = -1L;this.status = RELOAD;//配置載入鎖this.lock = new ReentrantLock();
}

它們都通過ConfigInitializer讀取:

public ConfigInitializer(boolean loadDataHost) {    //讀取schema.xml
    SchemaLoader schemaLoader = new XMLSchemaLoader();    //讀取server.xml
    XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader);
    schemaLoader = null;    //載入配置
    this.system = configLoader.getSystemConfig();    this.users = configLoader.getUserConfigs();    this.schemas = configLoader.getSchemaConfigs();    //是否重新載入DataHost和對應的DataNode
    if (loadDataHost) {        this.dataHosts = initDataHosts(configLoader);        this.dataNodes = initDataNodes(configLoader);
    }    //許可權管理
    this.quarantine = configLoader.getQuarantineConfig();    this.cluster = initCobarCluster(configLoader);    //不同型別的全域性序列處理器的配置載入
    if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_MYSQLDB) {
        IncrSequenceMySQLHandler.getInstance().load();
    }    if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_LOCAL_TIME) {
        IncrSequenceTimeHandler.getInstance().load();
    }    //檢查user與schema配置對應以及schema配置不為空
    this.checkConfig();
}

4.1 rule.xml

讀取schema之前會先讀取rule.xml。 XmlSchemaLoader.java:

public XMLSchemaLoader(String schemaFile, String ruleFile) {    //先讀取rule.xml
    XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);    this.tableRules = ruleLoader.getTableRules();
    ruleLoader = null;    this.dataHosts = new HashMap<String, DataHostConfig>();    this.dataNodes = new HashMap<String, DataNodeConfig>();    this.schemas = new HashMap<String, SchemaConfig>();    //讀取載入schema配置
    this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
}public XMLSchemaLoader() {    this(null, null);
}

XMLRuleLoader.java:

public XMLRuleLoader(String ruleFile) {    // this.rules = new HashSet<RuleConfig>();
    //rule名 -> rule
    this.tableRules = new HashMap<String, TableRuleConfig>();    //function名 -> 具體分片演算法
    this.functions = new HashMap<String, AbstractPartitionAlgorithm>();    //預設為:/rule.dtd和/rule.xml
    load(DEFAULT_DTD, ruleFile == null ? DEFAULT_XML : ruleFile);
}public XMLRuleLoader() {    this(null);
}
private void load(String dtdFile, String xmlFile) {
    InputStream dtd = null;
    InputStream xml = null;    try {
        dtd = XMLRuleLoader.class.getResourceAsStream(dtdFile);
        xml = XMLRuleLoader.class.getResourceAsStream(xmlFile);        //讀取出語意樹
        Element root = ConfigUtil.getDocument(dtd, xml)
                .getDocumentElement();        //載入Function
        loadFunctions(root);        //載入TableRule
        loadTableRules(root);
    } catch (ConfigException e) {        throw e;
    } catch (Exception e) {        throw new ConfigException(e);
    } finally {        if (dtd != null) {            try {
                dtd.close();
            } catch (IOException e) {
            }
        }        if (xml != null) {            try {
                xml.close();
            } catch (IOException e) {
            }
        }
    }
}

ConfigUtil.java解析語意樹:

public static Document getDocument(final InputStream dtd, InputStream xml) throws ParserConfigurationException,
            SAXException, IOException {
    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    factory.setValidating(true);
    factory.setNamespaceAware(false);
    DocumentBuilder builder = factory.newDocumentBuilder();
    builder.setEntityResolver(new EntityResolver() {        @Override
        public InputSource resolveEntity(String publicId, String systemId) {            return new InputSource(dtd);
        }
    });
    builder.setErrorHandler(new ErrorHandler() {        @Override
        public void warning(SAXParseException e) {
        }        @Override
        public void error(SAXParseException e) throws SAXException {            throw e;
        }        @Override
        public void fatalError(SAXParseException e) throws SAXException {            throw e;
        }
    });    return builder.parse(xml);
}

載入functions,XmlRuleLoader.java

private void loadFunctions(Element root) throws ClassNotFoundException,
            InstantiationException, IllegalAccessException,
            InvocationTargetException {
        NodeList list = root.getElementsByTagName("function");        for (int i = 0, n = list.getLength(); i < n; ++i) {
            Node node = list.item(i);            if (node instanceof Element) {
                Element e = (Element) node;                //獲取name標籤
                String name = e.getAttribute("name");                //如果Map已有,則function重複
                if (functions.containsKey(name)) {                    throw new ConfigException("rule function " + name
                            + " duplicated!");
                }                //獲取class標籤
                String clazz = e.getAttribute("class");                //根據class利用反射新建分片演算法
                AbstractPartitionAlgorithm function = createFunction(name, clazz);

                ParameterMapping.mapping(function, ConfigUtil.loadElements(e));                //每個AbstractPartitionAlgorithm可能會實現init來初始化
                function.init();                //放入functions map
                functions.put(name, function);
            }
        }
    }private AbstractPartitionAlgorithm createFunction(String name, String clazz)
        throws ClassNotFoundException, InstantiationException,
        IllegalAccessException, InvocationTargetException {
    Class<?> clz = Class.forName(clazz);    //判斷是否繼承AbstractPartitionAlgorithm
    if (!AbstractPartitionAlgorithm.class.isAssignableFrom(clz)) {        throw new IllegalArgumentException("rule function must implements "
                + AbstractPartitionAlgorithm.class.getName() + ", name=" + name);
    }
    return (AbstractPartitionAlgorithm) clz.newInstance();
}

載入所有的function的node,每一個node就是一個AbstractPartitionAlgorithm,並放入functions這個map中;

private final Map<String, TableRuleConfig> tableRules;


對於每一個node,通過反射新建對應引數的AbstractPartitionAlgorithm。這樣,所有的function就載入到了functions這個map中。 同理,載入TableRule,就加上了function是否存在的判斷:


/**
* tableRule標籤結構:
 * <tableRule name="sharding-by-month">
 *     <rule>
 *         <columns>create_date</columns>
 *         <algorithm>partbymonth</algorithm>
 *     </rule>
 * </tableRule>
 * @param root
 * @throws SQLSyntaxErrorException
    */private void loadTableRules(Element root) throws SQLSyntaxErrorException {    //獲取每個tableRule標籤
    NodeList list = root.getElementsByTagName("tableRule");    for (int i = 0, n = list.getLength(); i < n; ++i) {
        Node node = list.item(i);        if (node instanceof Element) {
            Element e = (Element) node;            //先判斷是否重複
            String name = e.getAttribute("name");            if (tableRules.containsKey(name)) {                throw new ConfigException("table rule " + name
                        + " duplicated!");
            }            //獲取rule標籤


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選


相關文章:
【推薦】 iOS安裝包瘦身(上篇)