1. 程式人生 > >數據庫路由中間件MyCat - 源代碼篇(9)

數據庫路由中間件MyCat - 源代碼篇(9)

分享圖片 database frontend 計算 rac not nal 服務 run

此文已由作者張鎬薪授權網易雲社區發布。

歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。

3. 連接模塊

3.5 後端連接

3.5.1 後端連接獲取與負載均衡

上一節我們講了後端連接的基本建立和響應處理,那麽這些後端連接是什麽時候建立的呢? 首先,MyCat配置文件中,DataHost標簽中有minIdle這個屬性。代表在MyCat初始化時,會在這個DataHost上初始化維護多少個連接(這些連接可以理解為連接池)。每個前端Client連接會創建Session,而Session會根據命令的不同而創建不同的Handler。每個Handler會從連接池中拿出所需要的連接並使用。在連接池大小不夠時,RW線程會異步驅使新建所需的連接補充連接池,但是連接數最大不能超過配置的maxCon。同時,如之前所述,有定時線程檢查並回收空閑後端連接。但池中最小不會小於minCon。 我們可以通過後端連接的工廠方法的調用鏈來理解: 技術分享圖片

看這個調用鏈,我們簡述下大概的流程。

st=>start: MyCat接受客戶端連接並為之建立唯一綁定的Session
e=>end: 將請求發送給對應連接,處理完之後歸還連接
op1=>operation: MyCat接受客戶端的請求,計算路由
op2=>operation: 根據請求和路由創建合適的handler,這裏為SingleNodeHandler
op3=>operation: 從PhysicalDBNode中獲取後端連接
cond=>condition: 嘗試獲取連接,連接夠用?
op4=>operation: 嘗試異步創建新的連接
op5=>operation: 通過DelegateResponseHandler將連接與之前的Handler,這裏是SingleNodeHandler綁定
st->op1->op2->op3->condcond(yes)->econd(no)->op4->op5->e

我們先從Session看起,在MyCat中實現類為NonBlockingSession。在前端連接建立時,會創建綁定唯一的Session: ServerConnectionFactory.java:

protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
    SystemConfig sys = MycatServer.getInstance().getConfig().getSystem();
    ServerConnection c = new ServerConnection(channel);
    MycatServer.getInstance().getConfig().setSocketParams(c, true);
    c.setPrivileges(MycatPrivileges.instance());
    c.setQueryHandler(new ServerQueryHandler(c));
    c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c));    // c.setPrepareHandler(new ServerPrepareHandler(c));
    c.setTxIsolation(sys.getTxIsolation());    //創建綁定唯一Session
    c.setSession2(new NonBlockingSession(c));    return c;
}

Session主要處理事務,多節點轉發協調等,由不同的ResponseHandler實現; 技術分享圖片 這些ResponseHandler我們之後會在對應的模塊去細細分析。這裏先跳過。 查看SingleNodeHanlder的處理方法 SingleNodeHanlder.java:

public void execute() throws Exception {    //從這裏開始計算處理時間
    startTime=System.currentTimeMillis();
    ServerConnection sc = session.getSource();    this.isRunning = true;    this.packetId = 0;    final BackendConnection conn = session.getTarget(node);    //之前是否獲取過Connection並且Connection有效
    if (session.tryExistsCon(conn, node)) {
        _execute(conn);
    } else {        // create new connection
        MycatConfig conf = MycatServer.getInstance().getConfig();        //從config中獲取DataNode
        PhysicalDBNode dn = conf.getDataNodes().get(node.getName());        //獲取對應的數據庫連接
        dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this,
                   node);
    }

}

從PhysicalDBNode中獲取合適的連接:

public void getConnection(String schema,boolean autoCommit, RouteResultsetNode rrs,
        ResponseHandler handler, Object attachment) throws Exception {
    checkRequest(schema);    //檢查數據庫連接池是否初始化成功,因為有reload命令
    if (dbPool.isInitSuccess()) {        //根據是否能在讀節點上運行獲取連接,一般是判斷是否為讀請求,並且讀請求不在事務中
        if (rrs.canRunnINReadDB(autoCommit)) {            //根據負載均衡策略選擇合適的後端連接
            dbPool.getRWBanlanceCon(schema,autoCommit, handler, attachment,                    this.database);
        } else {            //直接選擇當前連接池中的的後端連接
            dbPool.getSource().getConnection(schema,autoCommit, handler, attachment);
        }

    } else {        throw new IllegalArgumentException("Invalid DataSource:"
                + dbPool.getActivedIndex());
    }
}

PhysicalDBPool類中有負載均衡,切換writeHost,控制write方式等(分別對應balance,writeType等標簽)的實現。首先我們看如果有負載均衡策略(配置了balance)的獲取連接的方式:

public void getRWBanlanceCon(String schema, boolean autocommit,
                             ResponseHandler handler, Object attachment, String database) throws Exception {

    PhysicalDatasource theNode = null;
    ArrayList<PhysicalDatasource> okSources = null;    switch (banlance) {        //所有讀寫節點參與read請求的負載均衡,除了當前活躍的寫節點,balance=1
        case BALANCE_ALL_BACK: {            //返回所有寫節點和符合條件的讀節點,不包括當前的寫節點
            okSources = getAllActiveRWSources(true, false, checkSlaveSynStatus());            if (okSources.isEmpty()) {                //如果結果即為空,返回當前寫節點
                theNode = this.getSource();
            } else {                //不為空,隨機選一個
                theNode = randomSelect(okSources);
            }            break;
        }        //所有讀寫節點參與read請求的負載均衡,balance=2
        case BALANCE_ALL: {            //返回所有寫節點和符合條件的讀節點
            okSources = getAllActiveRWSources(true, true, checkSlaveSynStatus());            //隨機選一個
            theNode = randomSelect(okSources);            break;
        }        case BALANCE_ALL_READ: {            //返回所有符合條件的讀節點
            okSources = getAllActiveRWSources(false, false, checkSlaveSynStatus());            //隨機取一個
            theNode = randomSelect(okSources);            break;
        }        //不做負載均衡,balance=0或其他不為以上的值
        case BALANCE_NONE:        default:
            // return default write data source
            theNode = this.getSource();
    }    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("select read source " + theNode.getName() + " for dataHost:" + this.getHostName());
    }
    theNode.getConnection(schema, autocommit, handler, attachment);
}

其中涉及到的方法:

  1. 返回符合條件節點集:

private ArrayList<PhysicalDatasource> getAllActiveRWSources(            boolean includeWriteNode, boolean includeCurWriteNode, boolean filterWithSlaveThreshold) {        int curActive = activedIndex;
        ArrayList<PhysicalDatasource> okSources = new ArrayList<PhysicalDatasource>(this.allDs.size());        //判斷寫節點
        for (int i = 0; i < this.writeSources.length; i++) {
            PhysicalDatasource theSource = writeSources[i];            //判斷寫節點是否是active,可能reload會置為inactive,可能多個寫節點但是只有一個是活躍在用的(writeType=0)
            if (isAlive(theSource)) {                //負載均衡策略是否包含寫節點
                if (includeWriteNode) {                    //判斷是否包含當前活躍的寫入節點
                    if (i == curActive && includeCurWriteNode == false) {                        // not include cur active source
                    } else if (filterWithSlaveThreshold) {                        //如果包含從節點同步延遲限制,檢查同步狀態
                        if (canSelectAsReadNode(theSource)) {
                            okSources.add(theSource);
                        } else {                            //如果同步狀態不對,則不添加這個寫節點
                            continue;
                        }

                    } else {
                        okSources.add(theSource);
                    }
                }                //檢查theSource對應的讀節點
                if (!readSources.isEmpty()) {                    // 檢查theSource對應的讀節點(從節點)
                    PhysicalDatasource[] allSlaves = this.readSources.get(i);                    if (allSlaves != null) {                        for (PhysicalDatasource slave : allSlaves) {                            if (isAlive(slave)) {                                //如果包含從節點同步延遲限制,檢查同步狀態
                                if (filterWithSlaveThreshold) {                                    if (canSelectAsReadNode(slave)) {                                        //如果同步狀態正確,則把讀節點加入
                                        okSources.add(slave);
                                    } else {                                        continue;
                                    }

                                } else {
                                    okSources.add(slave);
                                }
                            }
                        }
                    }
                }

            } else {                // TODO : add by zhuam
                // 如果寫節點不OK, 也要保證臨時的讀服務正常
                if (this.dataHostConfig.isTempReadHostAvailable()) {                    if (!readSources.isEmpty()) {                        // check all slave nodes
                        PhysicalDatasource[] allSlaves = this.readSources.get(i);                        if (allSlaves != null) {                            for (PhysicalDatasource slave : allSlaves) {                                if (isAlive(slave)) {                                    if (filterWithSlaveThreshold) {                                        if (canSelectAsReadNode(slave)) {
                                            okSources.add(slave);
                                        } else {                                            continue;
                                        }

                                    } else {
                                        okSources.add(slave);
                                    }
                                }
                            }
                        }
                    }
                }
            }

        }        return okSources;
    }
  1. 檢查是否判斷主從延遲:

private boolean checkSlaveSynStatus() {        return (dataHostConfig.getSlaveThreshold() != -1)
                && (dataHostConfig.getSwitchType() == DataHostConfig.SYN_STATUS_SWITCH_DS);
    }
  1. 隨機選擇節點:

/**
     * TODO: modify by zhuam
     * <p/>
     * 隨機選擇,按權重設置隨機概率。
     * 在一個截面上碰撞的概率高,但調用量越大分布越均勻,而且按概率使用權重後也比較均勻,有利於動態調整提供者權重。
     *
     * @param okSources
     * @return
     */
    public PhysicalDatasource randomSelect(ArrayList<PhysicalDatasource> okSources) {        if (okSources.isEmpty()) {            return this.getSource();

        } else {            int length = okSources.size();    // 總個數
            int totalWeight = 0;            // 總權重
            boolean sameWeight = true;        // 權重是否都一樣
            for (int i = 0; i < length; i++) {                int weight = okSources.get(i).getConfig().getWeight();
                totalWeight += weight;        // 累計總權重
                if (sameWeight && i > 0
                        && weight != okSources.get(i - 1).getConfig().getWeight()) {      // 計算所有權重是否一樣
                    sameWeight = false;
                }
            }            if (totalWeight > 0 && !sameWeight) {                // 如果權重不相同且權重大於0則按總權重數隨機
                int offset = random.nextInt(totalWeight);                // 並確定隨機值落在哪個片斷上
                for (int i = 0; i < length; i++) {
                    offset -= okSources.get(i).getConfig().getWeight();                    if (offset < 0) {                        return okSources.get(i);
                    }
                }
            }            // 如果權重相同或權重為0則均等隨機
            return okSources.get(random.nextInt(length));            //int index = Math.abs(random.nextInt()) % okSources.size();
            //return okSources.get(index);
        }
    }
  1. 根據writeType獲取當前writeHost方法:

public PhysicalDatasource getSource() {        switch (writeType) {            //writeType=0,返回當前active的writeHost
            case WRITE_ONLYONE_NODE: {                return writeSources[activedIndex];
            }            //writeType=1,隨機發到一個writeHost
            case WRITE_RANDOM_NODE: {                int index = Math.abs(wnrandom.nextInt()) % writeSources.length;
                PhysicalDatasource result = writeSources[index];                if (!this.isAlive(result)) {                    // find all live nodes
                    ArrayList<Integer> alives = new ArrayList<Integer>(writeSources.length - 1);                    for (int i = 0; i < writeSources.length; i++) {                        if (i != index) {                            if (this.isAlive(writeSources[i])) {
                                alives.add(i);
                            }
                        }
                    }                    if (alives.isEmpty()) {
                        result = writeSources[0];
                    } else {                        // random select one
                        index = Math.abs(wnrandom.nextInt()) % alives.size();
                        result = writeSources[alives.get(index)];

                    }
                }                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("select write source " + result.getName()
                            + " for dataHost:" + this.getHostName());
                }                return result;
            }            //參數不正確
            default: {                throw new java.lang.IllegalArgumentException("writeType is "
                        + writeType + " ,so can‘t return one write datasource ");
            }
        }

    }


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點擊。




相關文章:
【推薦】 canvas 動畫庫 CreateJs 之 EaselJS(下篇)
【推薦】 針對低網速的性能優化
【推薦】 nej+regular環境使用es6的低成本方案

數據庫路由中間件MyCat - 源代碼篇(9)