1. 程式人生 > >淺談java執行緒池(基於jdk1.8)

淺談java執行緒池(基於jdk1.8)

多執行緒讓程式世界豐富多彩,也讓其錯綜複雜。對於執行緒的建立和銷燬成了一筆不小的開銷,為了減少這些開銷,出現了執行緒池。執行緒池對執行緒進行管理,對於需要使用多執行緒的你來說,只需要把你的任務丟給執行緒池就可以了。當你把任務丟給執行緒池的時候,它是如何處理的呢?我們去原始碼中尋找蹤跡。

ThreadPoolExecutor

執行緒池在JDK中的主要實現類就是這個ThreadPoolExecutor。我們首先看一下他的建構函式

    public ThreadPoolExecutor(int corePoolSize,//核心執行緒數
                              int
maximumPoolSize,//最大執行緒數 long keepAliveTime,//存活時間 TimeUnit unit,//存活時間的單位(秒、毫秒等) BlockingQueue<Runnable> workQueue,//阻塞佇列 RejectedExecutionHandler handler) {//拒絕策略 ... }

建構函式中出現的這幾個引數都是執行緒池的重要指標,我們用幾句話把它們串起來,順便說明他們是如何發揮作用的: 執行緒池中有兩種重要的元素,一是執行緒,二是阻塞佇列。 1、當執行緒池剛初始化時,執行緒為0,阻塞佇列為空。 2、第一個任務來臨時,執行緒池為它新建一個執行緒來執行,第二個任務來臨時,執行緒池再為它新建一個執行緒來執行,直到新建的執行緒數達到了核心執行緒數,執行緒池暫時就不會再新建執行緒了。 3、新來的任務將會被放到阻塞佇列中,隨著新任務的不斷到來,如果阻塞佇列已滿,那麼執行緒池將會繼續為新來的任務新建執行緒,直到執行緒數達到了最大執行緒數。 4、這時,對與新來的任務,執行緒池將不會直接接受,而是執行拒絕策略

。 5、拒絕策略有很多種,預設的是直接丟擲異常。還有其他三種①丟棄當前被拒絕的任務。②丟棄最老的任務,重新嘗試接受新任務。③在呼叫者執行緒中執行這個任務

下面我們去原始碼中尋找以上敘述的蹤跡

Execute執行緒池的執行入口

execute方法是執行的入口

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            //如果此時執行緒數小於核心執行緒數,則增加執行緒處理任務
            if (addWorker(command, true))//A
                return;//增加成功,結束
            c = ctl.get();
        }//執行緒數已經不小於核心執行緒數,進行入隊
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//如果執行緒池已不再執行,取出任務,執行拒絕策略
                reject(command);
            else if (workerCountOf(recheck) == 0)//此刻沒有執行緒,就增加一個執行緒
                addWorker(null, false);//C
        }//如果我們沒能成功入隊,那麼久增加一個執行緒,如果失敗;說明執行緒池已關閉或已飽和,執行拒絕策略
        else if (!addWorker(command, false))//B
            reject(command);
    }

以上程式碼清晰可見,基本還原了上述文字敘述的流程。但我們發現當執行緒數不小於核心執行緒數時,入隊之後,還進行了一些檢測操作,就是看當前執行緒池是否還在執行,如果已經停止執行,那麼取出入隊的任務,執行拒絕策略。所以拒絕策略不只是執行緒池飽和之後執行,停止執行也會執行,當然這也是情理之中的事情。addWorker就是增加執行緒來處理任務,但我們發現這個方法的引數除了Runnable還有一個,是一個boobean值,並且在上面程式碼中的A處和B處分別呼叫了true和false,這裡有何奧妙?我們來剖開addWorker

增加工作者

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&//監測執行緒池狀態
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //如果此時有效執行緒數已經超過bound(核心執行緒數或最大執行緒數),返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    //CAS增加worker數量記錄,成功則跳出迴圈
                    break retry;
                c = ctl.get();  // Re-read ctl準備重試
                if (runStateOf(c) != rs)//如果執行緒池狀態傳送變化,從外迴圈重新開始(進行狀態監測)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

前期判斷

其實我們在上一段程式碼中就看到了c = ctl.get()這一句程式碼。這裡取出的c是一個int值,高3位表示的是此刻執行緒池的狀態,低29位表示的是此刻執行緒數。因為這個數會有多個執行緒對它進行操作,所以將它用AtomicInteger進行了包裝,並且提供從這個Int值中取出執行緒數和執行緒池狀態的方法(一些位操作),原始碼如下:

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

從上也可以看到執行緒池有五種狀態。當呼叫shutdown()、shutdownnow()等方法時,執行緒池的狀態會發生變化,從而影響執行緒池對與新來任務的策略,這個也在addWorker中有所體現。進入addWorker這個方法首先就是進行執行緒池狀態的檢測,如果處於非執行狀態,就會返回false。但也有個特殊情況如果 (rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty())這三個條件同時為真的話,將不會返回false。這個條件滿足時是什麼狀態呢?執行緒池處於SHUTDOWN狀態,佇列不為空,且呼叫的是addWorker(null,true/false)。此時基本就是這麼個狀態:執行緒池準備關閉了,需要新建一些執行緒來把佇列中的任務處理掉。 看完了這個檢測過程,就進入了內for迴圈,這個for迴圈中首先判斷執行緒數是否超過了某個值,如果超過,返回false,不再新建執行緒。可以發現這個值是由addWorker的第二個引數控制的,如果為true,這個值就是corePollSize,如果為false,這個值就是maxinumPoolSize。分別對應了核心執行緒的新建,和超過核心執行緒數其他執行緒的新建。然後CAS改變執行緒數量記錄,如果成功,跳出迴圈,進行執行緒的新建。如果不成功,則重試,並且如果執行緒池狀態發生了變化,還需要繼續外層迴圈,重新進行狀態檢測。兩個for迴圈之後的程式碼就是進行執行緒的新建,並且啟動這個執行緒。新建執行緒的工作是在Worker的建構函式中進行的

執行緒的新建

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

可以看到建構函式將自己傳給了newThread來新建執行緒,也就是說Worker類有一個thread成員變數,這個thread又是通過Worker來構造的。而我們啟動這個執行緒的時候呼叫的就是這個執行緒的start方法,下面看一下這個執行緒的執行邏輯,也就是worker的run方法

任務的執行

public void run() {
            runWorker(this);
        }

run方法就是呼叫了runWorker方法,我們再進入runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這個方法主體是一個while迴圈,首先處理firstTask,處理完之後就去佇列裡getTask()。處理的過程很簡單,就是呼叫task的run方法(此刻的run方法呼叫才是對任務的處理)。getTask我們可以稍微看一下

任務的獲取

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?//判斷執行緒是否需要回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

如果執行緒不需要回收的話,他會去take任務,take方法如果取不到任務就會一直阻塞,取到就執行,因此這個執行緒就不會終結。但如果執行緒需要回收,那麼執行緒會去poll任務,阻塞時間一旦超過了keepAliveTime,poll就會返回null,從而執行緒也就不會繼續這個“取任務並執行任務”的迴圈,實現執行緒的回收。

各種執行緒池

JDK提供了一個工具類Executors來讓我們方便的建立各種執行緒池。 1、newFixedThreadPool 這個執行緒池中執行緒的數量是一定的,佇列無限長,不能及時處理的任務在佇列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2、newWorkStealingPool 這個執行緒池是一個支援並行任務處理的執行緒池,傳入的引數就是我們目標的並行度,為了減少爭用,內部可能出現多個佇列,實際的執行緒數也會動態的增加和減少,任務的先後執行順序並不是一定的。

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

3、newSingleThreadExecutor 這個執行緒池中只會有一個執行緒和一個無界佇列。可以保證任務的執行順序,並且任何一個時刻只有一個任務在執行。

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

4、newCachedThreadPool 這個執行緒池只會在需要的時候建立執行緒,每個執行緒如果空閒時間超過60秒就會被回收。對執行緒的數量沒有限制,有記憶體溢位的風險。但長時間不適用的話它將是耗費資源最小的執行緒池,因為所有的執行緒都會被回收。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

5、newScheduledThreadPool 這個執行緒池可以用來週期性的執行一些任務

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }