1. 程式人生 > >Vert.x系列(五)--ContextImpl原始碼分析

Vert.x系列(五)--ContextImpl原始碼分析

開發十年,就只剩下這套架構體系了! >>>   

前言:

執行緒安全是Vert.x的重要特性,但這一特性是由它依賴的netty實現的,Vert.x只是直接拿過來使用。

這裡涉及到很多個類。

ContextImpl、EventLoopContext、NioEventLoop、和NioEventLoop的父類SingleThreadEventLoop、和NioEventLoop的爺爺類SingleThreadEventExecutor。

 

原理:

Netty定義了EventExecutor事件執行器,用做對任務處理的封裝。執行器內部維護了Queue<Runable>

,實現了任務的順序執行。還定義了MultithreadEventExecutorGroup類,維護陣列變數EventExecutor[] children,實現了多核CPU的利用; (陣列佇列結構,非常像Hashmap的陣列連結串列結構)。一個Verticle和一個ContextImpl對應,再有一個ContextImpl和一個EventExecutor對應,使所有對Verticle的操作都在一個Queue<Runable>中依次執行,實現了執行緒安全。

 

程式碼:

程式碼1.構造器

對於佔了大部分的普通Verticle來說一般來說,會依次由VertxImpl.getOrCreateContext()、createEventLoopContext()、EventLoopContext構造方法、ContextImpl構造方法呼叫後,進入ContextImpl類

 

在建立ContextImpl 時 ,這下面的三個方法(或構造方法),

// 利用next(),從group中取一個。next()也實現了對group的平衡獲取

private static EventLoop getEventLoop(VertxInternal vertx) {
    EventLoopGroup group = vertx.getEventLoopGroup();
    if (group != null) {
        return group.next();
    } else {
        return null;
    }
}

// 需要注意this的第2個引數是getEventLoop(vertx)方法的呼叫。才

 

protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config, 
ClassLoader tccl) {
    this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deploymentID, config, tccl);
}

 

// 簡單的賦值

protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,ClassLoader tccl) {
    if (DISABLE_TCCL && !tccl.getClass().getName().equals("sun.misc.Launcher$AppClassLoader")) {
        log.warn("You have disabled TCCL checks but you have a custom TCCL to set.");
    }
    this.deploymentID = deploymentID;
    this.config = config;
    this.eventLoop = eventLoop;
    this.tccl = tccl;
    this.owner = vertx;
    this.workerPool = workerPool;
    this.internalBlockingPool = internalBlockingPool;
    this.orderedTasks = new TaskQueue();
    this.internalOrderedTasks = new TaskQueue();
    this.closeHooks = new CloseHooks(log);
}

完成對屬性private final EventLoop eventLoop;的賦值,即對ContextImpl和EventLoop的1對1繫結。

VertxImpl的構造方法中,會對它的成員變數 eventLoopGroup 賦值

eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);

在eventLoopGroup()方法為:

public EventLoopGroup eventLoopGroup(int nThreads, ThreadFactory threadFactory, int ioRatio) {
    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);
    eventLoopGroup.setIoRatio(ioRatio);
    return eventLoopGroup;
}

可以看到例項化了一個 NioEventLoopGroup 作為返回值。NioEventLoopGroup 就是若干個NioEventLoop的封裝,主要還是看NioEventLoop。

用ctrl+alt+U檢視下類圖,發現NioEventLoop的繼承結構有點複雜。可以看到 Executor、SingleThreadEventExecutor。

Executor 定義了 void execute(Runnable command); -- 處理任務的方法

SingleThreadEventExecutor 實現了void execute(Runnable command);

並定義了重要的任務佇列 private final Queue<Runnable> taskQueue;

也看看 NioEventLoopGroup的類圖:

在他的父類MultithreadEventExecutorGroup,定義了private final EventExecutor[] children;

那麼,對前面的eventLoopGroup()方法裡的

NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);

這句話歸納,就是擁有 EventExecutor[] children; 的物件,而EventExecutor實現了對Queue<Runnable> taskQueue;的操作。就是“原理”裡說的陣列佇列結構。

 

程式碼2. runOnContext

對Verticle的操作,最後都會統一到 ContextImpl.runOnContext()方法處理,比如EventBusImpl.deliverToHandler()

runOnContext作為入口方法很簡單:

// Run the task asynchronously on this same context

@Override
public void runOnContext(Handler<Void> task) {
    try {
        executeAsync(task);
    } catch (RejectedExecutionException ignore) {
    // Pool is already shut down
    }
}

executeAsync 有 abstract 關鍵字修飾,需要檢視 ContextImpl 的子類EventLoopContext ,看看它是怎麼實現的

public void executeAsync(Handler<Void> task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(null, task, true, null));
}

這個wrapTask(程式碼略)方法把屬於Vertx的Handler封裝成JDK的Runable,傳給netty框架處理。再使用execute()執行。下面的邏輯就是netty如何處理Runnable.

 

程式碼3 SingleThreadEventExecutor.execute()

execute() 最上層的介面Executor定義的。NioEventLoop的父類SingleThreadEventExecutor 重寫了此方法.SingleThreadEventExecutor去執行execute() ,自己仍然還是一個代理,會把真正執行執行執行緒的邏輯(類似方法名doExecute做的事情)的邏輯交給 private final Executor executor;執行

@Override
public void execute(Runnable task) {
    if (task == null) {
    throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    // 對Queue新增 addTask(Runnable task)--offerTask(Runnable task) --taskQueue.offer(task); 這一系列操作, // 完成了對 Queue<Runnable>的新增操作。
    addTask(task);
    if (!inEventLoop) {
    // 執行 
    //SingleThreadEventExecutor.execute--> SingleThreadEventExecutor.startThread-->
    // SingleThreadEventExecutor.doStartThread. -->成員 Executor executor的execute(),實現是ThreadPerTaskExecutor的execute()
    startThread();
    // 對Queue減少
    if (isShutdown() && removeTask(task)) {
    reject();
    }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
    wakeup(inEventLoop);
    }
}

 

這個Executor executor 是在SingleThreadEventExecutor的構造方法中例項化的ThreadPerTaskExecutor,是屬於Netty框架的。但是,ThreadPerTaskExecutor包含一個介面屬性ThreadFactory threadFactory。針對Vertx框架的場景,new ThreadPerTaskExecutor(threadFactory) 中的 threadFactory是屬於Vertx框架的VertxThreadFactory。

protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory,boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
// 粗體程式碼
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
public final class ThreadPerTaskExecutor implements Executor {

    private final ThreadFactory threadFactory; // 這個實際是Vertx框架下的VertxThreadFactory
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
           }
        this.threadFactory = threadFactory;
    }
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start(); // 最最最底層的Thread.start()方法。
    }
}

這個變數的源頭,很早很早前,由VertxImpl在呼叫時傳入的

eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);

所以,弄到現在,各種Factory包裹,N層邏輯。才最終還是使用抽象工廠模式,呼叫了Vertx實現的工廠。

 

需要注意的是 , NioEventLoop重寫了newTaskQueue()方法,

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue(): PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

所以Queue<Runnable> taskQueue 擁有的不是在SingleThreadEventExecutor.newTaskQueue()裡的 LinkedBlockingQueue , 而是 MpscUnboundedArrayQueue。

public final class ThreadPerTaskExecutor implements Executor {

private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}