數據庫路由中間件MyCat - 源代碼篇(1)
此文已由作者張鎬薪授權網易雲社區發布。
歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。
進入了源代碼篇,我們先從整體入手,之後拿一個簡單流程前端連接建立與認證作為例子,理清代碼思路和設計模式。然後,針對每一個重點模塊進行分析。
1. 整體通信與業務框架:
前端與後端通信框架都為NIO/AIO,因為目前生產上用的linux發行版內核都沒有真正實現網絡上的AIO,如果應用用AIO的話可能比NIO還要慢一些,所以,我們這裏只分析NIO相關的通信模塊。
NIOAcceptor:作為服務器接受客戶端連接(前端NIO通信)
NIOConnector:作為客戶端去連接後臺數據庫(MySql,後端NIO通信)
NIOReactor:Reactor模式的NIO,處理並轉發請求到RW線程,其實就是把對應AbstractConnection(就是NIO的channel的封裝)註冊到RW線程的selector上,只註冊讀標記;原因之後細講
NIOReactorPool:一般高性能網絡通信框架采用多Reactor(多dispatcher)模式,這裏將NIOReactor池化;每次NIOConnector接受一個連接或者NIOAcceptor請求一個連接,都會封裝成AbstractConnection,同時請求NIOReactorPool每次輪詢出一個NIOReactor,之後AbstractConnection與這個NIOReactor綁定(就是3之中說的註冊)。
RW:RW線程,負責執行NIO的channel讀寫,這裏channel封裝成了AbstractConnection
NIOSocketWR:每個前端和後端連接都有一個對應的緩沖區,對連接讀寫操作具體如何操作的方法和緩存方式,封裝到了這個類裏面。
通過上面的分析,我們大致知道了通信是由誰負責的了,但是為什麽NIOReactor只註冊讀標記?還有網絡通信channel(之後的文章我們就都用AbstractConnection代替了)讀寫有線程執行了,但是中間的業務步驟,比如SQL攔截,SQL解析還有結果合並是誰執行呢?然後,還有些定時的任務,比如檢查心跳連接等,如何執行呢? 首先,Reactor不會主動驅動寫請求,寫請求只會由業務步驟和定時任務觸發。首先看,Reactor與前端AbstractConnection還有後端AbstractConnection,接收到的請求有兩種,前端的SQL請求,還有後端的結果。但是這兩種都不能直接轉發,前端的SQL請求需要經過SQL解析等業務步驟才能寫到後端,後端的結果也需要經過業務處理才能寫到前端。所以只要執行業務步驟的線程去註冊寫標記,Reactor只要在檢查到寫標記後去寫之後取消標記即可。定時任務同理。 那麽誰去執行業務請求呢?MyCat會初始化一個BusinessExecutor線程池去處理業務請求,這個BusinessExecutor接受Reactor調度,定時任務由一個Timer線程調度並由一個TimerExecutor線程池執行。 整體結構如下所示,所有橢圓形的圖形是線程或者進程(省略了很多,比如緩沖、緩存、連接以及對應的管理,這些之後會細細介紹):
2. 前端連接建立與認證
mysql客戶端連接mysql服務器抓包:流程是:
Title:MySql連接建立以及認證過程client->MySql:1.TCP連接請求 MySql->client:2.接受TCP連接client->MySql:3.TCP連接建立MySql->client:4.握手包HandshakePacketclient->MySql:5.認證包AuthPacketMySql->client:6.如果驗證成功,則返回OkPacketclient->MySql:7.默認會發送查詢版本信息的包MySql->client:8.返回結果包
在之後的協議分析,我們會深入每一個包進行分析
2.1 (1~3)TCP連接請求->接受TCP連接->TCP連接建立
首先,接受TCP連接(為了三次握手,上面流程的前三個包)需要通過NIOAcceptor實現,NIOAcceptor主要完成綁定端口,註冊OP_ACCEPT監聽客戶端連接事件,有客戶連接,則放接受連接,將返回的channel封裝成為FrontendConnection(AbstarctConnection的子類),從NIOReactorPool中拿出一個NIOReactor並將FrontendConnection交給它綁定。 到此,NIOAcceptor就處理完一個客戶端的連接請求。
public NIOAcceptor(String name, String bindIp, int port, FrontendConnectionFactory factory, NIOReactorPool reactorPool) throws IOException { super.setName(name); this.port = port; this.selector = Selector.open(); this.serverChannel = ServerSocketChannel.open(); this.serverChannel.configureBlocking(false); //設置TCP屬性 serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2); // backlog=100 serverChannel.bind(new InetSocketAddress(bindIp, port), 100); //註冊OP_ACCEPT,監聽客戶端連接 this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); //FrontendConnectionFactory,用來封裝channel成為FrontendConnection this.factory = factory; //NIOReactor池 this.reactorPool = reactorPool; }
構造器讀取ip,端口,前端連接工廠和NIOReactor池,初始化TCP參數,並bind,在selector上註冊OP_ACCEPT。 在NIOAcceptor啟動後:
@Override public void run() { final Selector tSelector = this.selector; for (; ; ) { ++acceptCount; try { //輪詢發現新連接請求 tSelector.select(1000L); Set<SelectionKey> keys = tSelector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { //接受連接操作 accept(); } else { key.cancel(); } } } finally { keys.clear(); } } catch (Exception e) { LOGGER.warn(getName(), e); } } }
NIOAcceptor這個線程不斷輪詢接受新的客戶端連接請求,接受連接操作:
private void accept() { SocketChannel channel = null; try { //得到通信channel並設置為非阻塞 channel = serverChannel.accept(); channel.configureBlocking(false); //封裝channel為FrontendConnection FrontendConnection c = factory.make(channel); c.setAccepted(true); c.setId(ID_GENERATOR.getId()); //利用NIOProcessor管理前端鏈接,定期清除空閑連接,同時做寫隊列檢查 NIOProcessor processor = (NIOProcessor) MycatServer.getInstance() .nextProcessor(); c.setProcessor(processor); //和具體執行selector響應感興趣事件的NIOReactor綁定 NIOReactor reactor = reactorPool.getNextReactor(); reactor.postRegister(c); } catch (Exception e) { LOGGER.warn(getName(), e); closeChannel(channel); } }
NIOProcessor持有所有的前後端連接,裏面有空閑檢查和寫隊列檢查。RW線程,TimerExecutor線程池會執行裏面的方法來實現空閑寫入和定時連接檢查等等。之後我還會詳細介紹。 關閉Channel方法以及生成連接id方法很簡單,就不加註釋和贅述了。下面是他們的源代碼:
private static void closeChannel(SocketChannel channel) { if (channel == null) { return; } Socket socket = channel.socket(); if (socket != null) { try { socket.close(); } catch (IOException e) { LOGGER.error("closeChannelError", e); } } try { channel.close(); } catch (IOException e) { LOGGER.error("closeChannelError", e); } } /** * 前端連接ID生成器 * * @author mycat */ private static class AcceptIdGenerator { private static final long MAX_VALUE = 0xffffffffL; private long acceptId = 0L; private final Object lock = new Object(); private long getId() { synchronized (lock) { if (acceptId >= MAX_VALUE) { acceptId = 0L; } return ++acceptId; } } }
免費體驗雲安全(易盾)內容安全、驗證碼等服務
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】 一個小白的測試環境docker化之路
【推薦】 網易七魚 Android 高性能日誌寫入方案
【推薦】 淺談由管理者角色引出的B端產品設計思考點
數據庫路由中間件MyCat - 源代碼篇(1)