1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(3)

資料庫路由中介軟體MyCat - 原始碼篇(3)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。

2. 前端連線建立與認證

Title:MySql連線建立以及認證過程client->MySql:1.TCP連線請求 
MySql->client:2.接受TCP連線client->MySql:3.TCP連線建立MySql->client:4.握手包HandshakePacketclient->MySql:5.認證包AuthPacketMySql->client:6.如果驗證成功,則返回OkPacketclient->MySql:7.預設會發送查詢版本資訊的包MySql->client:8.返回結果包

2.3 (5~6)認證包AuthPacket,如果驗證成功,則返回OkPacket

繼續執行FrontendConnection的register()方法:

// 非同步讀取並處理,這個與RW執行緒中的asynRead()相同,之後客戶端收到握手包返回AuthPacket就是從這裡開始看。
         this.asynRead();

FrontendConnection.asynRead()方法直接呼叫this.socketWR.asynRead();如之前所述,一個Connection對應一個socketWR。在這裡是一個FrontendConnection對應一個NIOSocketWR。NIOSocketWR是操作類,裡面的方法實現非同步讀寫。 NIOSocketWR.asynRead():

public void asynRead() throws IOException {
        ByteBuffer theBuffer = con.readBuffer;        if (theBuffer == null) {
            theBuffer = con.processor.getBufferPool().allocate();
            con.readBuffer = theBuffer;
        }        //從channel中讀取資料,並且儲存到對應AbstractConnection的readBuffer中,readBuffer處於write mode,返回讀取了多少位元組
        int got = channel.read(theBuffer);        //呼叫處理讀取到的資料的方法
        con.onReadData(got);
    }

讀取完資料到快取readBuffer後,呼叫處理readBuffer方法: AbstractConnection.onReadData(int got):

public void onReadData(int got) throws IOException {        //如果連線已經關閉,則不處理
        if (isClosed.get()) {            return;
        }
        ByteBuffer buffer = this.readBuffer;
        lastReadTime = TimeUtil.currentTimeMillis();        //讀取到的位元組小於0,表示流關閉,如果等於0,代表TCP連線關閉了
        if (got < 0) {            this.close("stream closed");           
         return;
        } else if (got == 0) {           
         if (!this.channel.isOpen()) {             
            this.close("socket closed");             
               return;
            }
        }
        netInBytes += got;
        processor.addNetInBytes(got);        // 迴圈處理位元組資訊
        //readBuffer一直處於write mode,position記錄最後的寫入位置
        int offset = readBufferOffset, length = 0, position = buffer.position();     
           for (; ; ) {            //獲取包頭的包長度資訊
            length = getPacketLength(buffer, offset);           
             if (length == -1) {               
              if (!buffer.hasRemaining()) {
                    buffer = checkReadBuffer(buffer, offset, position);
                }                break;
            }            //如果postion小於包起始位置加上包長度,證明readBuffer不夠大,需要擴容
            if (position >= offset + length) {
                buffer.position(offset);               
                 byte[] data = new byte[length];                //讀取一個完整的包
                buffer.get(data, 0, length);                //處理包,每種AbstractConnection的處理函式不同
                handle(data);                //記錄下讀取到哪裡了
                offset += length;                //如果最後寫入位置等於最後讀取位置,則證明所有的處理完了,可以清空快取和offset
                //否則,記錄下最新的offset
                //由於readBufferOffset只會單執行緒(繫結的RW執行緒)修改,但是會有多個執行緒訪問(定時執行緒池的清理任務),所以設為volatile,不用CAS
                if (position == offset) {                  
                  if (readBufferOffset != 0) {
                        readBufferOffset = 0;
                    }
                    buffer.clear();                  
                      break;
                } else {
                    readBufferOffset = offset;
                    buffer.position(position);             
                           continue;
                }
            } else {              
              if (!buffer.hasRemaining()) {
                    buffer = checkReadBuffer(buffer, offset, position);
                }                break;
            }
        }
    }private ByteBuffer checkReadBuffer(ByteBuffer buffer, int offset,                           
                int position) {        
                if (offset == 0) {       
                     if (buffer.capacity() >= maxPacketSize) {    
                throw new IllegalArgumentException(                   
                     "Packet size over the limit.");
            }            int size = buffer.capacity() << 1;
            size = (size > maxPacketSize) ? maxPacketSize : size;
            ByteBuffer newBuffer = processor.getBufferPool().allocate(size);
            buffer.position(offset);
            newBuffer.put(buffer);
            readBuffer = newBuffer;
            recycle(buffer);         
               return newBuffer;
        } else {
            buffer.position(offset);
            buffer.compact();
            readBufferOffset = 0;        
                return buffer;
        }
    }

可以看出,處理快取需要考慮到容量,擴容和位置記錄等方面。 這裡,readBuffer一直處於寫模式。MyCat通過position(),還有get(data, 0, length)來讀取資料。readBufferOffset用來記錄每次讀取的offset,需要設定為volatile。由於readBufferOffset只會單執行緒(繫結的RW執行緒)修改,但是會有多個執行緒訪問(定時執行緒池的清理空閒連線的任務),所以設為volatile,不用CAS。這是一個經典的用volatile代替CAS實現多執行緒安全訪問的場景。 MyCat的快取管理思路很好,之後我會仔細講。 讀取完整包之後,交給handler處理。每種AbstractConnection的handler不同,FrontendConnection的handler為FrontendAuthenticator

this.handler = new FrontendAuthenticator(this);

我們思考下,FrontendConnection會接收什麼請求呢?有兩種,認證請求和SQL命令請求。只有認證成功後,才會接受SQL命令請求。FrontendAuthenticator只負責認證請求,在認證成功後,將對應AbstractConnection的handler設為處理SQL請求的FrontendCommandHandler即可. 一切正常的話,這裡讀取到的應為客戶端發出的AuthPacket: 這裡寫圖片描述 AuthPacket:

  • packet length(3 bytes)

  • packet number (1)

  • client flags (4)

  • max packet (4)

  • charset(1)

  • username (null terminated string)

  • password (length code Binary)

  • database (null terminated string)

public void handle(byte[] data) {        // check quit packet
        if (data.length == QuitPacket.QUIT.length && data[4] == MySQLPacket.COM_QUIT) {
            source.close("quit packet");          
              return;
        }

        AuthPacket auth = new AuthPacket();
        auth.read(data);        // check user
        if (!checkUser(auth.user, source.getHost())) {
            failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "' with host '" + source.getHost()+ "'");          
              return;
        }        // check password
        if (!checkPassword(auth.password, auth.user)) {
            failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "', because password is error ");            return;
        }        // check degrade
        if ( isDegrade( auth.user ) ) {
             failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "', because service be degraded ");             return;
        }        // check schema
        switch (checkSchema(auth.database, auth.user)) {       
         case ErrorCode.ER_BAD_DB_ERROR:
            failure(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + auth.database + "'");   
                     break;        case ErrorCode.ER_DBACCESS_DENIED_ERROR:
            String s = "Access denied for user '" + auth.user + "' to database '" + auth.database + "'";
            failure(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);          
              break;       
               default:            //認證成功,設定好使用者資料庫和許可權等,將handler設定為FrontendCommandHandler
            success(auth);
        }
    }

認證成功後,會發送OkPacket,對應FrontendConnection的handler變成了FrontendCommandHandler,可以接受SQL請求了。 傳送OkPacket的過程與HandshakePacket相同,這裡不再贅述。

source.write(source.writeToBuffer(AUTH_OK, buffer));

OkPacket結構:

  • packet length(3 bytes)

  • packet number (1)

  • 0x00 (1,包體首位元組為0)

  • affect rows (length code Binary)

  • insert id (length code Binary)

  • server status (2)

  • warning status (2)

  • message (length code Binary)


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選




相關文章:
【推薦】 Kubernetes 在網易雲中的落地優化實踐
【推薦】 網易嚴選的wkwebview測試之路