1. 程式人生 > >數據庫路由中間件MyCat - 源代碼篇(1)

數據庫路由中間件MyCat - 源代碼篇(1)

結果 參數 負責 con cancel size port can value

此文已由作者張鎬薪授權網易雲社區發布。

歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。


進入了源代碼篇,我們先從整體入手,之後拿一個簡單流程前端連接建立與認證作為例子,理清代碼思路和設計模式。然後,針對每一個重點模塊進行分析。


1. 整體通信與業務框架:


前端與後端通信框架都為NIO/AIO,因為目前生產上用的linux發行版內核都沒有真正實現網絡上的AIO,如果應用用AIO的話可能比NIO還要慢一些,所以,我們這裏只分析NIO相關的通信模塊。


  1. NIOAcceptor:作為服務器接受客戶端連接(前端NIO通信)

  2. NIOConnector:作為客戶端去連接後臺數據庫(MySql,後端NIO通信)

  3. NIOReactor:Reactor模式的NIO,處理並轉發請求到RW線程,其實就是把對應AbstractConnection(就是NIO的channel的封裝)註冊到RW線程的selector上,只註冊讀標記;原因之後細講

  4. NIOReactorPool:一般高性能網絡通信框架采用多Reactor(多dispatcher)模式,這裏將NIOReactor池化;每次NIOConnector接受一個連接或者NIOAcceptor請求一個連接,都會封裝成AbstractConnection,同時請求NIOReactorPool每次輪詢出一個NIOReactor,之後AbstractConnection與這個NIOReactor綁定(就是3之中說的註冊)。

  5. RW:RW線程,負責執行NIO的channel讀寫,這裏channel封裝成了AbstractConnection

  6. NIOSocketWR:每個前端和後端連接都有一個對應的緩沖區,對連接讀寫操作具體如何操作的方法和緩存方式,封裝到了這個類裏面。


通過上面的分析,我們大致知道了通信是由誰負責的了,但是為什麽NIOReactor只註冊讀標記?還有網絡通信channel(之後的文章我們就都用AbstractConnection代替了)讀寫有線程執行了,但是中間的業務步驟,比如SQL攔截,SQL解析還有結果合並是誰執行呢?然後,還有些定時的任務,比如檢查心跳連接等,如何執行呢? 首先,Reactor不會主動驅動寫請求,寫請求只會由業務步驟和定時任務觸發。首先看,Reactor與前端AbstractConnection還有後端AbstractConnection,接收到的請求有兩種,前端的SQL請求,還有後端的結果。但是這兩種都不能直接轉發,前端的SQL請求需要經過SQL解析等業務步驟才能寫到後端,後端的結果也需要經過業務處理才能寫到前端。所以只要執行業務步驟的線程去註冊寫標記,Reactor只要在檢查到寫標記後去寫之後取消標記即可。定時任務同理。 那麽誰去執行業務請求呢?MyCat會初始化一個BusinessExecutor線程池去處理業務請求,這個BusinessExecutor接受Reactor調度,定時任務由一個Timer線程調度並由一個TimerExecutor線程池執行。 整體結構如下所示,所有橢圓形的圖形是線程或者進程(省略了很多,比如緩沖、緩存、連接以及對應的管理,這些之後會細細介紹):技術分享圖片


2. 前端連接建立與認證


mysql客戶端連接mysql服務器抓包:流程是:


Title:MySql連接建立以及認證過程client->MySql:1.TCP連接請求 
MySql->client:2.接受TCP連接client->MySql:3.TCP連接建立MySql->client:4.握手包HandshakePacketclient->MySql:5.認證包AuthPacketMySql->client:6.如果驗證成功,則返回OkPacketclient->MySql:7.默認會發送查詢版本信息的包MySql->client:8.返回結果包


在之後的協議分析,我們會深入每一個包進行分析


2.1 (1~3)TCP連接請求->接受TCP連接->TCP連接建立


首先,接受TCP連接(為了三次握手,上面流程的前三個包)需要通過NIOAcceptor實現,NIOAcceptor主要完成綁定端口,註冊OP_ACCEPT監聽客戶端連接事件,有客戶連接,則放接受連接,將返回的channel封裝成為FrontendConnection(AbstarctConnection的子類),從NIOReactorPool中拿出一個NIOReactor並將FrontendConnection交給它綁定。 到此,NIOAcceptor就處理完一個客戶端的連接請求。


public NIOAcceptor(String name, String bindIp, int port,
                       FrontendConnectionFactory factory, NIOReactorPool reactorPool)
            throws IOException {        super.setName(name);        this.port = port;        this.selector = Selector.open();        this.serverChannel =  ServerSocketChannel.open();        this.serverChannel.configureBlocking(false);        //設置TCP屬性 
        serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2);        // backlog=100
        serverChannel.bind(new InetSocketAddress(bindIp, port), 100);        //註冊OP_ACCEPT,監聽客戶端連接
        this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);        //FrontendConnectionFactory,用來封裝channel成為FrontendConnection
        this.factory = factory;        //NIOReactor池
        this.reactorPool = reactorPool;
    }

構造器讀取ip,端口,前端連接工廠和NIOReactor池,初始化TCP參數,並bind,在selector上註冊OP_ACCEPT。 在NIOAcceptor啟動後:


@Override
    public void run() {        final Selector tSelector = this.selector;        for (; ; ) {
            ++acceptCount;            try {                //輪詢發現新連接請求
                tSelector.select(1000L);
                Set<SelectionKey> keys = tSelector.selectedKeys();                try {                    for (SelectionKey key : keys) {                        if (key.isValid() && key.isAcceptable()) {                            //接受連接操作
                            accept();
                        } else {
                            key.cancel();
                        }
                    }
                } finally {
                    keys.clear();
                }
            } catch (Exception e) {
                LOGGER.warn(getName(), e);
            }
        }
    }

NIOAcceptor這個線程不斷輪詢接受新的客戶端連接請求,接受連接操作:


private void accept() {
        SocketChannel channel = null;        try {            //得到通信channel並設置為非阻塞
            channel = serverChannel.accept();
            channel.configureBlocking(false);            //封裝channel為FrontendConnection
            FrontendConnection c = factory.make(channel);
            c.setAccepted(true);
            c.setId(ID_GENERATOR.getId());            //利用NIOProcessor管理前端鏈接,定期清除空閑連接,同時做寫隊列檢查
            NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
                    .nextProcessor();
            c.setProcessor(processor);            //和具體執行selector響應感興趣事件的NIOReactor綁定
            NIOReactor reactor = reactorPool.getNextReactor();
            reactor.postRegister(c);

        } catch (Exception e) {
            LOGGER.warn(getName(), e);
            closeChannel(channel);
        }
    }

NIOProcessor持有所有的前後端連接,裏面有空閑檢查和寫隊列檢查。RW線程,TimerExecutor線程池會執行裏面的方法來實現空閑寫入和定時連接檢查等等。之後我還會詳細介紹。 關閉Channel方法以及生成連接id方法很簡單,就不加註釋和贅述了。下面是他們的源代碼:


private static void closeChannel(SocketChannel channel) {        if (channel == null) {            return;
        }
        Socket socket = channel.socket();        if (socket != null) {            try {
                socket.close();
            } catch (IOException e) {
                LOGGER.error("closeChannelError", e);
            }
        }        try {
            channel.close();
        } catch (IOException e) {
            LOGGER.error("closeChannelError", e);
        }
    }    /**
     * 前端連接ID生成器
     *
     * @author mycat
     */
    private static class AcceptIdGenerator {        private static final long MAX_VALUE = 0xffffffffL;        private long acceptId = 0L;        private final Object lock = new Object();        private long getId() {            synchronized (lock) {                if (acceptId >= MAX_VALUE) {
                    acceptId = 0L;
                }                return ++acceptId;
            }
        }
    }



免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點擊。




相關文章:
【推薦】 一個小白的測試環境docker化之路
【推薦】 網易七魚 Android 高性能日誌寫入方案
【推薦】 淺談由管理者角色引出的B端產品設計思考點

數據庫路由中間件MyCat - 源代碼篇(1)