1. 程式人生 > >Java 執行緒池ThreadPoolExecutor(基於jdk1.8)(二)

Java 執行緒池ThreadPoolExecutor(基於jdk1.8)(二)

  • private boolean addWorker(Runnable firstTask, boolean core)
    首先分析一下引數,firstTask就是指我們使用者傳入的需要執行的認為,core引數,當為真時,表示當前執行緒數小於corePoolSize,為假就是執行緒數大於corePoolSize。該方法主要有兩塊大的邏輯,第一部分是通過CAS操作去跟新當前的執行緒數量:
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return
false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }

    兩個大的for無限迴圈體,外邊的迴圈就是不斷檢測當前執行緒池的狀態,一旦發現不處於Running狀態了,就立刻返回false了,因為通過前邊文章分析我們知道,當執行緒池呼叫了shutdown方法後,就不再接收心的任務了。
    再看內層的迴圈體,主要就是通過CAS操作去更新執行緒池數量,
if (compareAndIncrementWorkerCount(c))如果更新成功,那麼就會跳出外層迴圈,繼續後邊的邏輯。如果更新失敗,需要再次檢查當前執行緒池的狀態,因為CAS操作是沒有進行同步操作的,所以有可能此時執行緒池的狀態已經變了,需要再次檢測。發現狀態發生變化,那麼需要跳出內層迴圈直接進行下一次的外層迴圈,重新檢查執行緒池狀態。如果執行緒池的狀態沒有發生變化,則繼續下一次的內層迴圈,重新嘗試更新執行緒池數量直至更新成功。
    執行緒池數量更新成功了,就進入下一個邏輯塊,新增執行緒:

boolean workerStarted = false;
 boolean workerAdded = false;
 Worker w = null;
 try {
     w = new Worker(firstTask);
     final Thread t = w.thread;
     if (t != null) {
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
             // Recheck while holding lock.
             // Back out on ThreadFactory failure or if
             // shut down before lock acquired.
             int rs = runStateOf(ctl.get());

             if (rs < SHUTDOWN ||
                 (rs == SHUTDOWN && firstTask == null)) {
                 if (t.isAlive()) // precheck that t is startable
                     throw new IllegalThreadStateException();
                 workers.add(w);
                 int s = workers.size();
                 if (s > largestPoolSize)
                     largestPoolSize = s;
                 workerAdded = true;
             }
         } finally {
             mainLock.unlock();
         }
         if (workerAdded) {
             t.start();
             workerStarted = true;
         }
     }
 } finally {
     if (! workerStarted)
         addWorkerFailed(w);
 }
 return workerStarted;
}

    第5行首先新建了一個worker,這個類內部封裝了我們的runnable物件還有執行緒物件(隨後會進行分析),第9行進行加鎖,線上程池操作中,所有涉及到執行緒池狀態變化的程式碼都需要進行同步操作,前一部分用的是CAS,這一部分用到了可重入鎖。
     在判斷執行緒池是處於running狀態後,在20行workers.add(w),把工作執行緒新增到一個Set集合中,workers就是一個HashSet用於存放當前執行緒池中存活的執行緒。後邊就很簡單,記錄了一下最大的執行緒數。
     關鍵看30行,t.start()啟動了執行緒。這個t就是我們新建的worker中的執行緒。這樣一下來我們線上程數小於corePoolSize的時候,發起的任務就直接通過新建執行緒開始運行了。
    為了瞭解發出的任務到底是怎麼執行的,還得繼續分析Worker這個類。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
    ...//省略了重寫AbstractQueuedSynchronizer類中的方法
}

     這個worker還是繼承了Runnable介面,內部有三個成員變數,註釋中已經寫得很明白都要什麼作用。我們關注一下run方法,其實就呼叫了一個runWorker(this),把自己作為引數傳遞進去。那麼繼續跟進這個方法(請保持清醒,因為已經有好幾層呼叫了。。):

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
    while (task != null || (task = getTask()) != null) {
        w.lock();
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
            wt.interrupt();
        try {
            beforeExecute(wt, task);
            Throwable thrown = null;
            try {
                task.run();
            } catch (RuntimeException x) {
                thrown = x; throw x;
            } catch (Error x) {
                thrown = x; throw x;
            } catch (Throwable x) {
                thrown = x; throw new Error(x);
            } finally {
                afterExecute(task, thrown);
            }
        } finally {
            task = null;
            w.completedTasks++;
            w.unlock();
        }
    }
    completedAbruptly = false;
} finally {
    processWorkerExit(w, completedAbruptly);
}
}

     主體是一個while迴圈,判斷當前task是否為空,不為空就直接執行此任務,關鍵程式碼在第19行task.run(),直接呼叫Runnable的run方法,那就是在當前執行緒執行它,那麼問題來了,現在當前執行緒到底是哪個執行緒了?是不是有點亂了,趕緊來個流程圖,我們觀察一下函式呼叫:

方法呼叫過程

     第一二步都是在使用者執行緒(呼叫execute方法時的那個執行緒),後邊三步之後已經切換到新建的worker執行緒中去了,所以終端使用者傳遞給execute的那個Runnable物件是在新的執行緒中執行的。
    ok!回到上邊程式碼的while迴圈,或條件判斷的第二個條件回去呼叫getTask方法。看名字都知道,肯定是去BlockingQueue(任務快取佇列)中去取下一個需要執行的任務了。
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {      
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

    第7行的boolean timed = allowCoreThreadTimeOut || wc > corePoolSize,主要是判斷當前執行緒是否需要進行銷燬,allowCoreThreadTimeOut 為真,就是說不管當前的執行緒數有沒有超過corePoolSize,都需要根據keepAliveTime去判斷執行緒的存活期。如果為假那麼就看第二個語句,就是當前執行緒數大於corePoolSize就需要判斷執行緒的存活期,如果不大於,那麼就不需要判斷存活期了。
    接下來主要關注一下16-25行的try程式碼塊,需要判斷執行緒的存活期那麼r就為workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)返回的值,也就是說嘗試從快取佇列取任務,時間限制為keepAliveTime,如果在這段時間內取到了任務,就返回,如果沒取到那麼會丟擲異常,那麼就表明這個執行緒已經在keepAliveTime這個時間段裡沒有執行了,那麼該執行緒就該被銷燬了。
     time為false的話,就表明不需要判斷執行緒的存活期,直接呼叫workQueue.take(),由於執行緒池採用的阻塞佇列,那麼該執行緒就會一直阻塞,直到從佇列中取到新的任務,然後返回。
    到此我們的ThreadPoolExecutor的大致流程就分析完畢了,其中還有很多細節就沒法完全覆蓋到了,如果有理解不當的地方,歡迎大家指出,共同學習交流!