1. 程式人生 > >線程池ThreadPoolExecutor源碼解讀研究(JDK1.8)

線程池ThreadPoolExecutor源碼解讀研究(JDK1.8)

else if whether use rep 類之間關系 sin 如果 一段 源碼解讀

一、什麽是線程池

為什麽要使用線程池?在多線程並發開發中,線程的數量較多,且每個線程執行一定的時間後就結束了,下一個線程任務到來還需要重新創建線程,這樣線程數量特別龐大的時候,頻繁的創建線程和銷毀線程需要一定時間而且增加系統的額外開銷。基於這樣的場景,線程池就出現了,線程池可以做到一個線程的任務處理完可以接受下一個任務,並不需要頻繁的創建銷毀,這樣大大節省了時間和系統的開銷。

線程池,顧名思義,就是一個池子,任務提交的到線程池後,線程池會在池子裏邊找有沒有空閑的線程,如果沒有,就會進入等待狀態,有就會分配一個空閑的線程來接受這個任務,當服務執行完,從新放回到線程池,不需要銷毀。在這種模式下,系統大大減少了創建線程個銷毀線程的資源開銷,而且一個線程可以用來執行多個任務,我們可以根據系統的配置靈活調整線程池的大小,從而更高效的執行任務。

二、線程池類之間關系

線程池主要包含:Executors,Executor,ExecutorService,AbstractExecutorService,ThreadPoolExecutor這些類。Executors用來創建線程池,返回ExecutorService的對象,該對象就可以調用execute方法或者submit方法執行線程。當然,我們也可以自己new一個。

Executor,ExecutorService,AbstractExecutorService,ThreadPoolExecutor的繼承關系的繼承關系為:Executor是一個接口,裏面只有execute方法聲明,接口ExecutorService繼承Executor接口,裏面包含shutdown(),shutdownNow(),isTerminated(),submit等方法; AbstractExecutorService是ExecutorService的實現類,實現了該類中的方法,ThreadPoolExecutor繼承AbstractExecutorService。

三、線程池狀態說明

RUNNING可以接受新任務,也可以處理阻塞隊列裏面的任務

SHUTDOWN不接受新任務,但是可以處理阻塞隊列裏的任務

STOP不在接收新任務,也不再處理阻塞隊列裏的任務,並中斷正在處理的任務

TIDYING中間狀態:線程池中沒有有效的線程,調用terminate進入TERMINATE狀態

TERMINATE:終止狀態

四、線程池源碼分析

ExecutorService  executor = Executors.newFixedThreadPool(100);

通過API我們可以看到創建線程池的過程。

public static ExecutorService newFixedThreadPool(int
nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

Executors這個類中基本都是靜態方法,代理了線程池的創建,大大簡化了我麽創建線程池工作量,通過方法名我們就可以創建我們想要的線程池,他的內部其實都是統一的方法實現的,通過構造方法重載實現不同的功能,但是不看源碼,是很難知道他們的具體作用的。我們可以看到,這裏面有好幾種創建線程池的方法,他們有什麽區別呢?

1. newFixedThreadPool(int)方法,內部實現如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

創建指定大小的線程池,如果超出大小,放入block隊列,即LinkedBlockingQueue隊列,默認的線程工廠為defaultThreadFactory。

2. newWorkStealingPool(int),內部實現如下:

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}

JDK1.8新增,返回ForkJoin,個人感覺有一點mapReduce的思想。

3.newSingleThreadPool,源碼如下:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));

創建單個線程的線程池。

4. newCachedThreadPool,源碼如下:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

線程池長度超過處理需要,靈活回收空閑線程,若無可回收,則創建新線程。

Executors裏面還有好多方法,我們仔細查看API就可以了解的個大概,它是一個工具類,提供了一些靜態方法。從源碼中我們可以看到創建線程池返回的是return new ThreadPoolExecutor方法,它的源碼如下:

技術分享圖片
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
View Code

剛看源碼的時候的確很痛苦,我們得熟悉作者的思想,他為什麽要這麽寫,知道了作者的思想以後就好辦多了,我是結合英文說明來揣摩的,下面我們看每個參數的意思。

corePoolSize核心線程大小,線程數一旦超過這個值,多余的就會被放入等待隊列

maximumPoolSize線程池中的最大線程數量,這個一般用不到,源碼中可以看到corePoolSize和maximumPoolSize是一樣的,不同的是大於這個值會由丟棄處理機制來處理,不會被放入等待隊列。

keepAliveTime保持時間,當線程沒有任務處理後,保持多久結束,默認是0

workQueue等待隊列,默認為LinkedBlockingQueue,這就是前面提到的等待隊列,裏面是一個HashSet,內部包裝了一層。

threadFactory構造Thread方法,我們可以自己包裝和傳遞,實現newThread方法

handler這就是前面提到的丟棄處理機制方法,實現接口RejectExecutionHandler中的方法即可。

在做項目的時候發現線程池有兩個執行方法可供調用,分別是execute和submit,那麽這兩個方法有什麽區別呢?在看submit源碼的時候可以看到submit最終還是會調用execute方法。

技術分享圖片

不同的是submit方法提供了一個Future來托管返回值的處理,當調用這個方法需要有返回值的時候,可以用這個方法,execute只能接受Runnable作為參數,而submit除了Runnable還可以接收Callable。

下面來分析最重要的execute方法源碼:

技術分享圖片
 1 public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * Proceed in 3 steps:
 6          *
 7          * 1. If fewer than corePoolSize threads are running, try to
 8          * start a new thread with the given command as its first
 9          * task.  The call to addWorker atomically checks runState and
10          * workerCount, and so prevents false alarms that would add
11          * threads when it shouldn‘t, by returning false.
12          *
13          * 2. If a task can be successfully queued, then we still need
14          * to double-check whether we should have added a thread
15          * (because existing ones died since last checking) or that
16          * the pool shut down since entry into this method. So we
17          * recheck state and if necessary roll back the enqueuing if
18          * stopped, or start a new thread if there are none.
19          *
20          * 3. If we cannot queue task, then we try to add a new
21          * thread.  If it fails, we know we are shut down or saturated
22          * and so reject the task.
23          */
24         int c = ctl.get();
25         if (workerCountOf(c) < corePoolSize) {
26             if (addWorker(command, true))
27                 return;
28             c = ctl.get();
29         }
30         if (isRunning(c) && workQueue.offer(command)) {
31             int recheck = ctl.get();
32             if (! isRunning(recheck) && remove(command))
33                 reject(command);
34             else if (workerCountOf(recheck) == 0)
35                 addWorker(null, false);
36         }
37         else if (!addWorker(command, false))
38             reject(command);
39 }
View Code

代碼解釋:如果任務為空,返回空異常;接下來int c = ctl.get();獲取線程池的狀態位,進入if中計算線程池的數量,如果小於線程池的核心線程數,就封裝成一個工作(work),失敗了繼續獲取線程池狀態位;if (isRunning(c) && workQueue.offer(command))判斷線程池是否正常運行,正常的話就把當前線程添加到工作隊列並且再次獲取線程池狀態位,if (! isRunning(recheck) && remove(command))如果沒有運行的線程了,就把剛才添加的線程移除,移除成功後,使用拒絕策略reject(command); else if (workerCountOf(recheck) == 0)

addWorker(null, false);如果線程池的線程數為0,那麽就要添加一個空任務繼續運行,以此來保證可以繼續接收新任務而繼續運行。

else if (!addWorker(command, false))

reject(command);

如果核心線程滿了,工作隊列也飽和了,開啟非核心線程也失敗了就會拒絕,此時已經達到最大線程數了。

從英文解釋中,我們可以看到:基本分三步:

a) 開啟線程執行任務,直到達到最大核心線程數

b) 達到核心線程數時,將接受的新任務放入工作隊列

c) 當工作隊列也放滿後,就會開啟線程(非核心)執行任務,直到到達最大線程數

d) 以上條件都不滿足時,就執行默認的拒絕策略

addWork源碼:

技術分享圖片
private boolean addWorker(Runnable firstTask, boolean core) {
        retry: //循環標誌
        for (;;) { 死循環
            int c = ctl.get();//獲取狀態位
            int rs = runStateOf(c);//計算線程池的狀態

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;//這一段說的是線程池不能正常運行的情況:線程池狀態關閉、任務為空、隊列為空返回錯誤

            for (;;) {//死循環
                int wc = workerCountOf(c);//計算線程數
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;//如果線程數超出核心線程數,返回錯誤
                if (compareAndIncrementWorkerCount(c))//增加worker的數量
                    break retry;//回到進入該方法的循環狀態
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;//如果狀態發生改變,就回退
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;//線程是否開始運行
        boolean workerAdded = false;//worker是否添加成功
        Worker w = null;
        try {
            w = new Worker(firstTask);//封裝成worker
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;//加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());計算線程池狀態

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;//worker添加成功
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//啟動剛剛添加的任務
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//失敗後執行的操作
        }
        return workerStarted;
}
View Code

從對源碼的翻譯中我們可以知道這個方法是有什麽作用,簡單說就是:創建任務,封裝任務。

五、線程測試

進行一個簡單的測試模擬線程池的工作原理:

模擬多線程:

技術分享圖片
public class TestThreadPool implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
View Code

測試類:

技術分享圖片
public static void main(String[] args) {
        //指定3個長度的工作隊列
        LinkedBlockingDeque<Runnable> workQueue=new LinkedBlockingDeque<>(3);
        //指定線程池參數:核心線程數,線程池最大線程數量,活躍時間,工作隊列
        ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(4, 7, 90, 
                TimeUnit.SECONDS, workQueue);
        for (int i = 0; i < 15; i++) {
            threadPoolExecutor.execute(new Thread(new TestThreadPool(), 
                    "線程:".concat(i+"")));
            System.out.println("線程池中活躍線程數"+threadPoolExecutor.getActiveCount());
            if(workQueue.size()>0){
                System.out.println("被阻塞的線程數為:"+workQueue.size());
            }
        }
    }
View Code

指定線程池核心數為4,最大線程數量7,工作隊列最大放入3個線程,模擬15個線程並發。運行結果如下:

技術分享圖片
線程池中活躍線程數1
線程池中活躍線程數2
線程池中活躍線程數3
線程池中活躍線程數4
線程池中活躍線程數4
被阻塞的線程數為:1
線程池中活躍線程數4
被阻塞的線程數為:2
線程池中活躍線程數4
被阻塞的線程數為:3
線程池中活躍線程數5
被阻塞的線程數為:3
線程池中活躍線程數6
被阻塞的線程數為:3
線程池中活躍線程數7
被阻塞的線程數為:3
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[線程:10,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 7, active threads = 7, queued tasks = 3, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at main.Main.main(Main.java:19)
View Code

可以看到,創建了4個核心線程和3個非核心線程,當線程數超出了線程池可容納的的最大數量,執行了拒絕策略Reject,說明隊列和線程池都滿了,線程池處於飽和狀態,另外一個原因是完成的線程沒有及時釋放,而是進入了休眠。

線程池工作原理:任務開始後,開始創建新的線程,當達到核心線程數後,新的任務進來不在創建新的線程,這時候把任務加入工作隊列,當達到工作隊列的長度後,新任務開始創建新的普通線程,直到數量達到線程池的最大核心數量,後面再有新任務則執行飽和策略或拒絕,拋出異常。

線程池ThreadPoolExecutor源碼解讀研究(JDK1.8)