資料庫路由中介軟體MyCat - 原始碼篇(10)
阿新 • • 發佈:2018-11-02
此文已由作者張鎬薪授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
3. 連線模組
3.5 後端連線
3.5.2 後端連接獲取與維護管理
還是那之前的流程,
st=>start: MyCat接受客戶端連線併為之建立唯一繫結的Session e=>end: 將請求傳送給對應連線,處理完之後歸還連線 op1=>operation: MyCat接受客戶端的請求,計算路由 op2=>operation: 根據請求和路由建立合適的handler,這裡為SingleNodeHandler op3=>operation: 從PhysicalDBNode中獲取後端連線 cond=>condition: 嘗試獲取連線,連線夠用? op4=>operation: 嘗試非同步建立新的連線 op5=>operation: 通過DelegateResponseHandler將連線與之前的Handler,這裡是SingleNodeHandler繫結 st->op1->op2->op3->condcond(yes)->econd(no)->op4->op5->e
現在我們到了嘗試獲取連線的階段 PhysicalDataSource.java:
public void getConnection(String schema,boolean autocommit, final ResponseHandler handler, final Object attachment) throws IOException {//從當前連線map中拿取已建立好的後端連線 BackendConnection con = this.conMap.tryTakeCon(schema,autocommit); if (con != null) {//如果不為空,則繫結對應前端請求的handler takeCon(con, handler, attachment, schema); return; } else {//如果為空,新建連線 int activeCons = this.getActiveCount();//當前最大活動連線 if(activeCons+1>size){//下一個連線大於最大連線數 LOGGER.error("the max activeConnnections size can not be max than maxconnections"); throw new IOException("the max activeConnnections size can not be max than maxconnections"); }else{ // create connection LOGGER.info("not ilde connection in pool,create new connection for " + this.name + " of schema "+schema); createNewConnection(handler, attachment, schema); } } }
private void createNewConnection(final ResponseHandler handler, final Object attachment, final String schema) throws IOException { //非同步建立連線,將連線的handler繫結為DelegateResponseHandler MycatServer.getInstance().getBusinessExecutor().execute(new Runnable() { public void run() { try { createNewConnection(new DelegateResponseHandler(handler) { @Override public void connectionError(Throwable e, BackendConnection conn) { handler.connectionError(e, conn); } @Override public void connectionAcquired(BackendConnection conn) { takeCon(conn, handler, attachment, schema); } }, schema); } catch (IOException e) { handler.connectionError(e, null); } } }); }
非同步呼叫工廠方法建立後端連線,這裡為MySQLConnection MySQLDataSource.java:
@Override public void createNewConnection(ResponseHandler handler,String schema) throws IOException { factory.make(this, handler,schema); }
根據之前所述,MySQLConnection的工廠方法會先將NIOhandler設定為MySQLConnectionAuthenticator: MySQLConnectionFactory.java:
public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler, String schema) throws IOException { //DBHost配置 DBHostConfig dsc = pool.getConfig(); //根據是否為NIO返回SocketChannel或者AIO的AsynchronousSocketChannel NetworkChannel channel = openSocketChannel(MycatServer.getInstance() .isAIO()); //新建MySQLConnection MySQLConnection c = new MySQLConnection(channel, pool.isReadNode()); //根據配置初始化MySQLConnection MycatServer.getInstance().getConfig().setSocketParams(c, false); c.setHost(dsc.getIp()); c.setPort(dsc.getPort()); c.setUser(dsc.getUser()); c.setPassword(dsc.getPassword()); c.setSchema(schema); //目前實際連線還未建立,handler為MySQL連線認證MySQLConnectionAuthenticator,傳入的handler為後端連線處理器ResponseHandler c.setHandler(new MySQLConnectionAuthenticator(c, handler)); c.setPool(pool); c.setIdleTimeout(pool.getConfig().getIdleTimeout()); //AIO和NIO連線方式建立實際的MySQL連線 if (channel instanceof AsynchronousSocketChannel) { ((AsynchronousSocketChannel) channel).connect( new InetSocketAddress(dsc.getIp(), dsc.getPort()), c, (CompletionHandler) MycatServer.getInstance() .getConnector()); } else { //通過NIOConnector建立連線 ((NIOConnector) MycatServer.getInstance().getConnector()) .postConnect(c); } return c; }
這裡傳入的ResponseHandler為DelegateResponseHandler,在連線建立驗證之後,會呼叫: MySQLConnectionAuthenticator.java:
public void handle(byte[] data) { //省略 //設定ResponseHandler if (listener != null) { listener.connectionAcquired(source); } //省略}
DelegateResponseHandler.java:
private final ResponseHandler target;@Override public void connectionAcquired(BackendConnection conn) { //將後端連線的ResponseHandler設定為target target.connectionAcquired(conn); }
這樣,原來沒獲取到連線的ResponseHandler就獲得需要的連線,之後進行處理。處理完後,歸還到連線池中。
private void returnCon(BackendConnection c) { //清空連線的Attachment c.setAttachment(null); //設定為未使用 c.setBorrowed(false); //更新上次使用時間,用於清理空閒連線 c.setLastTime(TimeUtil.currentTimeMillis()); //獲取連線池對應的佇列 ConQueue queue = this.conMap.getSchemaConQueue(c.getSchema()); //按照是否Autocommit分類歸還連線 boolean ok = false; if (c.isAutocommit()) { ok = queue.getAutoCommitCons().offer(c); } else { ok = queue.getManCommitCons().offer(c); } //歸還失敗,關閉連線,記錄 if (!ok) { LOGGER.warn("can't return to pool ,so close con " + c); c.close("can't return to pool "); } }
4.配置模組
MyCat例項初始化時究竟會有什麼操作呢?看下MyCat程式入口: MycatStartup.java:
public static void main(String[] args) { //是否啟用zk配置,/myid.properties中的loadZk屬性決定,預設不啟用,從本地xml檔案中讀取配置 ZkConfig.instance().initZk(); try { String home = SystemConfig.getHomePath(); if (home == null) { System.out.println(SystemConfig.SYS_HOME + " is not set."); System.exit(-1); } // init MycatServer server = MycatServer.getInstance(); server.beforeStart(); // startup server.startup(); System.out.println("MyCAT Server startup successfully. see logs in logs/mycat.log"); while (true) { Thread.sleep(300 * 1000); } } catch (Exception e) { SimpleDateFormat sdf = new SimpleDateFormat(dateFormat); LogLog.error(sdf.format(new Date()) + " startup error", e); System.exit(-1); } }
從程式碼中,可以簡單的分為三步:
MycatServer.getInstance():獲取MyCat例項,其實就是讀取配置檔案,並驗證正確性等
server.beforeStart():獲取環境變數,日誌配置
server.startup():啟動MyCat,啟動執行緒,初始化執行緒池和連線池等。
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 SpringBoot入門(二)——起步依賴