1. 程式人生 > >一步步動手實現高併發的Reactor模型 —— Kafka底層如何充分利用多執行緒優勢去處理網路I/O與業務分發

一步步動手實現高併發的Reactor模型 —— Kafka底層如何充分利用多執行緒優勢去處理網路I/O與業務分發

一、從《Apeche Kafka原始碼剖析》上搬來的概念和圖

Kafka網路採用的是Reactor模式,是一種基於事件驅動的模式。熟悉Java程式設計的讀者應該瞭解Java NIO提供了Reactor模式的API。常見的單執行緒Java NIO程式設計模式如圖所示。

熟悉NIO程式設計都應該知道這個Selector,我們可以通過輪詢它來獲取監聽事件,然後通過事件來進行不同的處理,比如OP_ACCEPT連線,OP_READ讀取資料等等。

這樣簡單的處理對於客戶端是沒什麼問題,但對於服務端來說就有些缺點了。在服務端,我們要求讀取請求、處理請求以及傳送響應各個環節必須能迅速完成,並且要儘可能做到互不影響。所以我們就需要對上述簡單的模型進行修改。

為了滿足高併發的需求,也為了充分利用伺服器的資源,我們對上述的架構稍作調整,將網路讀寫的邏輯與業務處理的邏輯進行拆分,讓其由不同的執行緒池來處理,如圖所示。

二、套餐一:直接擼Kafka原始碼

如果不想看本文下面這個很挫的Reactor模型,可以直接看Kafka的原始碼 ~ 如果需要稍微藉助一點中文註釋,我已經標註了十分多的註釋~ 可以直接看這個版本,基於Kafka0.10.0.1的原始碼解讀 ,當然也可以直接去看官方版本

SocketServer就是它的入口。

其中,內部類 Acceptor 負責建立並配置新連線

內部類 Processor 負責處理IO事件。

KafkaRequestHandler 這個類負責業務的處理。

而業務處理和IO之間的橋則是 RequestChannel。

三、套餐二:動手一步步實現Reactor模型

事先宣告,以下這個很挫(但也簡單)的Reactor模型只是保證它能用,而且思路和Kafka大致一致,並沒有去做很多的異常處理!!很多細節地方也做得不是很到位。

3.1 回憶一下selector是怎麼用的

        //1. 獲取服務端通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        ssChannel.bind(new InetSocketAddress(9898));
        //2. 設定為非阻塞模式
        ssChannel.configureBlocking(false);
        
        //3. 開啟一個監聽器
        Selector selector = Selector.open();
        //4. 向監聽器註冊接收事件
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (selector.select() > 0) {
            //5. 獲取監聽器上所有的監聽事件值
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            //6. 如果有值
            while (it.hasNext()) {
                //7. 取到SelectionKey
                SelectionKey key = it.next();

                //8. 根據key值判斷對應的事件
                if (key.isAcceptable()) {
                    //9. 接入處理
                    SocketChannel socketChannel = ssChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {

                    //10. 可讀事件處理
                    SocketChannel channel = (SocketChannel) key.channel();
                    readMsg(channel);
                }
                //11. 移除當前key
                it.remove();
            }
        }

這就是我們上面提到的第一張圖的模型,我們發現它的IO操作和業務處理是雜糅在一起的。當然我們簡單的做可以使用一個業務處理的執行緒池負責處理業務。

但是我們這裡是要去實現第二個圖的模型~

3.2 實現負責建立連線的Acceptor

  • 在 Acceptor 中監聽埠
    public Acceptor(InetSocketAddress inetSocketAddress, Processor[] processors) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        serverSocketChannel.socket()
                           .bind(inetSocketAddress);
        this.serverSocketChannel = serverSocketChannel;
        this.selector = Selector.open();
        this.processors = processors;// 先忽略這個東西 = =
    }
  • 註冊 OP_ACCEPT 事件,並且不斷輪詢進行連線的建立,kafka在初始化中大量使用了CountdownLaunch來確保初始化的成功,這裡偷懶省去這一步驟。
@Override
   public void run() {
       if (init) {
           System.out.println("已可以開始建立連線");
           init = false;
       }

       try {
           serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
       } catch (ClosedChannelException e) {
           e.printStackTrace();
       }

       int currentProcessors = 0;
       while (true) {
           try {
               int ready = selector.select(500); // 半秒輪詢一次
               if (ready > 0) {
                   Set<SelectionKey> selectionKeys = selector.selectedKeys();
                   for (SelectionKey selectionKey : selectionKeys) {
                       if (selectionKey.isAcceptable()) {
                           this.accept(selectionKey, processors[currentProcessors]);
                           currentProcessors = (currentProcessors + 1) % processors.length;
                       } else {
                           throw new RuntimeException("不應該出現的情況,因為只訂閱了OP_ACCEPT");
                       }
                   }
               }
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   }
	
	// 建立連線,並且使用RoundRobin分配給一個Processor,也就是負責IO的角色
   public void accept(SelectionKey selectionKey, Processor processor) throws IOException {
       SelectableChannel channel = selectionKey.channel();
       SocketChannel socketChannel = ((ServerSocketChannel) channel).accept();
       socketChannel.configureBlocking(false);
       socketChannel.socket()
                    .setTcpNoDelay(true);
       socketChannel.socket()
                    .setKeepAlive(true);

       // 將需要連線的socketChannel轉交給processor去處理
       processor.accept(socketChannel);
   }

3.3 實現負責處理IO的Processor

  • 新連線進來後的處理:這裡只是簡單將新建立的連線放在了newConnection中。
    public Processor(String name, RequestChannel requestChannel, ConcurrentHashMap<SelectionKey, ArrayBlockingQueue<ByteBuffer>> inFlightResponse) throws IOException {
        this.name = name;
        this.newConnection = new ConcurrentLinkedQueue<>();
        this.selector = Selector.open();
        this.inFlightResponse = inFlightResponse;
        this.requestChannel = requestChannel;
    }
	
    protected void accept(SocketChannel socketChannel) {
        try {
            System.out.println(name + "正在與" + socketChannel.getLocalAddress() + "建立連線");
        } catch (IOException e) {
            e.printStackTrace();
        }
        newConnection.add(socketChannel);
        // 還需要wakeUp,如果輪詢阻塞了,告訴它可以不阻塞了
        selector.wakeup();
    }
  • 處理newConnection,並註冊OP_READ,等待客戶端傳輸資料
    @Override
    public void run() {
        while (true) {

            /*
             * 處理新連結
             */
            while (!newConnection.isEmpty()) {
                SocketChannel socketChannel = newConnection.poll();
                try {
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }

新接收到的資料,我們會將其丟進 RequestChannel,並取消關注OP_READ,保證不會讓多個請求同時進來。

requestChannel.sendRequest(new Request(selectionKey, byteBuffer));// 接受完資料後,把資料丟進佇列

而最新處理完的資料,我們則會將其快取在 inFlightRequest ,並關注OP_WIRTE。這是仿照 Kafka 的 inFlightRequest 做的,當然做得很粗糙。

Kafka 的 inFlightRequest 是將對應每個節點請求/應答的請求和響應放在了佇列中,確保在同一時間段內,一個節點只會有一個請求和應答。這也巧妙的避開了拆包粘包問題,首先 Kafka 保證了不會同時對一個節點發送請求,其次,Kafka 使用了自定的協議(其實就是包頭上標明瞭整個包的長度再加上CRC校驗)來保證一次請求的完整性。

我們的Selector輪詢中,會將剛才在上一步中關注了OP_WRITE的SelectionKey連同要返回的資料一同拿出,並進行處理,處理完成後,取消關注OP_WRITE,並重新關注OP_READ。

  • 處理新請求與新應答,我們將READ事件和WRITE事件放在了Processor來進行。

    /*
     * 將新應答放入緩衝佇列
     */
    Response response = requestChannel.receiveResponse();
    while (response != null) {
        SelectionKey key = response.getSelectionKey();
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

        ArrayBlockingQueue<ByteBuffer> inFlight = inFlightResponse.getOrDefault(response.getSelectionKey(), new ArrayBlockingQueue<>(100));
        inFlightResponse.put(response.getSelectionKey(), inFlight);
        try {
            inFlight.put(response.getByteBuffer());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        response = requestChannel.receiveResponse();
    }

    int ready = selector.select(500);// 半秒輪詢一次
    if (ready > 0) {
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        for (SelectionKey selectionKey : selectionKeys) {
    
            /*
            * 處理新請求
            */
            if (selectionKey.isReadable()) {
              System.out.println(name + "正在處理新請求");
              SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
              ByteBuffer byteBuffer = ByteBuffer.allocate(1024);// 懶得定協議,就預設取這麼多吧 = =
              socketChannel.read(byteBuffer);// TODO 劃重點
              byteBuffer.flip();
              requestChannel.sendRequest(new Request(selectionKey, byteBuffer));// 接受完資料後,把資料丟進佇列
              selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_READ);// 不再關注read
            }
    
            /*
            * 處理新應答
            */
            if (selectionKey.isWritable()) {
              System.out.println(name + "正在處理新應答");
              ByteBuffer send = inFlightResponse.get(selectionKey)// // TODO 劃重點
                                                .poll();
              SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
              socketChannel.write(send);
              selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
              selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
            }
        }
    }
  • RequestChannel的實現實際上十分簡單..就是兩個佇列

/**
 * Created by Anur IjuoKaruKas on 2018/12/13
 */
public class RequestChannel {

    private ArrayBlockingQueue<Request> requestQueue;

    private ArrayBlockingQueue<Response> responseQueue;

    public RequestChannel() {
        requestQueue = new ArrayBlockingQueue<>(100);
        responseQueue = new ArrayBlockingQueue<>(100);
    }
	..........
}

3.4 實現負責處理業務的Handler

很容易想到,Handler 實際上就是負責從 RequestChannel 的 requestQueue 中拉取需要處理的資料,並塞回 RequestChannel 的 responseQueue 中。

我們可以根據接收資料的不同,來進行不同的業務處理。甚至如果需要拓展,這裡可以像 netty 一樣,僅僅把 Handler 當成Boss,具體業務的執行可以建立相應的執行緒池去進行處理,比如說 Fetch 業務比較耗時,我可以建立一個較大的執行緒池,去執行Fetch業務,而 Hello 業務,我們只需要 Executors.newSingleThreadExecutor() 即可。

   @Override
    public void run() {
        while (true) {
            Request request = requestChannel.receiveRequest();
            if (request != null) {
                System.out.println("接收的請求將由" + name + "進行處理");
                handler(request.getSelectionKey(), request.getByteBuffer());
            }
        }
    }
	
    public void handler(SelectionKey selectionKey, ByteBuffer byteBuffer) {
        byte[] bytes = byteBuffer.array();

        String msg = new String(bytes);
        try {
            Thread.sleep(500);        // 模擬業務處理
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        ByteBuffer response;
        if (msg.startsWith("Fetch")) {
            response = ByteBuffer.allocate(2048);
            response.put("Fetch ~~~~~~~~~~".getBytes());
            response.put(bytes);
            response.flip();
        } else if (msg.startsWith("Hello")) {
            response = ByteBuffer.allocate(2048);
            response.put("Hi ~~~~~~~~~~".getBytes());
            response.put(bytes);
            response.flip();
        } else {
            response = ByteBuffer.allocate(2048);
            response.put("Woww ~~~~~~~~~~".getBytes());
            response.put(bytes);
            response.flip();
        }
        System.out.println(name + "處理完畢,正將處理結果返回給Processor");
        requestChannel.sendResponse(new Response(selectionKey, response));
    }

3.5 執行我們很挫的模型

我們會發現現在這個很挫的 Reactor 模型的拓展性卻很好,大頭的兩個 Processor 和 Handler 都是可以隨意拓展數量的。Kafka 也是這麼做的,不過 Kafka 是根據伺服器核心的數量來建立 processor 和 handler 的:

// processors的建立

    val protocol = endpoint.protocolType
    // 網路協議
    val processorEndIndex = processorBeginIndex + numProcessorThreads

    for (i <- processorBeginIndex until processorEndIndex)
        processors(i) = newProcessor(i, connectionQuotas, protocol) // 建立Processor

    // 在這裡面會  // 迴圈啟動processor執行緒
    val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) // 建立Acceptor


// handlers的建立

 // 儲存KafkaRequestHandler的執行執行緒
  val threads = new Array[Thread](numThreads)

  // KafkaRequestHandler集合
  val runnables = new Array[KafkaRequestHandler](numThreads)

  for (i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }

這裡進行簡單處理,我將所有的東西統統扔進一個執行緒池。

執行一下我們的整個模型,然後我們使用 Hercules 模擬客戶端對我們的伺服器進行請求。

/**
 * Created by Anur IjuoKaruKas on 2018/12/12
 */
public class Reactor {

    public static final int PORT = 9999;

    public static void main(String[] args) throws IOException {
        RequestChannel requestChannel = new RequestChannel();
        ConcurrentHashMap<SelectionKey, ArrayBlockingQueue<ByteBuffer>> inFlightResponse = new ConcurrentHashMap<>();

        Processor processor1 = new Processor("p1", requestChannel, inFlightResponse);
        Processor processor2 = new Processor("p2", requestChannel, inFlightResponse);
        Acceptor acceptor = new Acceptor(new InetSocketAddress(PORT), new Processor[] {
            processor1,
            processor2
        });

        ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.execute(acceptor);

        executorService.execute(processor1);
        executorService.execute(processor2);

        Handler handler1 = new Handler("h1", requestChannel);
        Handler handler2 = new Handler("h2", requestChannel);
        executorService.execute(handler1);
        executorService.execute(handler2);
    }
}

建立連線後,我們依次傳送 Hello baby,Fetch msg 和 nyanyanya。

得到如下響應:

並且伺服器日誌如下:

我們發現,h1和h2都會從RequestChannel中獲取任務來進行執行~ 當然如果連線增多,p1和p2也會從交替從Accept中獲取新的連線。

具體的程式碼請點選這裡,直接拉取下來即可執行,執行的主類是 src/reactor/Reactor

覺得好的話可以順手為文章點個贊喲~謝謝各位看官老爺!


參考文獻:

《Apeche Kafka原始碼剖析》—— 徐郡明著

Kafka 原始碼 0.10.0.1