kafka叢集Broker端基於Reactor模式請求處理流程深入剖析-kafka商業環境實戰
本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。

1 Reactor單執行緒案例程式碼熱熱身
-
如下是單執行緒的JAVA NIO程式設計模型。
-
首先服務端建立ServerSocketChannel物件,並註冊到Select上OP_ACCEPT事件,然後ServerSocketChannel負責監聽指定埠上的連線請求。
-
客戶端一旦連線上ServerSocketChannel,就會觸發Acceptor來處理OP_ACCEPT事件,併為來自客戶端的連線建立Socket Channel,並設定為非阻塞模式,並在其Selector上註冊OP_READ或者OP_WRITE,最終實現客戶端與服務端的連線建立和資料通道打通。
-
這裡有一個明顯的問題,就是所有時間的處理邏輯都是在Acceptor單執行緒完成的,在併發連線數較小,資料量較小的場景下,是沒有問題的,但是......
-
Selector 允許一個單一的執行緒來操作多個 Channel. 如果我們的應用程式中使用了多個 Channel, 那麼使用 Selector 很方便的實現這樣的目的, 但是因為在一個執行緒中使用了多個 Channel, 因此也會造成了每個 Channel 傳輸效率的降低.
-
優化點在於:通道連線|讀取或寫入|業務處理均採用多執行緒來處理。通過執行緒池或者MessageQueue共享佇列,進一步優化了高併發的處理要求,這樣就解決了同一時間出現大量I/O事件時,單獨的Select就可能在分發事件時阻塞(或延時),而成為瓶頸的問題。
public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000;
public static void main(String args[]) throws Exception { // 開啟服務端 Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 開啟 Selector Selector selector = Selector.open(); // 服務端 Socket 監聽8080埠, 並配置為非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); // 將 channel 註冊到 selector 中. // 通常我們都是先註冊一個 OP_ACCEPT 事件, 然後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ // 註冊到 Selector 中. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 通過呼叫 select 方法, 阻塞地等待 channel I/O 可操作 if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; } // 獲取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經就緒. Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); // 當獲取一個 SelectionKey 後, 就要將它刪除, 表示我們已經對這個 IO 事件進行了處理. keyIterator.remove(); if (key.isAcceptable()) { // 當 OP_ACCEPT 事件到來時, 我們就有從 ServerSocketChannel 中獲取一個 SocketChannel, // 代表客戶端的連線 // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel. // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中. // 注意, 這裡我們如果沒有設定 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那麼 select 方法會一直直接返回. clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } 複製程式碼
}