1. 程式人生 > >java執行緒池的實現原理(netty)

java執行緒池的實現原理(netty)

部落格已經好久都沒有寫了,感覺自己變慵懶了。。。這本來也是應該早就應該要寫的。。。

在前面讀netty原始碼的時候就可以看到netty本身就自己實現了一個執行緒池,而且也自己實現了future,並且實現的功能更加的強大。。。future還可以新增listener,這個剛開始自己覺得最為神奇。。當看完了它是怎麼實現的之後覺得設計還是挺漂亮的。。。

要自己實現java的執行緒池,那麼有兩個介面是需要熟悉的。。。


最為上層的是executor介面,其中就定義了一個方法:

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <tt>Executor</tt> implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution.
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
看註釋就知道,該方法要實現的功能很簡單,就是用來執行提交的runnalbe任務。。

接下來是executorservice介面,它擴充套件了executor介面,增加了關閉,submit,invokeAll方法。。使得功能更強一些。。

至於說AbstractExecutorService,它是一抽象類,它實現了ExecutorService中的方法,如果我們繼承這個類來實現自己的執行緒池的話,那麼需要實現留下的execute方法。。。這裡我們就拿其中它的一個方法實現來看看吧:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
這裡呼叫了newTaskFor方法,將runnable包裝為RunnableFuture的型別,也就是java自定義的future型別。。最後在呼叫execute方法來執行這個task,然後將futuretask按照future返回。。。

好了接下來我們來看看netty執行緒池的實現吧:

(1)我們可以將executor理解為單個執行任務的處理器,看做單個執行緒

(2)將executorgroup看成是一個executor的集合,也就是把它看成執行緒池。。

這裡我們就先來看看DefaultEventExecutor的實現吧,先來看看它型別的整合體系:


這次我們從後向前看吧,先來看看DefaultEventExecutor的定義:

final class DefaultEventExecutor extends SingleThreadEventExecutor {

    DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) {
        super(parent, threadFactory, true);
    }

    //這個方法將會在生成的執行緒當中被呼叫,用於不斷的來處理已經
    @Override
    protected void run() {
        for (;;) {  //一個迴圈,不斷的將task取出來,然後執行就可以了
            Runnable task = takeTask();
            if (task != null) {
                task.run();
                updateLastExecutionTime();
            }

            if (confirmShutdown()) {
                break;
            }
        }
    }
}
其實它的實現相對還是很簡單的,定義了一個run方法,它將會在生成的執行緒中呼叫,它完成的功能就是不斷的從佇列中取出任務然後執行。。。

接下來我們來看看一個比較重要的SingleThreadEventExecutor的定義吧:

首先它定義了兩個兩個任務佇列:

 private final Queue<Runnable> taskQueue;     //當前executor的任務佇列
    //這個佇列主要是用於處理帶有時間延遲的任務,可以將其理解為定時任務
    final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();   //帶優先權的任務佇列
前面的佇列預設採用的是LinkedBlockingQueue,這是執行緒安全的佇列,而還有一個優先權佇列,這個是用來實現延遲任務的,也就是定時任務。。。可以類比nginx或者libevent的定時的實現。。說白了都是同一個道理,通過時間來作為key,將已經超時的節點選出來。。

這裡說一個題外話:java的PriorityQueue是採用堆實現的,小根堆實現的。。有意思吧。。不過這個小根堆的儲存炒採用的是陣列。。。

我們再來看看在該型別裡面定義的執行緒吧,也就是最終的用於執行任務的執行緒:

thread = threadFactory.newThread(new Runnable() {    //建立執行緒
            @Override   //執行緒的執行函式
            public void run() {
                CURRENT_EVENT_LOOP.set(SingleThreadEventExecutor.this);  //儲存當前執行緒的本地物件
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();  //開始當前executor的執行函式,run方法延後到了後面的類中實現
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    if (state < ST_SHUTTING_DOWN) {
                        state = ST_SHUTTING_DOWN;
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error(
                                "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            synchronized (stateLock) {
                                state = ST_TERMINATED;
                            }
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                        "non-empty task queue (" + taskQueue.size() + ')');
                            }
                        }
                    }
                }
            }
        });
程式碼比較簡單,就不細說了。。

另外在該型別中還定義了一些基本的操作方法,例如takeTask,runAllTask等方法。

最後它還實現了最為重要的execute方法:

    //用於執行一個task,說白了就是把這個task放到任務隊列當中去,如果當前executor中定義的執行緒並沒有啟動的話,那麼要啟動它
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp) {
            wakeup(inEventLoop);
        }
    }
由這部分程式碼,我們可以看出,對於task的處理都是放入任務隊列當中去,然後再在前面提到的run方法中一個一個的處理。。。

接下來我們來看看AbstractEventExecutor型別的一些基本定義吧:

我們就來看看它的submit方法吧:

    //下面無非就是一些提交和排程任務,會呼叫newTaskFor方法將任務轉換為futuretask,這裡result為void
    @Override
    public Future<?> submit(Runnable task) {
        return (Future<?>) super.submit(task);
    }
其實是呼叫的AbstractExecutorService的submit方法,在這裡將會將傳進來的runnable轉化為callable介面,然後再將其轉化為futuretask,這裡有一點需要注意的是,轉化為futuretask的方法被netty重寫了,用於生成自己的定義的futuretask,在前面就已經說了netty自己實現了自己的future。。。

我們來看重寫的方法吧:

    @Override
    //重寫的abstractexecutorservice的方法,用於將提交的任務封裝為futuretask,這裡封裝成promisetask
    protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new PromiseTask<T>(this, runnable, value);
    }
再來看看在AbstractExecutorServicesubmit方法吧:
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
這裡將會呼叫已經過載的newTaskFor方法來穿傳進來的task轉化為netty自己定義的futuretask,也就是promiseTask,然後再呼叫前面我們提到過的execute方法,它將會把當前的task放入到任務隊列當中去,最後再由最終executor的run方法來挨個挨個的處理。。。至此,對於任務的提交和執行的線路就已經比較的清晰了。。(netty自定義的future的部分以後再說吧)

好了。。接下來我們來看看group的實現吧,也就是池子是怎麼搞的。。。

這裡就選DefaultEventExecutorGroup來分析吧,還是先來看看它的繼承體系:


這裡我們還是從後向前看吧,來看看DefaultEventExecutorGroup的定義:

public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {

    public DefaultEventExecutorGroup(int nThreads) {
        this(nThreads, null);
    }

    public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
        super(nThreads, threadFactory);
    }

    @Override
    protected EventExecutor newChild(
            ThreadFactory threadFactory, Object... args) throws Exception {
        return new DefaultEventExecutor(this, threadFactory);
    }
}
定義比較簡單吧,不過這裡有定義一個比較重要的方法,newChild,用於生成一個一個的executor,這裡的executor也就是在前面說的DefaultEventExecutor。

接下來來看看MultithreadEventExecutorGroup的定義吧:

首先它有兩個比較重要的屬性:

    private final EventExecutor[] children;
    private final AtomicInteger childIndex = new AtomicInteger();
前面的children為一個數組,用於儲存當前這個group所有的executor,而另外一個執行緒安全的integer會用於簡單的負載均衡(真的很簡單)

來看看它的構造方法:

    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }

        children = new SingleThreadEventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
            	//呼叫子類的newChild方法,用於生成一個一個的executor
                children[i] = newChild(threadFactory, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    }
其實沒有做太多的事情,無非就是建立executor,並將它們儲存到陣列當中去。。。

另外還有一個比較重要的方法:

    @Override
    public EventExecutor next() {
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];   //獲取當前index的值,然後將其+1,比較簡單的執行緒間負載均衡
    }
這裡就知道這個所謂的負載均衡有多麼簡單了吧。。。哈哈。。。

這裡我們再來看看AbstractEventExecutorGroup吧,我們就看其中一個方法就可以了:

    @Override    //用於提交任務
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }
到這裡我們就將整個任務的提交過程弄得比較的清晰了。。。

。。。好了。。好像其實這篇文章也沒有什麼新的內容。。。

但是也知道了,其實要實現一個執行緒池也不是什麼難事,無非是實現一下統一的藉口,然後自己在定義一下自己的執行緒機制就好了。。。完全可以模仿netty的實現方式來實現一個自己的執行緒池。。。