執行緒池是資源複用的典範之作,其基本思想是維護一個含有一定數量的在執行的執行緒集合,在需要執行執行緒任務的時候直接從這個集合中取出一個執行緒去執行任務,而不是重新建立一個。

如果我們自己去實現一個執行緒池,那麼基本的想法是維護一個執行緒的集合,這些執行緒都從一個佇列中去取任務,如果佇列為空,則阻塞對應的執行緒,等待佇列不空的訊息通知。當執行緒完成了任務,應該將執行緒返回給執行緒佇列,而不是關閉執行緒。基本思想是這樣,那麼Java中具體是如何實現執行緒池的呢,我們來看看。

ThreadPoolExecutor

一般情況下,我們使用執行緒池會使用Executors的靜態方法獲取執行緒池,如
* public static ExecutorService newFixedThreadPool(int nThreads)
* public static ExecutorService newSingleThreadExecutor()
* public static ExecutorService newCachedThreadPool()
* public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

除了最後一個方法外,其它三個方法都是返回一個ThreadPoolExecutor例項,它是ExecutorService介面的一個實現類。瞄一眼ThreadPoolExecutor的構造方法

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

它的構造方法有多個過載,其中最終都是使用了上面這個構造方法,它包括7個引數,corePoolSize,核心執行緒數量;maximumPoolSize,最大執行緒數量;keepAliveTime,超過核心執行緒數量執行緒的在空閒後的存活時間;unit,存活時間的時間單位;workQueue,超過核心執行緒後任務存放的阻塞佇列;threadFactory,常用定義執行緒名字的執行緒工廠,可以使用預設工廠;最後一個handler,是阻塞佇列已滿,並且執行緒數達到maximumPoolSize的時候的處理策略,其中包括了丟擲異常(AbortPolicy),誰請求誰呼叫(CallerRunsPolicy),丟棄執行緒池中的一個任務來執行現在的任務(DiscardOldesPolicy),直接丟棄掉(DiscardPolicy)預設使用拋異常策略。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可以看到newFixedThreadPool使用預設工廠,預設拒絕策略,連結串列阻塞佇列,最大執行緒數和核心執行緒數相同,並且如果超過核心執行緒數的執行緒控制,立即失效。這種比較適合任務數固定的任務。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor的核心執行緒數和最大執行緒數都是1,並且使用LinkedBlockingQueue作為緩衝佇列。

   public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCachedThreadPool在沒有任務的時候並不建立執行緒,只有在任務出現的時候才使用SynchronousQueue佇列傳遞任務給執行緒,但SynchronousQueue每次只能傳遞一個任務,新的任務首先會通過offer(非阻塞)方法嘗試傳遞任務給執行緒,如果此時沒有空閒的執行緒,會新生成執行緒來完成任務,而且執行緒的空閒時間是60s,所以比較適合任務多而且短的任務集(短的原因是總可以有空閒執行緒去SynchronousQueue中取任務)。關於SynchronousQueue,可以參考這幾篇文章《SynchronousQueue使用例項》,《J.U.C之阻塞佇列:SynchronousQueue》。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ScheduledThreadPoolExecutor可以使用schedule方法在給定時間延遲時進行呼叫。

ThreadPoolExecutor是如何管理執行緒池的?

從上面我們看到使用ThreadPoolExecutor可以獲取執行緒池,並且可以呼叫execute(Runnable)和submit(Runnable)提交任務。那麼在執行execute方法的時候,ThreadPoolExecutor是如何管理執行緒池來完成當前的任務的。我們可以從execute的原始碼中一窺這個處理策略

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//第一步
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//第二步
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//第三步
            reject(command);
    }

其基本策略有三大步,第一步,在當前的執行緒池中執行緒數量小於核心執行緒數量,直接呼叫addWork(Runnable,boolean)新增執行緒,並且執行任務任務,任務完成後這個執行緒會駐留在池中等待執行任務。如果執行緒數已經超過的核心執行緒數,第二步,則將任務使用阻塞佇列的offer非阻塞方式放入佇列中。如果阻塞佇列也滿了,則嘗試addWork(Runnable,boolean),如果執行緒數量超過了最大執行緒數量,則新增失敗,呼叫拒絕策略。
addWork(Runnable)方法基本思想是首先將Runable包裝成Worker,並執行Worker。Worker的資料結構使得在執行完該方法後,會嘗試從阻塞佇列中獲取任務繼續執行。
另外值得注意的是JDK中的執行緒池標註的5種狀態,如下圖所示

只有RUNNING狀態的執行緒池可以接收任務,處於SHUTDOWN狀態的執行緒池不可以接受新任務,但是可以繼續對已新增的任務進行處理。處於STOP狀態的執行緒池不接收新任務,不處理已新增的任務,並且會中斷正在處理的任務。TIDYING狀態的執行緒池在執行terminated函式,而TERMINATED狀態說明已經執行完terminated函式,徹底終止。其中RUNNING可以轉換成SHUTDOWN(shutdown()函式)或STOP狀態(shutDownNow()函式),有STOP或SHUTDOWN狀態可以轉換成TIDYING,而TIDYING可以轉換成TERMINATED。

參考文獻

宣告

本文首發表於我的部落格,歡迎關注!轉載須註明文章出處,作者保留文章所有權。

.