1. 程式人生 > >Java 執行緒池ThreadPoolExecutor(基於jdk1.8)(一)

Java 執行緒池ThreadPoolExecutor(基於jdk1.8)(一)

介紹

執行緒池的作用就是提供一種對執行緒的管理,避免由於過多的建立和銷燬執行緒所造成的開銷。在一個“池”中維護著一定數量的執行緒,達到可重複利用的效果。在Java中,執行緒池的實現主要是通過ThreadPoolExecutor來實現的。接下來先從類圖結構來分析一下。

類結構

這裡寫圖片描述

  • Executor
public interface Executor {
    void execute(Runnable command);
}

在這看得出Executor是一個頂層的介面,裡邊只有execute方法。這個介面的目的就是表明需要執行一個Runnable任務。

  • ExecutorService
    它直接繼承至Executor,但是依然是一個介面,只是在此基礎上增加了其他的方法。
<T> Future<T> submit(Callable<T> task);

submit方法主要就是接受Callable介面的引數,不再是Runnable引數了,而且增加了返回值Future。
在ExecutorService類中還有好幾個過載函式。這幾個方法的設計主要是為了讓執行任務者能夠得到任務的執行結果。

void shutdown();

這個方法主要是提供了關閉執行緒池的操作,呼叫此方法後,執行緒池不再接收新的任務,但是會把當前快取佇列的任務全部執行完畢。

List<Runnable> shutdownNow();

這個方法呼叫後,不但不能接收新的任務,也會嘗試中斷正在執行的任務,同時不再執行快取佇列中的任務。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

這個方法提供執行一系列的任務的功能,最後返回所有任務的Future物件,用於活動任務的執行結果。

  • AbstractExecutorService
    它是一個抽象類,實現了ExecutorService介面。對其中絕大部分的方法進行了實現。
public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

任務為空直接返回空指標異常,新建一個ftask物件,最終還是呼叫了execute方法去執行任務。現在追蹤一下newTaskFor方法

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

返回的是一個FutureTask物件,這個類即實現了Runnable介面又實現了Callable介面,這樣它既可以當作執行緒的執行物件又可以對任務執行後的結果進行獲取。(為了不脫離主線,暫時就不再分析FutureTask的原始碼了)

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

首先建立用於儲存結果的集合futures,大小為傳入的任務數(一個任務對應一個future物件)。然後遍歷所有的Callable物件,把它們封裝到RunnableFuture中(實際傳入的是futureTask物件),然後把建立的futureTask物件加入到結果集futures中,然後呼叫execute方法去依次執行傳入的任務。
接下來又是一個for迴圈,在其中去保證每個任務已經執行完畢,當判斷某一個任務if (!f.isDone())沒有完成時,會呼叫f.get(),這個方法是一個阻塞方法,也就是說當前執行緒會一直等到任務執行結束才會返回。這樣保證了所有的任務都會在這個for迴圈中全部執行完畢,然後返回futures結果集。
此抽象類中並沒有實現execute、shutdown、shutdownNow等方法,具體的實現放在了ThreadPoolExecutor中。

  • ThreadPoolExecutor
    這個類繼承自AbstractExecutorService ,實現了execute方法。引入了執行緒池的管理。

原始碼分析

先看一下建構函式,因為執行緒池最重要的就是其中的一些引數,不同的引數配置,就可以實現不同的執行緒池。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
     .....
    }
  • corePoolSize
    指的就是執行緒池的核心執行緒數。噹噹前執行緒池中的執行緒個數小於corePoolSize時,對新來的任務,直接開啟一個新的執行緒去執行它。
  • maximumPoolSize
    代表最大能夠容納的執行緒數量。當執行緒池中的執行緒個數大於等於corePoolSize後,當需要執行一個新的任務時會先把任務放入快取佇列中,等待後續空閒的執行緒去執行。如果此時快取佇列已滿,那麼就會新啟一個執行緒去執行它,如果執行緒數量已經超過了maximumPoolSize,那麼就會呼叫reject方法,拒絕執行該次任務(後邊會分析reject方法)。
  • keepAliveTime
    用於指定執行緒存活的時間,當執行緒池中的執行緒大於corePoolSize後,會監控每一個執行緒的空閒時間,如果某個執行緒的空閒時間大於keepAliveTime,那麼就會銷燬該執行緒,釋放資源。
  • unit
    這個是keepAliveTime的單位,可以為秒、毫秒等等。

  • workQueue
    這個就是我們的任務快取隊列了。是一個阻塞佇列的型別,常用的有ArrayBlockingQueue、LinkedBlockingQueue(預設容量是Integer.MAX_VALUE)和SynchronousQueue。

  • threadFactory
    這個就是建立執行緒的工廠類。用於新建執行緒實體。

  • handler
    這是拒絕某個任務的回撥。當執行緒池不能夠處理某個任務時,會通過呼叫handler.rejectedExecution()去處理。內建了四種策略
    AbortPolicy(預設情況):直接丟棄,並且丟擲RejectedExecutionException異常。
    DiscardPolicy:直接丟棄,不做任何處理。
    DiscardOldestPolicy:從快取佇列丟棄最老的任務,然後呼叫execute立刻執行該任務。
    CallerRunsPolicy:在呼叫者的當前執行緒去執行這個任務。

分析完了建構函式,想必大家對其中的引數有了一定的認識,可以看出,不同的引數組合就可以實現不同需求的執行緒池,當然Java中已經為我們內建了很多常用的執行緒池,它們全部位於Executors類當中。如果沒有特殊需求,建議直接使用其中的執行緒池。
看完了建構函式,接下來分析最重要的execute函式,看看到底是怎麼一個執行流程。英語好的直接閱讀註釋,已經寫的非常仔細了。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

不喜歡看文字的直接看下邊的流程圖:
這裡寫圖片描述

既不想看註釋也不喜歡流程圖的,咱們直接分析程式碼:
首先是判斷

workerCountOf(c) < corePoolSize

workerCountOf(c)其實就是計算當前執行緒池中的執行緒數,如果小於核心執行緒數那麼直接

addWorker(command, true)

如果新增成功,那麼方法就結束了。新增失敗,則需要

isRunning(c) && workQueue.offer(command)

其實就是判斷執行緒池是否處於執行狀態,且嘗試往快取佇列新增任務,如果同時為真,則:

int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
    reject(command);
else if (workerCountOf(recheck) == 0)
    addWorker(null, false);

進入if的語句塊,前邊不是已經判斷過是否處於執行狀態了嗎?為什麼還要再次判斷呢?因為考慮多執行緒訪問,而且沒有進行同步措施,所以需要再次進行double-check。
如果新增任務到快取佇列失敗,那麼就嘗試新增新的執行緒:

else if (!addWorker(command, false))
     reject(command);

如果新增失敗就需要拒絕此次任務了。
次方法中關鍵呼叫的幾個方法為:addWorker(),workQueue.offer(),reject(),接下來一個一個分析。