資料庫路由中介軟體MyCat - 原始碼篇(11)
阿新 • • 發佈:2018-11-02
此文已由作者張鎬薪授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
4.配置模組
每個MyCatServer初始化時,會初始化: MyCatServer.java:
public static final String NAME = "MyCat";private static final long LOG_WATCH_DELAY = 60000L;private static final long TIME_UPDATE_PERIOD = 20L;private static final MycatServer INSTANCE = new MycatServer();private static final Logger LOGGER = LoggerFactory.getLogger("MycatServer");private final RouteService routerService;private final CacheService cacheService;private Properties dnIndexProperties;//AIO連線群組private AsynchronousChannelGroup[] asyncChannelGroups;private volatile int channelIndex = 0;//全域性序列號private final MyCATSequnceProcessor sequnceProcessor = new MyCATSequnceProcessor();private final DynaClassLoader catletClassLoader;private final SQLInterceptor sqlInterceptor;private volatile int nextProcessor;private BufferPool bufferPool;private boolean aio = false;//XA事務全域性ID生成private final AtomicLong xaIDInc = new AtomicLong();private MycatServer() { //讀取檔案配置 this.config = new MycatConfig(); //定時執行緒池,單執行緒執行緒池 scheduler = Executors.newSingleThreadScheduledExecutor(); //SQL記錄器 this.sqlRecorder = new SQLRecorder(config.getSystem() .getSqlRecordCount()); /** * 是否線上,MyCat manager中有命令控制 * | offline | Change MyCat status to OFF | * | online | Change MyCat status to ON | */ this.isOnline = new AtomicBoolean(true); //快取服務初始化 cacheService = new CacheService(); //路由計算初始化 routerService = new RouteService(cacheService); // load datanode active index from properties dnIndexProperties = loadDnIndexProps(); try { //SQL解析器 sqlInterceptor = (SQLInterceptor) Class.forName( config.getSystem().getSqlInterceptor()).newInstance(); } catch (Exception e) { throw new RuntimeException(e); } //catlet載入器 catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath() + File.separator + "catlet", config.getSystem() .getCatletClassCheckSeconds()); //記錄啟動時間 this.startupTime = TimeUtil.currentTimeMillis(); }
第一步是讀取檔案配置,主要是三個檔案:schema.xml,rule.xml和server.xml. 讀取後的配置會載入到MyCatConfig中。 MyCatConfig.java:
public MycatConfig() {//讀取schema.xml,rule.xml和server.xmlConfigInitializer confInit = new ConfigInitializer(true);this.system = confInit.getSystem();this.users = confInit.getUsers();this.schemas = confInit.getSchemas();this.dataHosts = confInit.getDataHosts();this.dataNodes = confInit.getDataNodes();for (PhysicalDBPool dbPool : dataHosts.values()) { dbPool.setSchemas(getDataNodeSchemasOfDataHost(dbPool.getHostName())); }this.quarantine = confInit.getQuarantine();this.cluster = confInit.getCluster();//初始化重載入配置時間this.reloadTime = TimeUtil.currentTimeMillis();this.rollbackTime = -1L;this.status = RELOAD;//配置載入鎖this.lock = new ReentrantLock(); }
它們都通過ConfigInitializer讀取:
public ConfigInitializer(boolean loadDataHost) { //讀取schema.xml SchemaLoader schemaLoader = new XMLSchemaLoader(); //讀取server.xml XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader); schemaLoader = null; //載入配置 this.system = configLoader.getSystemConfig(); this.users = configLoader.getUserConfigs(); this.schemas = configLoader.getSchemaConfigs(); //是否重新載入DataHost和對應的DataNode if (loadDataHost) { this.dataHosts = initDataHosts(configLoader); this.dataNodes = initDataNodes(configLoader); } //許可權管理 this.quarantine = configLoader.getQuarantineConfig(); this.cluster = initCobarCluster(configLoader); //不同型別的全域性序列處理器的配置載入 if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_MYSQLDB) { IncrSequenceMySQLHandler.getInstance().load(); } if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_LOCAL_TIME) { IncrSequenceTimeHandler.getInstance().load(); } //檢查user與schema配置對應以及schema配置不為空 this.checkConfig(); }
4.1 rule.xml
讀取schema之前會先讀取rule.xml。 XmlSchemaLoader.java:
public XMLSchemaLoader(String schemaFile, String ruleFile) { //先讀取rule.xml XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile); this.tableRules = ruleLoader.getTableRules(); ruleLoader = null; this.dataHosts = new HashMap<String, DataHostConfig>(); this.dataNodes = new HashMap<String, DataNodeConfig>(); this.schemas = new HashMap<String, SchemaConfig>(); //讀取載入schema配置 this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile); }public XMLSchemaLoader() { this(null, null); }
XMLRuleLoader.java:
public XMLRuleLoader(String ruleFile) { // this.rules = new HashSet<RuleConfig>(); //rule名 -> rule this.tableRules = new HashMap<String, TableRuleConfig>(); //function名 -> 具體分片演算法 this.functions = new HashMap<String, AbstractPartitionAlgorithm>(); //預設為:/rule.dtd和/rule.xml load(DEFAULT_DTD, ruleFile == null ? DEFAULT_XML : ruleFile); }public XMLRuleLoader() { this(null); }
private void load(String dtdFile, String xmlFile) { InputStream dtd = null; InputStream xml = null; try { dtd = XMLRuleLoader.class.getResourceAsStream(dtdFile); xml = XMLRuleLoader.class.getResourceAsStream(xmlFile); //讀取出語意樹 Element root = ConfigUtil.getDocument(dtd, xml) .getDocumentElement(); //載入Function loadFunctions(root); //載入TableRule loadTableRules(root); } catch (ConfigException e) { throw e; } catch (Exception e) { throw new ConfigException(e); } finally { if (dtd != null) { try { dtd.close(); } catch (IOException e) { } } if (xml != null) { try { xml.close(); } catch (IOException e) { } } } }
ConfigUtil.java解析語意樹:
public static Document getDocument(final InputStream dtd, InputStream xml) throws ParserConfigurationException, SAXException, IOException { DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); factory.setValidating(true); factory.setNamespaceAware(false); DocumentBuilder builder = factory.newDocumentBuilder(); builder.setEntityResolver(new EntityResolver() { @Override public InputSource resolveEntity(String publicId, String systemId) { return new InputSource(dtd); } }); builder.setErrorHandler(new ErrorHandler() { @Override public void warning(SAXParseException e) { } @Override public void error(SAXParseException e) throws SAXException { throw e; } @Override public void fatalError(SAXParseException e) throws SAXException { throw e; } }); return builder.parse(xml); }
載入functions,XmlRuleLoader.java
private void loadFunctions(Element root) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InvocationTargetException { NodeList list = root.getElementsByTagName("function"); for (int i = 0, n = list.getLength(); i < n; ++i) { Node node = list.item(i); if (node instanceof Element) { Element e = (Element) node; //獲取name標籤 String name = e.getAttribute("name"); //如果Map已有,則function重複 if (functions.containsKey(name)) { throw new ConfigException("rule function " + name + " duplicated!"); } //獲取class標籤 String clazz = e.getAttribute("class"); //根據class利用反射新建分片演算法 AbstractPartitionAlgorithm function = createFunction(name, clazz); ParameterMapping.mapping(function, ConfigUtil.loadElements(e)); //每個AbstractPartitionAlgorithm可能會實現init來初始化 function.init(); //放入functions map functions.put(name, function); } } }private AbstractPartitionAlgorithm createFunction(String name, String clazz) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InvocationTargetException { Class<?> clz = Class.forName(clazz); //判斷是否繼承AbstractPartitionAlgorithm if (!AbstractPartitionAlgorithm.class.isAssignableFrom(clz)) { throw new IllegalArgumentException("rule function must implements " + AbstractPartitionAlgorithm.class.getName() + ", name=" + name); } return (AbstractPartitionAlgorithm) clz.newInstance(); }
載入所有的function的node,每一個node就是一個AbstractPartitionAlgorithm,並放入functions這個map中;
private final Map<String, TableRuleConfig> tableRules;
對於每一個node,通過反射新建對應引數的AbstractPartitionAlgorithm。這樣,所有的function就載入到了functions這個map中。 同理,載入TableRule,就加上了function是否存在的判斷:
/** * tableRule標籤結構: * <tableRule name="sharding-by-month"> * <rule> * <columns>create_date</columns> * <algorithm>partbymonth</algorithm> * </rule> * </tableRule> * @param root * @throws SQLSyntaxErrorException */private void loadTableRules(Element root) throws SQLSyntaxErrorException { //獲取每個tableRule標籤 NodeList list = root.getElementsByTagName("tableRule"); for (int i = 0, n = list.getLength(); i < n; ++i) { Node node = list.item(i); if (node instanceof Element) { Element e = (Element) node; //先判斷是否重複 String name = e.getAttribute("name"); if (tableRules.containsKey(name)) { throw new ConfigException("table rule " + name + " duplicated!"); } //獲取rule標籤
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 iOS安裝包瘦身(上篇)