1. 程式人生 > >Netty執行緒模型及EventLoop和EventLoopGroup原始碼解析

Netty執行緒模型及EventLoop和EventLoopGroup原始碼解析

 1、netty執行緒模型

  一般在討論netty的執行緒模型的時候,我們會考慮到經典的Reactor執行緒模型,下面分別說明下經典的Reactor執行緒模型

1、1 Reactor單執行緒模型

     這個執行緒模型指的是所有的nio操作都是在一個執行緒中去完成的。nio執行緒的職責如下:      作為nio服務端,接收客戶端的tcp連線;      作為no客戶端,向服務端發起tcp連線;      讀取通訊對端的請求或是應答訊息;      向通訊對端傳送訊息或者應答訊息。      其單執行緒模型如下圖所示: 由於Reactor模式是非同步非阻塞i/o,所有的i/o操作都不會導致阻塞。理論上一個執行緒可以獨立處理所有的i/o相關的操作。例如,通過Acceptor接收客戶端的連線,當鏈路建立完成後通過Dispatch將對應的ByteBuffer派發到指定的handler上,進行訊息解碼。使用者執行緒訊息編碼後通過nio執行緒將訊息傳送給客戶端。
在一些小容量的應用場景下,可以使用單執行緒模型,但對於高負載,大併發的應用場景確不合適,主要原因如下: 一個nio執行緒同時處理成百上千的鏈路,效能無法滿足,即便nio執行緒的cpu達到100%,也無法滿足海量的訊息編碼、解碼、讀取和傳送。 nio執行緒負載過重,處理速度變慢,這會導致大量的客戶端的連線超時,超時之後往往會進行重發,這更加加重了nio執行緒的負載,最終導致大量訊息積壓和處理超時,成為效能瓶頸。 可靠性問題:一旦nio執行緒意外跑飛,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障。

1、2 Reactor多執行緒模型

它與單執行緒模型最大的區別就是多了一組nio執行緒池來處理io操作,其模型如下:
特點如下: 有一個nio執行緒負責處理客戶端的連線; 增加了一組nio執行緒池來處理網路io操作; 在絕大數情況下,Reactor多執行緒模型可以滿足效能需求。但是,在個別特殊場景中,一個nio執行緒負責監聽和處理所有客戶端的連線可能會存在效能問題。例如併發百萬客戶端連線,或者服務端需要對客戶端握手進行安全認證,但是認證本身非常損耗效能。在這類場景下Acceptor執行緒會存在效能不足的問題,為了解決這個問題,產生了第三種模型,主從Reactor模型;

1、3 Reactor主從多執行緒模型

Reactor主從多執行緒模型,用於監聽客戶端的連線不在是一個Nio執行緒了,它是一個nio執行緒池進行監聽客戶端的連線包括安全認證,當鏈路建立後就將網路讀取的操作放在另外一個執行緒去進行讀取。其模型如下:
    利用多執行緒模型可以有效處理一個執行緒無法處理多個客戶端連線請求的情況,在netty官方Demo中,推薦使用該執行緒模型。

1、4 Netty執行緒模型

netty的執行緒模型不是一層不變的,它取決於使用者的啟動引數配置。通過設定不同的啟動引數,netty可以同時支援Reactor單執行緒模型、多執行緒模型、主從Reactor執行緒模型。 netty的執行緒模型如下: netty服務端啟動程式碼如下:
  1. /配置服務端的nio執行緒組,
  2. //EventLoopGroup是一個執行緒組,它包含了一組nio執行緒,專門用於網路事件的處理,實際上他們就是Reactor執行緒組
  3. //這裡建立2個的原因是一個用於服務端接受客戶的連線,另一個用於SockentChannel的網路讀寫。
  4.         EventLoopGroup bossGroup = new NioEventLoopGroup();  
  5.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  6. try {  
  7.             ServerBootstrap b = new ServerBootstrap();  
  8.             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());  
  9. //繫結埠,同步等待成功
  10.             ChannelFuture f = b.bind(port).sync();  
  11. //等待服務端監聽埠關閉;
  12.             f.channel().closeFuture().sync();  
  13.         } finally {  
  14.             bossGroup.shutdownGracefully();  
  15.             workerGroup.shutdownGracefully();  
  16.         }  
如上看到,這裡定義了2個NioEventLoopGroup,實際上就是2個執行緒池,一個是用於監聽客戶端的連線,一個使用者處理網路io,或者執行系統task等。 用於接收客戶端請求的執行緒池職責如下: 1)接收客戶端tcp請求,初始化Channel引數。 2)將鏈路狀態變更事件通知給ChannelPipeline。 用於處理io操作的Reactor執行緒池職責如下: 1)非同步讀取通訊端的資料,傳送讀事件到ChannelPipeline; 2)非同步傳送訊息到通訊對端,呼叫ChannelPipeline的訊息傳送介面; 3)執行系統呼叫Task; 4)執行定時任務Task,例如鏈路空閒狀態監測定時任務;

2、NioEventLoop設計原理

NioEventLoop的設計並不是一個單純的io讀寫,還兼顧處理了以下2類任務: 系統task:通過呼叫NioEventLoop的execute(Runnable task)方法實現,Netty有很多系統task,建立他們的主要原因是:當io執行緒和使用者執行緒同時操作網路資源的時候,為了防止併發操作導致的鎖競爭,將使用者執行緒的操作封裝成Task放入訊息佇列中,由i/o執行緒負責執行,這樣就實現了局部無鎖化。 定時任務:執行schedule()方法 其類圖的結構如下:
NioEventLoopGroup是NioEventLoop的組合,用於管理NioEventLoop。 nioEventLoop需要處理網路的讀寫,因此它必須會有一個多路複用器物件。下面是Selector的定義: Selector selector; private SelectedSelectionKeySet selectedKeys; private final SelectorProvider provider; 它的初始化非常簡單直接是Selector.open方法進行初始化。 網路的讀取操作是在run方法中去執行的,首先看有沒有未執行的任務,有的話直接執行,否則就去輪訓看是否有就緒的Channel,如下: @Override protected void run() { for (;;) { oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select(); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; final long ioStartTime = System.nanoTime(); needsToSelectAgain = false; if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } } 在進行輪訓的時候,可能為空,也沒有wakeup操作或是最新的訊息處理,則說明本次輪訓是一個空輪訓,此時會觸發jdk的epoll bug,它會導致Selector進行空輪訓,使i/o執行緒處於100%。為了避免這個bug。需要對selector進行統計: 1)對selector操作週期進行統計 2)每完成一次輪訓進行一次計數 3)當在某個週期內超過一定次數說明觸發了bug,此時需要進行重新建立Selector,並賦值新值,將原來的進行關閉。 呼叫rebuildSelector方法。 當輪訓到有就緒的Channel時,就進行網路的讀寫操作,其程式碼如下:
 cancelledKeys = 0;

                final long ioStartTime = System.nanoTime();
                needsToSelectAgain = false;
                if (selectedKeys != null) {
                    processSelectedKeysOptimized(selectedKeys.flip());
                } else {
                    processSelectedKeysPlain(selector.selectedKeys());
                }
                final long ioTime = System.nanoTime() - ioStartTime;

                final int ioRatio = this.ioRatio;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
此時會去呼叫processSelectedKeysPlain方法,預設沒有開啟SelectedKey的優化方法。這裡執行的方法如下:
 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
這裡會去判斷selectedkey是否為空,如果不為空就去獲取selectedkey上的channel(為NioServerSocketChannel或是NioSocketChannel),獲取到channel後執行,判斷其型別,這裡netty的都是AbstractNioChannel類,執行的方法如下:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }
這裡會看選擇鍵是否可用,可用然後對位進行判斷,如果是讀或者是連線操作,則呼叫Unsafe的read方法。此處的Unsafe的實現是一個多型。(可能是呼叫NioServerSocketChannel或是NioSocketChannel的doReadBytes方法),對於服務端處理連線的請求如下:
 @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
對於客戶端的呼叫如下:
    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
    }
當網路位為寫的時候,則說明有半包訊息沒有傳送完成,需要繼續呼叫flush方法進行傳送。 後面的如果網路操作位為連線狀態,則需要對連線結果進行判斷。 處理完網路的io後,Eventloop要執行一些非io的系統task和定時任務,程式碼如下:
   final long ioTime = System.nanoTime() - ioStartTime;

                final int ioRatio = this.ioRatio;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

由於要同時執行io和非io的操作,為了充分使用cpu,會按一定的比例去進行執行,如果io的任務大於定時任務和task,則可以將io比例調大。反之調小,預設是50%,其執行方法如下:
  protected boolean runAllTasks(long timeoutNanos) {
        fetchFromDelayedQueue();
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
這裡先去獲取task,如果task為空,則退出,然後迴圈去執行task,當執行的task為64的時候,這時候會去比較上次執行時間和延時的關係,如果大於延時那麼就退出,這裡獲取naotime是每64次進行獲取一次。這樣做一來是獲取naotime比較耗時,另外也不能長時間執行task,讓io阻塞,所以一般每64個任務就會返回。 最後eventloop的run方法,會判斷是否優雅關閉,如果是優雅關閉會執行closeAll方法,如下:
private void closeAll() {
        selectAgain();
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, k, null);
            }
        }

        for (AbstractNioChannel ch: channels) {
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }
上述的程式碼解析是netty5的。