1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(10)

資料庫路由中介軟體MyCat - 原始碼篇(10)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。

3. 連線模組

3.5 後端連線

3.5.2 後端連接獲取與維護管理

還是那之前的流程,

st=>start: MyCat接受客戶端連線併為之建立唯一繫結的Session
e=>end: 將請求傳送給對應連線,處理完之後歸還連線
op1=>operation: MyCat接受客戶端的請求,計算路由
op2=>operation: 根據請求和路由建立合適的handler,這裡為SingleNodeHandler
op3=>operation: 從PhysicalDBNode中獲取後端連線
cond=>condition: 嘗試獲取連線,連線夠用?
op4=>operation: 嘗試非同步建立新的連線
op5=>operation: 通過DelegateResponseHandler將連線與之前的Handler,這裡是SingleNodeHandler繫結
st->op1->op2->op3->condcond(yes)->econd(no)->op4->op5->e

現在我們到了嘗試獲取連線的階段 PhysicalDataSource.java:

public void getConnection(String schema,boolean autocommit, final ResponseHandler handler,    final Object attachment) throws IOException {//從當前連線map中拿取已建立好的後端連線
    BackendConnection con = this.conMap.tryTakeCon(schema,autocommit);    if (con != null) {//如果不為空,則繫結對應前端請求的handler
        takeCon(con, handler, attachment, schema);        return;
    } else {//如果為空,新建連線
        int activeCons = this.getActiveCount();//當前最大活動連線
        if(activeCons+1>size){//下一個連線大於最大連線數
            LOGGER.error("the max activeConnnections size can not be max than maxconnections");            throw new IOException("the max activeConnnections size can not be max than maxconnections");
        }else{            // create connection
            LOGGER.info("not ilde connection in pool,create new connection for " + this.name
                    + " of schema "+schema);
            createNewConnection(handler, attachment, schema);
        }

    }

}
private void createNewConnection(final ResponseHandler handler,        final Object attachment, final String schema) throws IOException {    //非同步建立連線,將連線的handler繫結為DelegateResponseHandler
    MycatServer.getInstance().getBusinessExecutor().execute(new Runnable() {        public void run() {            try {
                createNewConnection(new DelegateResponseHandler(handler) {                    @Override
                    public void connectionError(Throwable e,
                            BackendConnection conn) {
                        handler.connectionError(e, conn);
                    }                    @Override
                    public void connectionAcquired(BackendConnection conn) {
                        takeCon(conn, handler, attachment, schema);
                    }
                }, schema);
            } catch (IOException e) {
                handler.connectionError(e, null);
            }
        }
    });
}

非同步呼叫工廠方法建立後端連線,這裡為MySQLConnection MySQLDataSource.java:

@Override
    public void createNewConnection(ResponseHandler handler,String schema) throws IOException {
        factory.make(this, handler,schema);
}

根據之前所述,MySQLConnection的工廠方法會先將NIOhandler設定為MySQLConnectionAuthenticator: MySQLConnectionFactory.java:

public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
            String schema) throws IOException {        //DBHost配置
        DBHostConfig dsc = pool.getConfig();        //根據是否為NIO返回SocketChannel或者AIO的AsynchronousSocketChannel
        NetworkChannel channel = openSocketChannel(MycatServer.getInstance()
                .isAIO());        //新建MySQLConnection
        MySQLConnection c = new MySQLConnection(channel, pool.isReadNode());        //根據配置初始化MySQLConnection
        MycatServer.getInstance().getConfig().setSocketParams(c, false);
        c.setHost(dsc.getIp());
        c.setPort(dsc.getPort());
        c.setUser(dsc.getUser());
        c.setPassword(dsc.getPassword());
        c.setSchema(schema);        //目前實際連線還未建立,handler為MySQL連線認證MySQLConnectionAuthenticator,傳入的handler為後端連線處理器ResponseHandler
        c.setHandler(new MySQLConnectionAuthenticator(c, handler));
        c.setPool(pool);
        c.setIdleTimeout(pool.getConfig().getIdleTimeout());        //AIO和NIO連線方式建立實際的MySQL連線
        if (channel instanceof AsynchronousSocketChannel) {
            ((AsynchronousSocketChannel) channel).connect(                    new InetSocketAddress(dsc.getIp(), dsc.getPort()), c,
                    (CompletionHandler) MycatServer.getInstance()
                            .getConnector());
        } else {            //通過NIOConnector建立連線
            ((NIOConnector) MycatServer.getInstance().getConnector())
                    .postConnect(c);

        }        return c;
    }

這裡傳入的ResponseHandler為DelegateResponseHandler,在連線建立驗證之後,會呼叫: MySQLConnectionAuthenticator.java:

public void handle(byte[] data) {    //省略                
    //設定ResponseHandler
    if (listener != null) {
            listener.connectionAcquired(source);
    }    //省略}

DelegateResponseHandler.java:

private final ResponseHandler target;@Override
   public void connectionAcquired(BackendConnection conn) {   //將後端連線的ResponseHandler設定為target
   target.connectionAcquired(conn);
}

這樣,原來沒獲取到連線的ResponseHandler就獲得需要的連線,之後進行處理。處理完後,歸還到連線池中。

private void returnCon(BackendConnection c) {    //清空連線的Attachment
    c.setAttachment(null);    //設定為未使用
    c.setBorrowed(false);    //更新上次使用時間,用於清理空閒連線
    c.setLastTime(TimeUtil.currentTimeMillis());    //獲取連線池對應的佇列
    ConQueue queue = this.conMap.getSchemaConQueue(c.getSchema());    //按照是否Autocommit分類歸還連線
    boolean ok = false;    if (c.isAutocommit()) {
        ok = queue.getAutoCommitCons().offer(c);
    } else {
        ok = queue.getManCommitCons().offer(c);
    }    //歸還失敗,關閉連線,記錄
    if (!ok) {

        LOGGER.warn("can't return to pool ,so close con " + c);
        c.close("can't return to pool ");
    }
}

4.配置模組

MyCat例項初始化時究竟會有什麼操作呢?看下MyCat程式入口: MycatStartup.java:

public static void main(String[] args) {    //是否啟用zk配置,/myid.properties中的loadZk屬性決定,預設不啟用,從本地xml檔案中讀取配置
    ZkConfig.instance().initZk();    try {
        String home = SystemConfig.getHomePath();        if (home == null) {
            System.out.println(SystemConfig.SYS_HOME + "  is not set.");
            System.exit(-1);
        }        // init
        MycatServer server = MycatServer.getInstance();
        server.beforeStart();        // startup
        server.startup();
        System.out.println("MyCAT Server startup successfully. see logs in logs/mycat.log");        while (true) {
            Thread.sleep(300 * 1000);
        }
    } catch (Exception e) {
        SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
        LogLog.error(sdf.format(new Date()) + " startup error", e);
        System.exit(-1);
    }
}

從程式碼中,可以簡單的分為三步:

  1. MycatServer.getInstance():獲取MyCat例項,其實就是讀取配置檔案,並驗證正確性等

  2. server.beforeStart():獲取環境變數,日誌配置

  3. server.startup():啟動MyCat,啟動執行緒,初始化執行緒池和連線池等。


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選




相關文章:
【推薦】 SpringBoot入門(二)——起步依賴