1. 程式人生 > >執行緒池的原始碼分析

執行緒池的原始碼分析

重要變數基本介紹

  1. corePoolSize:核心執行緒數
  2. maximumPoolSize:最大執行緒數(理論上最大值 1<<29 - 1)
  3. largestPoolSize:用於統計實際執行緒的最大併發量
  4. workQueue:執行緒的儲存佇列
  5. AtomicInteger:這個類是ThreadPoolExecutor的內部類,其成員變數value(int)引數又來儲存執行緒數大小,其中最高3位表示執行緒狀態,剩餘29位用來儲存執行緒數
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static
final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 1110 0000 0000 0000 0000 0000 0000 0000(是一個負數,具體負多少,可以自行計算一下) private static final int RUNNING = -1 << COUNT_BITS; // 0000 0000 0000 0000 0000 0000 0000 0000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 0010 0000 0000 0000 0000 0000 0000 0000 private
static final int STOP = 1 << COUNT_BITS; // 0100 0000 0000 0000 0000 0000 0000 0000 private static final int TIDYING = 2 << COUNT_BITS; // 0110 0000 0000 0000 0000 0000 0000 0000 private static final int TERMINATED = 3 << COUNT_BITS;

**

工作原理,我們以ThreadPoolExecutor分析:

外部入口execute(Runnable command):

 public void execute
(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

獲取當前執行緒數和執行緒狀態的引數ctl.get(),ctl在構建ThreadPoolExecutor物件是生成。看如下程式碼

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
...
...
...
private static int ctlOf(int rs, int wc) { return rs | wc; }// RUNNING: 1110 0000 0000 0000 0000 0000 0000 0000

通過函式workerCountOf(c)獲取當前執行緒數與corePoolSize比較,當前執行緒數小於核心執行緒數時,將執行緒加入到工作佇列中:

private static int workerCountOf(int c)  { return c & CAPACITY; }
//比如初始化第一次執行時:c = ctl.getValue(),c =  1110 0000 0000 0000 0000 0000 0000 0000
 & 0001 1111 1111 1111 1111 1111 1111 1111,結果為0,基表示當前執行緒數為0

分析addWork(command, true)原始碼:

 private boolean addWorker(Runnable firstTask, boolean core) { //core是否小於核心執行緒
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //  private static int runStateOf(int c)     { return c & ~CAPACITY; }
            // 就是比較前三位
            //  比如: 1110 0000 0000 0000 0000 0000 0000 0000 
            //    &   1110 0000 0000 0000 0000 0000 0000 0000 
            //    =   1110 0000 0000 0000 0000 0000 0000 0000 

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && // 第一個條件執行緒池狀態大於0(SHUTDOWN=0,STOP=1,TIDYING=2,TERMINATED=3)
            //第二個條件不太理解
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || //**在這裡可以看出最大執行緒數不能大於(1<<29) -1**
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//滿足條件,此函式會把ctl中的value值對應增加,新增成功退出迴圈
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)//若此事執行緒狀態發生改變,則繼續迴圈
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)// 計算實際最大執行緒數
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); //執行執行緒
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted) // 如果新增失敗,講執行緒移除佇列,ctl.value值減少一位,並改變執行緒狀態
                addWorkerFailed(w);
        }
        return workerStarted;
    }

今天先寫到這裡,下次繼續。。。