1. 程式人生 > >netty原始碼分析(十七)Netty執行緒模型深度解讀與架構設計原則

netty原始碼分析(十七)Netty執行緒模型深度解讀與架構設計原則

上次分析到:

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return
promise; } }

之前提到的ChannelPromise 結構,其實是一個非同步返回結果的封裝,它持有channel和當前的SingleThreadEventLoop :
這裡寫圖片描述

進入unsafe實現類AbstractUnsafe的register方法:

 protected abstract class AbstractUnsafe implements Unsafe {

        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this
); private RecvByteBufAllocator.Handle recvHandle; private boolean inFlush0; /** true if the channel has never been registered, false otherwise */ private boolean neverRegistered = true; public final void register(EventLoop eventLoop, final ChannelPromise promise) { if
(eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //上邊的邏輯主要是一些非空判斷之類的東西 AbstractChannel.this.eventLoop = eventLoop; //主要的註冊邏輯分支,if和else分支可以看到最後呼叫的都是register0,但是else裡邊加了一個外殼---執行緒 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } }

eventLoop.inEventLoop()是分支邏輯的主要判斷依據(eventLoop實體是SingleThreadEventExecutor):

public interface EventExecutor extends EventExecutorGroup {
...略
    /**
     * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
     * 將當前執行緒作為引數。
     */
    boolean inEventLoop();
    ...略
    }

找到他的實現類:

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    ...略
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());//引數為當前執行緒。
    }
    ...略
}

在SingleThreadEventExecutor:

    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

很簡單,即,判斷當前執行緒是不是SingleThreadEventExecutor裡邊維護的執行緒。所以else裡邊的邏輯是SingleThreadEventExecutor裡邊的執行緒不是當前執行緒的時候,新建一個Thread去執行register0,下邊看一下register0的邏輯:

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();//核心註冊方法
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

看一下doRegister():
看一下AbstractNioChannel 實現類的doRegister邏輯:

public abstract class AbstractNioChannel extends AbstractChannel {
...略
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                //javaChannel()返回的是SelectableChannel(jdk的java.nio.channels.SelectableChannel)
                //javaChannel().register是將channel註冊到Selector上去,所以eventLoop().unwrappedSelector()返回的是Selector
                //(jdk的java.nio.channels.Selector)
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    ...略
}

到這裡我們看到了最終netty的JavaNio的註冊實現。最後說一下比較重要的一點,上邊提到的主要的註冊邏輯分支,if和else分支可以看到最後呼叫的都是register0,但是else裡邊加了一個外殼—執行緒,這個地方有這麼五點需要注意:

  1. 一個EventLoopGroup當中會包含一個或多個EventLoop。
  2. 一個EventLoop在它的整個生命週期當中都只會與唯一一個Thread進行繫結。
  3. 所有由EventLoop所處理的各種I/O事件都將在它所關聯的那個Thread上進行處理。
  4. 一個Channel在它的整個生命週期中只會註冊在一個EventLoop上。
  5. 一個EventLoop在執行過程中,會被分配給一個或多個Channel。

這是netty的核心的架構理念,非常重要!!!

相關推薦

netty原始碼分析()Netty執行模型深度解讀架構設計原則

上次分析到: public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop public ChannelFutu

Chrome原始碼分析之程序和執行模型(三)

關於Chrome的執行緒模型,在他的開發文件中有專門的介紹,原文地址在這裡:http://dev.chromium.org/developers/design-documents/threading chrome的程序,chrome沒有采用一般應用程式的單程序多執行緒的模

netty原始碼分析(六)Channel選擇器工廠輪詢演算法及註冊底層實現

上一節說到註冊的入口,即 MultithreadEventLoopGroup: public ChannelFuture register(Channel channel) { return next().register(chann

Mina、Netty、Twisted一起學():執行模型

要想開發一個高效能的TCP伺服器,熟悉所使用框架的執行緒模型非常重要。MINA、Netty、Twisted本身都是高效能的網路框架,如果再搭配上高效率的程式碼,才能實現一個高大上的伺服器。但是如果不瞭解它們的執行緒模型,就很難寫出高效能的程式碼。框架本身效率再高,程式寫的太差

Netty原始碼分析)----- read過程 原始碼分析

在上一篇文章中,我們分析了processSelectedKey這個方法中的accept過程,本文將分析一下work執行緒中的read過程。 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

Java基礎(三)- 執行相關

多執行緒常用操作方法: 1:執行緒的命名與取得 class MyThread implements Runnable { @Override public void run() { System.out.println(Thread.currentThread()

【4】Netty4原始碼分析-NioEventLoop實現的執行執行邏輯

轉自 http://xw-z1985.iteye.com/blog/1928244 在netty服務端啟動原始碼分析-執行緒建立一文中已分析SingleThreadEventExecutor所持有的執行緒的執行邏輯由NioEventLoop實現,那麼本文就著手分析NioEventLoop

原始碼分析]、ASP.NET Core 整合

  目錄      0. 簡介      1. 啟動流程      2. 程式碼分析      2.1 初始化操作      2.2 AbpAspNetCoreModule 模組      2.3 控制器與動態 API      2.4 過濾器      2.3 CSRF 防禦元件      2.4

[Abp 原始碼分析]、ASP.NET Core 整合

0. 簡介 整個 Abp 框架最為核心的除了 Abp 庫之外,其次就是 Abp.AspNetCore 庫了。雖然 Abp 本身是可以用於控制檯程式的,不過那樣的話 Abp 就基本沒什麼用,還是需要集合 ASP.NET Core 才能發揮它真正的作用。 在 Abp.AspNetCore 庫裡面,Abp 通過 W

Muduo網路庫原始碼分析(三)執行間使用eventfd通訊和EventLoop::runInLoop系列函式

先說第一點,執行緒(程序)間通訊有很多種方式(pipe,socketpair),為什麼這裡選擇eventfd? eventfd 是一個比 pipe 更高效的執行緒間事件通知機制,一方面它比 pipe

netty原始碼分析 1-1 Netty深入剖析

我們為什麼呢要學習Netty通訊呢?其實業內我們有很多大型架構都是Netty作為底層通訊,非同步非阻塞。如:Duddo,RocketMQ,Spark,Elasticsearch,Cassandra,Flink,Netty-SocketIO,Spring5,Play,Grpc。

ABP原始碼分析:DTO 自動校驗的實現

對傳給Application service物件中的方法的DTO引數,ABP都會在方法真正執行前自動完成validation(根據標註到DTO物件中的validate規則)。 ABP是如何做到的? 思路無外乎通過Castle的攔截器實現AOP。本文主要分析ABP是如何設計。 Ivalidate: 空介面,用

XGBoost原始碼分析之單機多執行的實現

上篇文章主要通過論文閱讀、數學推導,基本掌握了XGBoost的原理。於是開始閱讀XGBoost原始碼,並總結了幾處自己認為比較重要的方面。如有錯誤,請指正: 1. 總體框架: cli_main.cc 是程式的入口,main函式所在的檔案。除了有main函式以外,還有訓練

iOS開發筆記之四——多執行場景下的KVO使用參考方案

如果你取檢索網路資料會發現,有人直接不建議把KVO與多執行緒混合使用,因為KVO的響應和KVO觀察的值變化是在一個執行緒上的,不同的執行緒可能會導致不可預知的後果。參考資料見這裡: 當然,場景總是千變萬化的,下面我就介紹一種多執行緒下使用KVO的場景。 具體場景如

spring原始碼分析-controller的執行安全

大家都知道,struts1.2由於是執行緒安全的,每一個請求都去例項化一個action,造成大量併發時的資源浪費。   struts2在這一點上做了改進,每個action都是一個singleton,所有的請求都是請求同一個action例項。這樣在一定程度上能節約資源,但又有

ScheduledThreadPoolExecutor原始碼分析-你知道定時執行池是如何實現延遲執行和週期執行的嗎?

Java版本:8u261。 ## 1 簡介 ScheduledThreadPoolExecutor即定時執行緒池,是用來執行延遲任務或週期性任務的。相比於Timer的單執行緒,定時執行緒池在遇到任務丟擲異常的時候不會關閉整個執行緒池,更加健壯(需要提一下的是:ScheduledThreadPoolExec

Chrome原始碼剖析--Chrome的多執行模型

0. Chrome的併發模型 如果你仔細看了前面的圖,對Chrome的執行緒和程序框架應該有了個基本的瞭解。Chrome有一個主程序,稱為Browser程序,它是老大,管理Chrome大部分的日常事務;其次,會有很多Renderer程序,它們圈地而治,各管理一組站點的顯示和通

Netty原始碼分析第2章(NioEventLoop)---->第1節: NioEventLoopGroup之建立執行執行

  第二章: NioEventLoop   概述:         通過上一章的學習, 我們瞭解了Server啟動的大致流程, 有很多元件與模組並沒有細講, 從這個章開始, 我們開始詳細剖析netty的各個元件, 並結合啟動流程, 將這些元件

Netty原始碼分析第2章(NioEventLoop)---->第3節: 初始化執行選擇器

  第二章:NioEventLoop   第三節:初始化執行緒選擇器 回到上一小節的MultithreadEventExecutorGroup類的構造方法: protected MultithreadEventExecutorGroup(int nThreads, Exec

Netty原始碼分析第8章(高效能工具類FastThreadLocal和Recycler)---->第6節: 異執行回收物件

  Netty原始碼分析第八章: 高效能工具類FastThreadLocal和Recycler   第六節: 異執行緒回收物件   異執行緒回收物件, 就是建立物件和回收物件不在同一條執行緒的情況下, 物件回收的邏輯 我們之前小節簡單介紹過, 異執行緒回收物