1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(2)

資料庫路由中介軟體MyCat - 原始碼篇(2)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。


2. 前端連線建立與認證

Title:MySql連線建立以及認證過程client->MySql:1.TCP連線請求 
MySql->client:2.接受TCP連線client->MySql:3.TCP連線建立MySql->client:4.握手包HandshakePacketclient->MySql:5.認證包AuthPacketMySql->client:6.如果驗證成功,則返回OkPacketclient->MySql:7.預設會發送查詢版本資訊的包MySql->client:8.返回結果包

2.2  (4)握手包HandshakePacket

NIOReactor其實就是一個網路事件反應轉發器。 這裡寫圖片描述 很多地方會用到NIOReactor,這裡先講FrontendConnection和NIOReactor繫結這一部分。上一節說到,NIOAcceptor的accept()最後將FrontendConnection交給了NIOReactor池其中的一個NIOReactor。呼叫的是 postRegister(AbstractConnection c)方法。

final void postRegister(AbstractConnection c) {
        reactorR.registerQueue.offer(c);
        reactorR.selector.wakeup();
    }

postRegister將剛才傳入的FrontendConnection放入RW執行緒的註冊佇列。之後,喚醒RW執行緒的selector。 為什麼放入RW執行緒的註冊佇列,而不是直接註冊呢?如果是直接註冊,那麼就是NIOAcceptor這個執行緒負責註冊,這裡就會有鎖競爭,因為NIOAcceptor這個執行緒和每個RW執行緒會去競爭selector的鎖。這樣NIOAcceptor就不能高效的處理連線。所以,更好的方式是將FrontendConnection放入RW執行緒的註冊佇列,之後讓RW執行緒自己完成註冊工作。 RW執行緒的原始碼:

private final class RW implements Runnable {        private final Selector selector;        private final ConcurrentLinkedQueue<AbstractConnection> registerQueue;        private long reactCount;        private RW() throws IOException {            this.selector = Selector.open();            this.registerQueue = new ConcurrentLinkedQueue<AbstractConnection>();
        }        @Override
        public void run() {            final Selector selector = this.selector;
            Set<SelectionKey> keys = null;            for (;;) {
                ++reactCount;                try {
                    selector.select(500L);                    //從註冊佇列中取出AbstractConnection之後註冊讀事件
                    //之後做一些列操作,請參考下面註釋
                    register(selector);
                    keys = selector.selectedKeys();                    for (SelectionKey key : keys) {
                        AbstractConnection con = null;                        try {
                            Object att = key.attachment();                            if (att != null) {
                                con = (AbstractConnection) att;                                if (key.isValid() && key.isReadable()) {                                    try {                                        //非同步讀取資料並處理資料
                                        con.asynRead();
                                    } catch (IOException e) {
                                        con.close("program err:" + e.toString());                                        continue;
                                    } catch (Exception e) {
                                        LOGGER.debug("caught err:", e);
                                        con.close("program err:" + e.toString());                                        continue;
                                    }
                                }                                if (key.isValid() && key.isWritable()) {                                    //非同步寫資料
                                    con.doNextWriteCheck();
                                }
                            } else {
                                key.cancel();
                            }
                        } catch (CancelledKeyException e) {                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug(con + " socket key canceled");
                            }
                        } catch (Exception e) {
                            LOGGER.warn(con + " " + e);
                        }
                    }
                } catch (Exception e) {
                    LOGGER.warn(name, e);
                } finally {                    if (keys != null) {
                        keys.clear();
                    }

                }
            }
        }        private void register(Selector selector) {
            AbstractConnection c = null;            if (registerQueue.isEmpty()) {                return;
            }            while ((c = registerQueue.poll()) != null) {                try {                    //註冊讀事件
                    ((NIOSocketWR) c.getSocketWR()).register(selector);                    //連線註冊,對於FrontendConnection是傳送HandshakePacket並非同步讀取響應
                    //響應為AuthPacket,讀取其中的資訊,驗證使用者名稱密碼等資訊,如果符合條件
                    //則傳送OkPacket
                    c.register();
                } catch (Exception e) {
                    c.close("register err" + e.toString());
                }
            }
        }

    }

因為NIOAcceptor執行緒和RW執行緒這兩個都會操作RW執行緒的註冊佇列,所以要用ConcurrentLinkedQueue RW執行緒不斷檢查selector中需要響應的事件,並如果註冊佇列不為空,就不斷註冊其中的AbstractConnection,在這裡就是FrontendConnection。 之後執行FrontendConnection的register()方法:

@Override
    public void register() throws IOException {        if (!isClosed.get()) {            // 生成認證資料
            byte[] rand1 = RandomUtil.randomBytes(8);            byte[] rand2 = RandomUtil.randomBytes(12);            // 儲存認證資料
            byte[] seed = new byte[rand1.length + rand2.length];
            System.arraycopy(rand1, 0, seed, 0, rand1.length);
            System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);            this.seed = seed;            // 傳送握手資料包
            HandshakePacket hs = new HandshakePacket();
            hs.packetId = 0;
            hs.protocolVersion = Versions.PROTOCOL_VERSION;
            hs.serverVersion = Versions.SERVER_VERSION;
            hs.threadId = id;
            hs.seed = rand1;
            hs.serverCapabilities = getServerCapabilities();
            hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
            hs.serverStatus = 2;
            hs.restOfScrambleBuff = rand2;            // 非同步寫,本節就講到這裡
            hs.write(this);            // 非同步讀取並處理,這個與RW執行緒中的asynRead()相同,之後客戶端收到握手包返回AuthPacket(就是下一節)就是從這裡開始看。
            this.asynRead();
        }
    }

這個方法就是生成HandshakePacket併發送出去,之後非同步讀取響應。 之前的示例中MySql的HandshakePacket結構: HandshakePacket 可以總結出: HandshakePacket:

  • packet length(3 bytes)

  • packet number (1)

  • protocol version (1)

  • version (null terminated string)

  • thread id (4)

  • salt (8)

  • server capabilities (2)

  • server charset (1)

  • server status (2)

  • unused (13)

  • salt (12)

  • 0x00 --- 結束

    這裡我們看下MyCat中的實現這一部分MySql協議棧的packet類結構:  這裡可以看出,每個包都實現了自己的包長度和資訊方法,並且針對前段後端連線都有讀寫方法實現,所以,之後讀寫資料都會根據場景不同調用這些類中的方法。這些包就是整個MySql協議棧除邏輯外的內容實現。 HandshakePacket.write(FrontendConnection c)方法將上面傳入的資料封裝成ByteBuffer,並傳入給FrontendConnection  c的write(ByteBuffer buffer),這個方法直接繼承自AbstractConnection:

public final void write(ByteBuffer buffer) {        //首先判斷是否為壓縮協議
        if(isSupportCompress())
        {            //CompressUtil為壓縮協議輔助工具類
            ByteBuffer     newBuffer= CompressUtil.compressMysqlPacket(buffer,this,compressUnfinishedDataQueue);            //將要寫的資料先放入寫快取佇列
            writeQueue.offer(newBuffer);

        }   else
        {            //將要寫的資料先放入寫快取佇列
            writeQueue.offer(buffer);
        }        try {            //處理寫事件,這個方法比較複雜,需要重點分析其思路
            this.socketWR.doNextWriteCheck();
        } catch (Exception e) {
            LOGGER.warn("write err:", e);            this.close("write err:" + e);

        }

    }

如程式碼註釋中所述,先將要寫的資料放入寫緩衝佇列,之後呼叫NIOSocketWR.doNextWriteCheck()處理寫事件。

public void doNextWriteCheck() {        //檢查是否正在寫,看CAS更新writing值是否成功
        if (!writing.compareAndSet(false, true)) {            return;
        }        try {            //利用快取佇列和寫緩衝記錄保證寫的可靠性,返回true則為全部寫入成功
            boolean noMoreData = write0();            //因為只有一個執行緒可以成功CAS更新writing值,所以這裡不用再CAS
            writing.set(false);            //如果全部寫入成功而且寫入佇列為空(有可能在寫入過程中又有新的Bytebuffer加入到佇列),則取消註冊寫事件
            //否則,繼續註冊寫事件
            if (noMoreData && con.writeQueue.isEmpty()) {                if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) {
                    disableWrite();
                }

            } else {                if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) {
                    enableWrite(false);
                }
            }

        } catch (IOException e) {            if (AbstractConnection.LOGGER.isDebugEnabled()) {
                AbstractConnection.LOGGER.debug("caught err:", e);
            }
            con.close("err:" + e);
        }

    }    private boolean write0() throws IOException {        int written = 0;
        ByteBuffer buffer = con.writeBuffer;        if (buffer != null) {            //只要寫緩衝記錄中還有資料就不停寫入,但如果寫入位元組為0,證明網路繁忙,則退出
            while (buffer.hasRemaining()) {
                written = channel.write(buffer);                if (written > 0) {
                    con.netOutBytes += written;
                    con.processor.addNetOutBytes(written);
                    con.lastWriteTime = TimeUtil.currentTimeMillis();
                } else {                    break;
                }
            }            //如果寫緩衝中還有資料證明網路繁忙,計數並退出,否則清空緩衝
            if (buffer.hasRemaining()) {
                con.writeAttempts++;                return false;
            } else {
                con.writeBuffer = null;
                con.recycle(buffer);
            }
        }        //讀取快取佇列並寫channel
        while ((buffer = con.writeQueue.poll()) != null) {            if (buffer.limit() == 0) {
                con.recycle(buffer);
                con.close("quit send");                return true;
            }
            buffer.flip();            while (buffer.hasRemaining()) {
                written = channel.write(buffer);                if (written > 0) {
                    con.lastWriteTime = TimeUtil.currentTimeMillis();
                    con.netOutBytes += written;
                    con.processor.addNetOutBytes(written);
                    con.lastWriteTime = TimeUtil.currentTimeMillis();
                } else {                    break;
                }
            }            //如果寫緩衝中還有資料證明網路繁忙,計數,記錄下這次未寫完的資料到寫緩衝記錄並退出,否則回收緩衝
            if (buffer.hasRemaining()) {
                con.writeBuffer = buffer;
                con.writeAttempts++;                return false;
            } else {
                con.recycle(buffer);
            }
        }        return true;
    }    private void disableWrite() {        try {
            SelectionKey key = this.processKey;
            key.interestOps(key.interestOps() & OP_NOT_WRITE);
        } catch (Exception e) {
            AbstractConnection.LOGGER.warn("can't disable write " + e + " con "
                    + con);
        }

    }    private void enableWrite(boolean wakeup) {        boolean needWakeup = false;        try {
            SelectionKey key = this.processKey;
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            needWakeup = true;
        } catch (Exception e) {
            AbstractConnection.LOGGER.warn("can't enable write " + e);

        }        if (needWakeup && wakeup) {
            processKey.selector().wakeup();
        }
    }

註釋已經很詳細,如此執行完,便成功將握手包傳送給了客戶端。 在這裡稍微吐槽下,由於MyCat在網路通訊上同時做了AIO和NIO,但是在設計上AbstractionConnection和這些並沒有關係。但是又涉及到快取佇列,所以設計上出現了一些如下的類模式:  這樣應該是不推薦這麼設計的,目前我還沒想好如何去改善


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選




相關文章:
【推薦】 一行程式碼搞定Dubbo介面呼叫
【推薦】 網易美學-系統架構系列1-分散式與服務化
【推薦】 Vue框架核心之資料劫持