資料庫路由中介軟體MyCat - 原始碼篇(2)
此文已由作者張鎬薪授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
2. 前端連線建立與認證
Title:MySql連線建立以及認證過程client->MySql:1.TCP連線請求 MySql->client:2.接受TCP連線client->MySql:3.TCP連線建立MySql->client:4.握手包HandshakePacketclient->MySql:5.認證包AuthPacketMySql->client:6.如果驗證成功,則返回OkPacketclient->MySql:7.預設會發送查詢版本資訊的包MySql->client:8.返回結果包
2.2 (4)握手包HandshakePacket
NIOReactor其實就是一個網路事件反應轉發器。 很多地方會用到NIOReactor,這裡先講FrontendConnection和NIOReactor繫結這一部分。上一節說到,NIOAcceptor的accept()最後將FrontendConnection交給了NIOReactor池其中的一個NIOReactor。呼叫的是 postRegister(AbstractConnection c)方法。
final void postRegister(AbstractConnection c) { reactorR.registerQueue.offer(c); reactorR.selector.wakeup(); }
postRegister將剛才傳入的FrontendConnection放入RW執行緒的註冊佇列。之後,喚醒RW執行緒的selector。 為什麼放入RW執行緒的註冊佇列,而不是直接註冊呢?如果是直接註冊,那麼就是NIOAcceptor這個執行緒負責註冊,這裡就會有鎖競爭,因為NIOAcceptor這個執行緒和每個RW執行緒會去競爭selector的鎖。這樣NIOAcceptor就不能高效的處理連線。所以,更好的方式是將FrontendConnection放入RW執行緒的註冊佇列,之後讓RW執行緒自己完成註冊工作。 RW執行緒的原始碼:
private final class RW implements Runnable { private final Selector selector; private final ConcurrentLinkedQueue<AbstractConnection> registerQueue; private long reactCount; private RW() throws IOException { this.selector = Selector.open(); this.registerQueue = new ConcurrentLinkedQueue<AbstractConnection>(); } @Override public void run() { final Selector selector = this.selector; Set<SelectionKey> keys = null; for (;;) { ++reactCount; try { selector.select(500L); //從註冊佇列中取出AbstractConnection之後註冊讀事件 //之後做一些列操作,請參考下面註釋 register(selector); keys = selector.selectedKeys(); for (SelectionKey key : keys) { AbstractConnection con = null; try { Object att = key.attachment(); if (att != null) { con = (AbstractConnection) att; if (key.isValid() && key.isReadable()) { try { //非同步讀取資料並處理資料 con.asynRead(); } catch (IOException e) { con.close("program err:" + e.toString()); continue; } catch (Exception e) { LOGGER.debug("caught err:", e); con.close("program err:" + e.toString()); continue; } } if (key.isValid() && key.isWritable()) { //非同步寫資料 con.doNextWriteCheck(); } } else { key.cancel(); } } catch (CancelledKeyException e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(con + " socket key canceled"); } } catch (Exception e) { LOGGER.warn(con + " " + e); } } } catch (Exception e) { LOGGER.warn(name, e); } finally { if (keys != null) { keys.clear(); } } } } private void register(Selector selector) { AbstractConnection c = null; if (registerQueue.isEmpty()) { return; } while ((c = registerQueue.poll()) != null) { try { //註冊讀事件 ((NIOSocketWR) c.getSocketWR()).register(selector); //連線註冊,對於FrontendConnection是傳送HandshakePacket並非同步讀取響應 //響應為AuthPacket,讀取其中的資訊,驗證使用者名稱密碼等資訊,如果符合條件 //則傳送OkPacket c.register(); } catch (Exception e) { c.close("register err" + e.toString()); } } } }
因為NIOAcceptor執行緒和RW執行緒這兩個都會操作RW執行緒的註冊佇列,所以要用ConcurrentLinkedQueue RW執行緒不斷檢查selector中需要響應的事件,並如果註冊佇列不為空,就不斷註冊其中的AbstractConnection,在這裡就是FrontendConnection。 之後執行FrontendConnection的register()方法:
@Override public void register() throws IOException { if (!isClosed.get()) { // 生成認證資料 byte[] rand1 = RandomUtil.randomBytes(8); byte[] rand2 = RandomUtil.randomBytes(12); // 儲存認證資料 byte[] seed = new byte[rand1.length + rand2.length]; System.arraycopy(rand1, 0, seed, 0, rand1.length); System.arraycopy(rand2, 0, seed, rand1.length, rand2.length); this.seed = seed; // 傳送握手資料包 HandshakePacket hs = new HandshakePacket(); hs.packetId = 0; hs.protocolVersion = Versions.PROTOCOL_VERSION; hs.serverVersion = Versions.SERVER_VERSION; hs.threadId = id; hs.seed = rand1; hs.serverCapabilities = getServerCapabilities(); hs.serverCharsetIndex = (byte) (charsetIndex & 0xff); hs.serverStatus = 2; hs.restOfScrambleBuff = rand2; // 非同步寫,本節就講到這裡 hs.write(this); // 非同步讀取並處理,這個與RW執行緒中的asynRead()相同,之後客戶端收到握手包返回AuthPacket(就是下一節)就是從這裡開始看。 this.asynRead(); } }
這個方法就是生成HandshakePacket併發送出去,之後非同步讀取響應。 之前的示例中MySql的HandshakePacket結構: 可以總結出: HandshakePacket:
packet length(3 bytes)
packet number (1)
protocol version (1)
version (null terminated string)
thread id (4)
salt (8)
server capabilities (2)
server charset (1)
server status (2)
unused (13)
salt (12)
0x00 --- 結束
這裡我們看下MyCat中的實現這一部分MySql協議棧的packet類結構: 這裡可以看出,每個包都實現了自己的包長度和資訊方法,並且針對前段後端連線都有讀寫方法實現,所以,之後讀寫資料都會根據場景不同調用這些類中的方法。這些包就是整個MySql協議棧除邏輯外的內容實現。 HandshakePacket.write(FrontendConnection c)方法將上面傳入的資料封裝成ByteBuffer,並傳入給FrontendConnection c的write(ByteBuffer buffer),這個方法直接繼承自AbstractConnection:
public final void write(ByteBuffer buffer) { //首先判斷是否為壓縮協議 if(isSupportCompress()) { //CompressUtil為壓縮協議輔助工具類 ByteBuffer newBuffer= CompressUtil.compressMysqlPacket(buffer,this,compressUnfinishedDataQueue); //將要寫的資料先放入寫快取佇列 writeQueue.offer(newBuffer); } else { //將要寫的資料先放入寫快取佇列 writeQueue.offer(buffer); } try { //處理寫事件,這個方法比較複雜,需要重點分析其思路 this.socketWR.doNextWriteCheck(); } catch (Exception e) { LOGGER.warn("write err:", e); this.close("write err:" + e); } }
如程式碼註釋中所述,先將要寫的資料放入寫緩衝佇列,之後呼叫NIOSocketWR.doNextWriteCheck()處理寫事件。
public void doNextWriteCheck() { //檢查是否正在寫,看CAS更新writing值是否成功 if (!writing.compareAndSet(false, true)) { return; } try { //利用快取佇列和寫緩衝記錄保證寫的可靠性,返回true則為全部寫入成功 boolean noMoreData = write0(); //因為只有一個執行緒可以成功CAS更新writing值,所以這裡不用再CAS writing.set(false); //如果全部寫入成功而且寫入佇列為空(有可能在寫入過程中又有新的Bytebuffer加入到佇列),則取消註冊寫事件 //否則,繼續註冊寫事件 if (noMoreData && con.writeQueue.isEmpty()) { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) { disableWrite(); } } else { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) { enableWrite(false); } } } catch (IOException e) { if (AbstractConnection.LOGGER.isDebugEnabled()) { AbstractConnection.LOGGER.debug("caught err:", e); } con.close("err:" + e); } } private boolean write0() throws IOException { int written = 0; ByteBuffer buffer = con.writeBuffer; if (buffer != null) { //只要寫緩衝記錄中還有資料就不停寫入,但如果寫入位元組為0,證明網路繁忙,則退出 while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //如果寫緩衝中還有資料證明網路繁忙,計數並退出,否則清空緩衝 if (buffer.hasRemaining()) { con.writeAttempts++; return false; } else { con.writeBuffer = null; con.recycle(buffer); } } //讀取快取佇列並寫channel while ((buffer = con.writeQueue.poll()) != null) { if (buffer.limit() == 0) { con.recycle(buffer); con.close("quit send"); return true; } buffer.flip(); while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.lastWriteTime = TimeUtil.currentTimeMillis(); con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //如果寫緩衝中還有資料證明網路繁忙,計數,記錄下這次未寫完的資料到寫緩衝記錄並退出,否則回收緩衝 if (buffer.hasRemaining()) { con.writeBuffer = buffer; con.writeAttempts++; return false; } else { con.recycle(buffer); } } return true; } private void disableWrite() { try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() & OP_NOT_WRITE); } catch (Exception e) { AbstractConnection.LOGGER.warn("can't disable write " + e + " con " + con); } } private void enableWrite(boolean wakeup) { boolean needWakeup = false; try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); needWakeup = true; } catch (Exception e) { AbstractConnection.LOGGER.warn("can't enable write " + e); } if (needWakeup && wakeup) { processKey.selector().wakeup(); } }
註釋已經很詳細,如此執行完,便成功將握手包傳送給了客戶端。 在這裡稍微吐槽下,由於MyCat在網路通訊上同時做了AIO和NIO,但是在設計上AbstractionConnection和這些並沒有關係。但是又涉及到快取佇列,所以設計上出現了一些如下的類模式: 這樣應該是不推薦這麼設計的,目前我還沒想好如何去改善
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 一行程式碼搞定Dubbo介面呼叫
【推薦】 網易美學-系統架構系列1-分散式與服務化
【推薦】 Vue框架核心之資料劫持