1. 程式人生 > >kafka叢集Broker端基於Reactor模式請求處理流程深入剖析-kafka商業環境實戰

kafka叢集Broker端基於Reactor模式請求處理流程深入剖析-kafka商業環境實戰

本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。

1 Reactor單執行緒案例程式碼熱熱身

  • 如下是單執行緒的JAVA NIO程式設計模型。

  • 首先服務端建立ServerSocketChannel物件,並註冊到Select上OP_ACCEPT事件,然後ServerSocketChannel負責監聽指定埠上的連線請求。

  • 客戶端一旦連線上ServerSocketChannel,就會觸發Acceptor來處理OP_ACCEPT事件,併為來自客戶端的連線建立Socket Channel,並設定為非阻塞模式,並在其Selector上註冊OP_READ或者OP_WRITE,最終實現客戶端與服務端的連線建立和資料通道打通。

  • 這裡有一個明顯的問題,就是所有時間的處理邏輯都是在Acceptor單執行緒完成的,在併發連線數較小,資料量較小的場景下,是沒有問題的,但是......

  • Selector 允許一個單一的執行緒來操作多個 Channel. 如果我們的應用程式中使用了多個 Channel, 那麼使用 Selector 很方便的實現這樣的目的, 但是因為在一個執行緒中使用了多個 Channel, 因此也會造成了每個 Channel 傳輸效率的降低.

  • 優化點在於:通道連線|讀取或寫入|業務處理均採用多執行緒來處理。通過執行緒池或者MessageQueue共享佇列,進一步優化了高併發的處理要求,這樣就解決了同一時間出現大量I/O事件時,單獨的Select就可能在分發事件時阻塞(或延時),而成為瓶頸的問題。

    public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000;

      public static void main(String args[]) throws Exception {
          // 開啟服務端 Socket
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
          // 開啟 Selector
          Selector selector = Selector.open();
    
          // 服務端 Socket 監聽8080埠, 並配置為非阻塞模式
          serverSocketChannel.socket().bind(new InetSocketAddress(8080));
          serverSocketChannel.configureBlocking(false);
    
          // 將 channel 註冊到 selector 中.
          // 通常我們都是先註冊一個 OP_ACCEPT 事件, 然後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ
          // 註冊到 Selector 中.
          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
          while (true) {
              // 通過呼叫 select 方法, 阻塞地等待 channel I/O 可操作
              if (selector.select(TIMEOUT) == 0) {
                  System.out.print(".");
                  continue;
              }
    
              // 獲取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經就緒.
              Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
    
              while (keyIterator.hasNext()) {
    
                  SelectionKey key = keyIterator.next();
    
                  // 當獲取一個 SelectionKey 後, 就要將它刪除, 表示我們已經對這個 IO 事件進行了處理.
                  keyIterator.remove();
    
                  if (key.isAcceptable()) {
                      // 當 OP_ACCEPT 事件到來時, 我們就有從 ServerSocketChannel 中獲取一個 SocketChannel,
                      // 代表客戶端的連線
                      // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel.
                      // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel.
                      SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                      clientChannel.configureBlocking(false);
                      //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中.
                      // 注意, 這裡我們如果沒有設定 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那麼 select 方法會一直直接返回.
                      clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                  }
    
                  if (key.isReadable()) {
                      SocketChannel clientChannel = (SocketChannel) key.channel();
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      long bytesRead = clientChannel.read(buf);
                      if (bytesRead == -1) {
                          clientChannel.close();
                      } else if (bytesRead > 0) {
                          key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                          System.out.println("Get data length: " + bytesRead);
                      }
                  }
    
                  if (key.isValid() && key.isWritable()) {
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      buf.flip();
                      SocketChannel clientChannel = (SocketChannel) key.channel();
    
                      clientChannel.write(buf);
    
                      if (!buf.hasRemaining()) {
                          key.interestOps(OP_READ);
                      }
                      buf.compact();
                  }
              }
          }
      }
    複製程式碼

}

2 Kafka Reactor模式設計思路

時間原因,後續補全