1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(5)

資料庫路由中介軟體MyCat - 原始碼篇(5)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。

3. 連線模組

如之前所述,MyCat的連線分為前端和後端,下面是連線基本相關類圖:

3.1 ClosableConnection:

public interface ClosableConnection {    String getCharset();    //關閉連線
    void close(String reason);    boolean isClosed();    public void idleCheck();    long getStartupTime();    String getHost();    int getPort();    int getLocalPort();    long getNetInBytes();    long getNetOutBytes();
}

根據字面意思,一個可以關閉的連線需要實現關閉方法-_-,並且需要原因判斷是否是正常關閉。MySQL的通訊都需要指定字符集。MyCat伺服器建立ServerSocket時輸入的埠為伺服器在其上面監聽客戶的連線,當有客戶連線時,在隨機選擇一個沒用的埠與客戶端通訊;建立客戶socket時輸入的為服務端的監聽埠,在本地選擇一個未用埠與伺服器通訊,至於伺服器怎麼知道和客戶端的哪個埠通訊,和客戶端怎麼知道和服務端的哪個埠通訊(因為這兩個埠都是隨機生成的),tcp是採用"三次握手"建立連線,而udp則是每次傳送資訊時將埠號放在ip報文的資料段裡面。所以,連線裡面需要提供獲得監聽埠和服務埠的方法。此外,還需要檢查連線是否為空閒狀態(idle)。最後,需要一些統計資料。

3.2 NIOConnection:

public interface NIOConnection extends ClosableConnection {    //connected
    void register() throws IOException;    //處理資料
    void handle(byte[] data);    // 寫出一塊緩衝資料
    void write(ByteBuffer buffer);

}

所有NIO的通訊需要在多路複用選擇器上註冊channel,這裡有個對應的register()方法需要實現。然後,讀取和寫入資料都需要通過緩衝。緩衝區(Buffer)就是在記憶體中預留指定大小的儲存空間用來對輸入/輸出(I/O)的資料作臨時儲存,這部分預留的記憶體空間就叫做緩衝區,使用緩衝區有這麼兩個好處:

  1. 減少實際的物理讀寫次數

  2. 緩衝區在建立時就被分配記憶體,這塊記憶體區域一直被重用,可以減少動態分配和回收記憶體的次數 讀取到的資料需要經過處理,這裡對應的就是handle(byte[])方法。

    3.3 AbstractConnection:

從上面的實體圖,我們發現,AbstractConnection其實就是把Java的NetworkChannel進行封裝,同時需要依賴其他幾個類來完成他所需要的操作,如下:  其中,NIOProcessor是對AbstractConnection實現NIO讀寫的方法類,NIOHandler是處理AbstractConnection讀取的資料的處理方法類,NIOSocketWR是執行以上方法的執行緒類。

  1. 3.3.1 NIOProcessor:

NIOProcessor的構建方法:

public NIOProcessor(String name, BufferPool bufferPool,
            NameableExecutor executor) throws IOException {        this.name = name;        this.bufferPool = bufferPool;        this.executor = executor;        this.frontends = new ConcurrentHashMap<Long, FrontendConnection>();        this.backends = new ConcurrentHashMap<Long, BackendConnection>();        this.commands = new CommandCount();
    }

呼叫位置:  MyCatServer.java

...
bufferPool = new BufferPool(processBuferPool, processBufferChunk,
                socketBufferLocalPercent / processorCount);
        businessExecutor = ExecutorUtil.create("BusinessExecutor",
                threadPoolSize);
...for (int i = 0; i < processors.length; i++) {
            processors[i] = new NIOProcessor("Processor" + i, bufferPool,
                    businessExecutor);
        }
...

每個MyCat例項會初始化processors個NIOProcessor,每個NIOProcessor公用同一個bufferPool和businessExecutor。 bufferPool是緩衝池,BufferPool這個類負責緩衝統一管理 businessExecutor如之前所述,是業務執行緒池。 NIOProcessor被池化,很簡單,就是儲存到陣列中,通過MyCatServer的nextProcessor()方法輪詢獲取一個NIOProcessor,之後每個AbstractConnection通過setNIOProcessor方法,設定NIOProcessor。

public NIOProcessor nextProcessor() {        int i = ++nextProcessor;        if (i >= processors.length) {
            i = nextProcessor = 0;
        }        return processors[i];
    }

可以看出,每個AbstractConnection依賴於一個NIOProcessor,每個NIOProcessor儲存著多個AbstractConnection。AbstractConnection分為FrontendConnection和BackendConnection被分別儲存在NIOProcessor的frontends和backends這兩個ConcurrentHashMap中。 用ConcurrentHashMap是因為NIOAcceptor和NIOConnector執行緒以及RW執行緒池都會訪問這兩個變數。 NIOProcessor其實主要負責連線資源的管理:  MyCat會定時檢查前端和後端空閒連線,並清理和回收資源: MyCatServer.java:

// 處理器定時檢查任務
    private TimerTask processorCheck() {        return new TimerTask() {            @Override
            public void run() {
                timerExecutor.execute(new Runnable() {                    @Override
                    public void run() {                        try {                            for (NIOProcessor p : processors) {
                                p.checkBackendCons();
                            }
                        } catch (Exception e) {
                            LOGGER.warn("checkBackendCons caught err:" + e);
                        }

                    }
                });
                timerExecutor.execute(new Runnable() {                    @Override
                    public void run() {                        try {                            for (NIOProcessor p : processors) {
                                p.checkFrontCons();
                            }
                        } catch (Exception e) {
                            LOGGER.warn("checkFrontCons caught err:" + e);
                        }
                    }
                });
            }
        };
    }

檢查前端連線,回收空閒資源:

    /**
     * 定時執行該方法,回收部分資源。
     */
    public void checkFrontCons() {
        frontendCheck();
    }    private void frontendCheck() {
        Iterator<Entry<Long, FrontendConnection>> it = frontends.entrySet()
                .iterator();        while (it.hasNext()) {
            FrontendConnection c = it.next().getValue();            // 刪除空連線
            if (c == null) {
                it.remove();                this.frontendsLength.decrementAndGet();                continue;
            }            // 清理已關閉連線,否則空閒檢查。
            if (c.isClosed()) {
                c.cleanup();
                it.remove();                this.frontendsLength.decrementAndGet();
            } else {                // very important ,for some data maybe not sent
                checkConSendQueue(c);
                c.idleCheck();
            }
        }
    }

在關閉前端連線時,會清理連線佔用的快取資源: FrontendConnection.java:

protected void cleanup() {        //回收讀緩衝
        if (readBuffer != null) {
            recycle(readBuffer);            this.readBuffer = null;            this.readBufferOffset = 0;
        }        //回收寫緩衝
        if (writeBuffer != null) {
            recycle(writeBuffer);            this.writeBuffer = null;
        }        //回收壓縮協議棧編碼解碼佇列
        if(!decompressUnfinishedDataQueue.isEmpty())
        {
            decompressUnfinishedDataQueue.clear();
        }        if(!compressUnfinishedDataQueue.isEmpty())
        {
            compressUnfinishedDataQueue.clear();
        }        //回收寫佇列
        ByteBuffer buffer = null;        while ((buffer = writeQueue.poll()) != null) {
            recycle(buffer);
        }
    }

後端連線檢查,除了要清理已關閉的連線,還有要檢查SQL執行時間是否超時:

    /**
     * 定時執行該方法,回收部分資源。
     */
    public void checkBackendCons() {
        backendCheck();
    }    // 後端連線檢查
    private void backendCheck() {        long sqlTimeout = MycatServer.getInstance().getConfig().getSystem().getSqlExecuteTimeout() * 1000L;
        Iterator<Entry<Long, BackendConnection>> it = backends.entrySet().iterator();        while (it.hasNext()) {
            BackendConnection c = it.next().getValue();            // 刪除空連線
            if (c == null) {
                it.remove();                continue;
            }            // SQL執行超時的連線關閉
            if (c.isBorrowed()
                    && c.getLastTime() < TimeUtil.currentTimeMillis()
                            - sqlTimeout) {
                LOGGER.warn("found backend connection SQL timeout ,close it "
                        + c);
                c.close("sql timeout");
            }            // 清理已關閉連線,否則空閒檢查。
            if (c.isClosed()) {
                it.remove();

            } else {                // very important ,for some data maybe not sent
                if (c instanceof AbstractConnection) {
                    checkConSendQueue((AbstractConnection) c);
                }
                c.idleCheck();
            }
        }
    }

同時,在檢查連線是否關閉時,需要檢查寫佇列是否為空。寫佇列不為空,證明還有請求沒有響應。需要將寫佇列的剩餘請求非同步寫出,通過NIOSocketWR。

    private void checkConSendQueue(AbstractConnection c) {        // very important ,for some data maybe not sent
        if (!c.writeQueue.isEmpty()) {
            c.getSocketWR().doNextWriteCheck();
        }
    }


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選




相關文章:
【推薦】 在一臺伺服器上搭建相對高可用HiveServer實踐
【推薦】 大公司怎麼做Android程式碼混淆的?