資料庫路由中介軟體MyCat - 原始碼篇(5)
此文已由作者張鎬薪授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
3. 連線模組
如之前所述,MyCat的連線分為前端和後端,下面是連線基本相關類圖:
3.1 ClosableConnection:
public interface ClosableConnection { String getCharset(); //關閉連線 void close(String reason); boolean isClosed(); public void idleCheck(); long getStartupTime(); String getHost(); int getPort(); int getLocalPort(); long getNetInBytes(); long getNetOutBytes(); }
根據字面意思,一個可以關閉的連線需要實現關閉方法-_-,並且需要原因判斷是否是正常關閉。MySQL的通訊都需要指定字符集。MyCat伺服器建立ServerSocket時輸入的埠為伺服器在其上面監聽客戶的連線,當有客戶連線時,在隨機選擇一個沒用的埠與客戶端通訊;建立客戶socket時輸入的為服務端的監聽埠,在本地選擇一個未用埠與伺服器通訊,至於伺服器怎麼知道和客戶端的哪個埠通訊,和客戶端怎麼知道和服務端的哪個埠通訊(因為這兩個埠都是隨機生成的),tcp是採用"三次握手"建立連線,而udp則是每次傳送資訊時將埠號放在ip報文的資料段裡面。所以,連線裡面需要提供獲得監聽埠和服務埠的方法。此外,還需要檢查連線是否為空閒狀態(idle)。最後,需要一些統計資料。
3.2 NIOConnection:
public interface NIOConnection extends ClosableConnection { //connected void register() throws IOException; //處理資料 void handle(byte[] data); // 寫出一塊緩衝資料 void write(ByteBuffer buffer); }
所有NIO的通訊需要在多路複用選擇器上註冊channel,這裡有個對應的register()方法需要實現。然後,讀取和寫入資料都需要通過緩衝。緩衝區(Buffer)就是在記憶體中預留指定大小的儲存空間用來對輸入/輸出(I/O)的資料作臨時儲存,這部分預留的記憶體空間就叫做緩衝區,使用緩衝區有這麼兩個好處:
減少實際的物理讀寫次數
緩衝區在建立時就被分配記憶體,這塊記憶體區域一直被重用,可以減少動態分配和回收記憶體的次數 讀取到的資料需要經過處理,這裡對應的就是handle(byte[])方法。
3.3 AbstractConnection:
從上面的實體圖,我們發現,AbstractConnection其實就是把Java的NetworkChannel進行封裝,同時需要依賴其他幾個類來完成他所需要的操作,如下: 其中,NIOProcessor是對AbstractConnection實現NIO讀寫的方法類,NIOHandler是處理AbstractConnection讀取的資料的處理方法類,NIOSocketWR是執行以上方法的執行緒類。
3.3.1 NIOProcessor:
NIOProcessor的構建方法:
public NIOProcessor(String name, BufferPool bufferPool, NameableExecutor executor) throws IOException { this.name = name; this.bufferPool = bufferPool; this.executor = executor; this.frontends = new ConcurrentHashMap<Long, FrontendConnection>(); this.backends = new ConcurrentHashMap<Long, BackendConnection>(); this.commands = new CommandCount(); }
呼叫位置: MyCatServer.java
... bufferPool = new BufferPool(processBuferPool, processBufferChunk, socketBufferLocalPercent / processorCount); businessExecutor = ExecutorUtil.create("BusinessExecutor", threadPoolSize); ...for (int i = 0; i < processors.length; i++) { processors[i] = new NIOProcessor("Processor" + i, bufferPool, businessExecutor); } ...
每個MyCat例項會初始化processors個NIOProcessor,每個NIOProcessor公用同一個bufferPool和businessExecutor。 bufferPool是緩衝池,BufferPool這個類負責緩衝統一管理 businessExecutor如之前所述,是業務執行緒池。 NIOProcessor被池化,很簡單,就是儲存到陣列中,通過MyCatServer的nextProcessor()方法輪詢獲取一個NIOProcessor,之後每個AbstractConnection通過setNIOProcessor方法,設定NIOProcessor。
public NIOProcessor nextProcessor() { int i = ++nextProcessor; if (i >= processors.length) { i = nextProcessor = 0; } return processors[i]; }
可以看出,每個AbstractConnection依賴於一個NIOProcessor,每個NIOProcessor儲存著多個AbstractConnection。AbstractConnection分為FrontendConnection和BackendConnection被分別儲存在NIOProcessor的frontends和backends這兩個ConcurrentHashMap中。 用ConcurrentHashMap是因為NIOAcceptor和NIOConnector執行緒以及RW執行緒池都會訪問這兩個變數。 NIOProcessor其實主要負責連線資源的管理: MyCat會定時檢查前端和後端空閒連線,並清理和回收資源: MyCatServer.java:
// 處理器定時檢查任務 private TimerTask processorCheck() { return new TimerTask() { @Override public void run() { timerExecutor.execute(new Runnable() { @Override public void run() { try { for (NIOProcessor p : processors) { p.checkBackendCons(); } } catch (Exception e) { LOGGER.warn("checkBackendCons caught err:" + e); } } }); timerExecutor.execute(new Runnable() { @Override public void run() { try { for (NIOProcessor p : processors) { p.checkFrontCons(); } } catch (Exception e) { LOGGER.warn("checkFrontCons caught err:" + e); } } }); } }; }
檢查前端連線,回收空閒資源:
/** * 定時執行該方法,回收部分資源。 */ public void checkFrontCons() { frontendCheck(); } private void frontendCheck() { Iterator<Entry<Long, FrontendConnection>> it = frontends.entrySet() .iterator(); while (it.hasNext()) { FrontendConnection c = it.next().getValue(); // 刪除空連線 if (c == null) { it.remove(); this.frontendsLength.decrementAndGet(); continue; } // 清理已關閉連線,否則空閒檢查。 if (c.isClosed()) { c.cleanup(); it.remove(); this.frontendsLength.decrementAndGet(); } else { // very important ,for some data maybe not sent checkConSendQueue(c); c.idleCheck(); } } }
在關閉前端連線時,會清理連線佔用的快取資源: FrontendConnection.java:
protected void cleanup() { //回收讀緩衝 if (readBuffer != null) { recycle(readBuffer); this.readBuffer = null; this.readBufferOffset = 0; } //回收寫緩衝 if (writeBuffer != null) { recycle(writeBuffer); this.writeBuffer = null; } //回收壓縮協議棧編碼解碼佇列 if(!decompressUnfinishedDataQueue.isEmpty()) { decompressUnfinishedDataQueue.clear(); } if(!compressUnfinishedDataQueue.isEmpty()) { compressUnfinishedDataQueue.clear(); } //回收寫佇列 ByteBuffer buffer = null; while ((buffer = writeQueue.poll()) != null) { recycle(buffer); } }
後端連線檢查,除了要清理已關閉的連線,還有要檢查SQL執行時間是否超時:
/** * 定時執行該方法,回收部分資源。 */ public void checkBackendCons() { backendCheck(); } // 後端連線檢查 private void backendCheck() { long sqlTimeout = MycatServer.getInstance().getConfig().getSystem().getSqlExecuteTimeout() * 1000L; Iterator<Entry<Long, BackendConnection>> it = backends.entrySet().iterator(); while (it.hasNext()) { BackendConnection c = it.next().getValue(); // 刪除空連線 if (c == null) { it.remove(); continue; } // SQL執行超時的連線關閉 if (c.isBorrowed() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) { LOGGER.warn("found backend connection SQL timeout ,close it " + c); c.close("sql timeout"); } // 清理已關閉連線,否則空閒檢查。 if (c.isClosed()) { it.remove(); } else { // very important ,for some data maybe not sent if (c instanceof AbstractConnection) { checkConSendQueue((AbstractConnection) c); } c.idleCheck(); } } }
同時,在檢查連線是否關閉時,需要檢查寫佇列是否為空。寫佇列不為空,證明還有請求沒有響應。需要將寫佇列的剩餘請求非同步寫出,通過NIOSocketWR。
private void checkConSendQueue(AbstractConnection c) { // very important ,for some data maybe not sent if (!c.writeQueue.isEmpty()) { c.getSocketWR().doNextWriteCheck(); } }
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 在一臺伺服器上搭建相對高可用HiveServer實踐
【推薦】 大公司怎麼做Android程式碼混淆的?