資料庫路由中介軟體MyCat - 原始碼篇(3)
此文已由作者張鎬薪授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
2. 前端連線建立與認證
Title:MySql連線建立以及認證過程client->MySql:1.TCP連線請求 MySql->client:2.接受TCP連線client->MySql:3.TCP連線建立MySql->client:4.握手包HandshakePacketclient->MySql:5.認證包AuthPacketMySql->client:6.如果驗證成功,則返回OkPacketclient->MySql:7.預設會發送查詢版本資訊的包MySql->client:8.返回結果包
2.3 (5~6)認證包AuthPacket,如果驗證成功,則返回OkPacket
繼續執行FrontendConnection的register()方法:
// 非同步讀取並處理,這個與RW執行緒中的asynRead()相同,之後客戶端收到握手包返回AuthPacket就是從這裡開始看。 this.asynRead();
FrontendConnection.asynRead()方法直接呼叫this.socketWR.asynRead();如之前所述,一個Connection對應一個socketWR。在這裡是一個FrontendConnection對應一個NIOSocketWR。NIOSocketWR是操作類,裡面的方法實現非同步讀寫。 NIOSocketWR.asynRead():
public void asynRead() throws IOException { ByteBuffer theBuffer = con.readBuffer; if (theBuffer == null) { theBuffer = con.processor.getBufferPool().allocate(); con.readBuffer = theBuffer; } //從channel中讀取資料,並且儲存到對應AbstractConnection的readBuffer中,readBuffer處於write mode,返回讀取了多少位元組 int got = channel.read(theBuffer); //呼叫處理讀取到的資料的方法 con.onReadData(got); }
讀取完資料到快取readBuffer後,呼叫處理readBuffer方法: AbstractConnection.onReadData(int got):
public void onReadData(int got) throws IOException { //如果連線已經關閉,則不處理 if (isClosed.get()) { return; } ByteBuffer buffer = this.readBuffer; lastReadTime = TimeUtil.currentTimeMillis(); //讀取到的位元組小於0,表示流關閉,如果等於0,代表TCP連線關閉了 if (got < 0) { this.close("stream closed"); return; } else if (got == 0) { if (!this.channel.isOpen()) { this.close("socket closed"); return; } } netInBytes += got; processor.addNetInBytes(got); // 迴圈處理位元組資訊 //readBuffer一直處於write mode,position記錄最後的寫入位置 int offset = readBufferOffset, length = 0, position = buffer.position(); for (; ; ) { //獲取包頭的包長度資訊 length = getPacketLength(buffer, offset); if (length == -1) { if (!buffer.hasRemaining()) { buffer = checkReadBuffer(buffer, offset, position); } break; } //如果postion小於包起始位置加上包長度,證明readBuffer不夠大,需要擴容 if (position >= offset + length) { buffer.position(offset); byte[] data = new byte[length]; //讀取一個完整的包 buffer.get(data, 0, length); //處理包,每種AbstractConnection的處理函式不同 handle(data); //記錄下讀取到哪裡了 offset += length; //如果最後寫入位置等於最後讀取位置,則證明所有的處理完了,可以清空快取和offset //否則,記錄下最新的offset //由於readBufferOffset只會單執行緒(繫結的RW執行緒)修改,但是會有多個執行緒訪問(定時執行緒池的清理任務),所以設為volatile,不用CAS if (position == offset) { if (readBufferOffset != 0) { readBufferOffset = 0; } buffer.clear(); break; } else { readBufferOffset = offset; buffer.position(position); continue; } } else { if (!buffer.hasRemaining()) { buffer = checkReadBuffer(buffer, offset, position); } break; } } }private ByteBuffer checkReadBuffer(ByteBuffer buffer, int offset, int position) { if (offset == 0) { if (buffer.capacity() >= maxPacketSize) { throw new IllegalArgumentException( "Packet size over the limit."); } int size = buffer.capacity() << 1; size = (size > maxPacketSize) ? maxPacketSize : size; ByteBuffer newBuffer = processor.getBufferPool().allocate(size); buffer.position(offset); newBuffer.put(buffer); readBuffer = newBuffer; recycle(buffer); return newBuffer; } else { buffer.position(offset); buffer.compact(); readBufferOffset = 0; return buffer; } }
可以看出,處理快取需要考慮到容量,擴容和位置記錄等方面。 這裡,readBuffer一直處於寫模式。MyCat通過position(),還有get(data, 0, length)來讀取資料。readBufferOffset用來記錄每次讀取的offset,需要設定為volatile。由於readBufferOffset只會單執行緒(繫結的RW執行緒)修改,但是會有多個執行緒訪問(定時執行緒池的清理空閒連線的任務),所以設為volatile,不用CAS。這是一個經典的用volatile代替CAS實現多執行緒安全訪問的場景。 MyCat的快取管理思路很好,之後我會仔細講。 讀取完整包之後,交給handler處理。每種AbstractConnection的handler不同,FrontendConnection的handler為FrontendAuthenticator
this.handler = new FrontendAuthenticator(this);
我們思考下,FrontendConnection會接收什麼請求呢?有兩種,認證請求和SQL命令請求。只有認證成功後,才會接受SQL命令請求。FrontendAuthenticator只負責認證請求,在認證成功後,將對應AbstractConnection的handler設為處理SQL請求的FrontendCommandHandler即可. 一切正常的話,這裡讀取到的應為客戶端發出的AuthPacket: AuthPacket:
packet length(3 bytes)
packet number (1)
client flags (4)
max packet (4)
charset(1)
username (null terminated string)
password (length code Binary)
database (null terminated string)
public void handle(byte[] data) { // check quit packet if (data.length == QuitPacket.QUIT.length && data[4] == MySQLPacket.COM_QUIT) { source.close("quit packet"); return; } AuthPacket auth = new AuthPacket(); auth.read(data); // check user if (!checkUser(auth.user, source.getHost())) { failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "' with host '" + source.getHost()+ "'"); return; } // check password if (!checkPassword(auth.password, auth.user)) { failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "', because password is error "); return; } // check degrade if ( isDegrade( auth.user ) ) { failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "', because service be degraded "); return; } // check schema switch (checkSchema(auth.database, auth.user)) { case ErrorCode.ER_BAD_DB_ERROR: failure(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + auth.database + "'"); break; case ErrorCode.ER_DBACCESS_DENIED_ERROR: String s = "Access denied for user '" + auth.user + "' to database '" + auth.database + "'"; failure(ErrorCode.ER_DBACCESS_DENIED_ERROR, s); break; default: //認證成功,設定好使用者資料庫和許可權等,將handler設定為FrontendCommandHandler success(auth); } }
認證成功後,會發送OkPacket,對應FrontendConnection的handler變成了FrontendCommandHandler,可以接受SQL請求了。 傳送OkPacket的過程與HandshakePacket相同,這裡不再贅述。
source.write(source.writeToBuffer(AUTH_OK, buffer));
OkPacket結構:
packet length(3 bytes)
packet number (1)
0x00 (1,包體首位元組為0)
affect rows (length code Binary)
insert id (length code Binary)
server status (2)
warning status (2)
message (length code Binary)
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 Kubernetes 在網易雲中的落地優化實踐
【推薦】 網易嚴選的wkwebview測試之路