1. 程式人生 > >JAVA併發程式設計:執行緒池 ThreadPoolExecutor

JAVA併發程式設計:執行緒池 ThreadPoolExecutor

生活

前期追深度,否則會華而不實,後期追廣度,否則會坐井觀天;

前言

在前面,我們已經對Thread有了比較深入的瞭解,並且已經學會了通過new Thread()來建立一個執行緒,並通過start方法來啟動一個執行緒,這種方法非常簡單,同樣也存在弊端:
1、每次通過new Thread()建立物件效能不佳
2、執行緒缺乏統一管理,可能無限建立執行緒,相互競爭,極端情況下回出現OOM
3、無法提供定時執行、定期執行
所以在企業級專案裡,用到執行緒的地方,大多有執行緒池的介入。

ThreadPoolExecutor的成員

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

用來表示執行緒池的狀態以及正在執行的執行緒數。其中高3位描述執行緒池的狀態,低29位用來表示正在執行的執行緒數。當執行緒池建立時,預設的狀態是Running,預設的正在執行的執行緒數為0.
執行緒池的狀態有以下幾種:
RUNNING:允許已經和執行新任務
SHUTDOWN:不允許提交新任務,會執行已經提交到佇列的任務
STOP:不允許提交新任務,也不執行在佇列等待的任務,設定正在執行的任務的中斷標誌位
DITYING:所有任務執行完畢,執行緒池中的工作執行緒為0,等待執行鉤子方法terminated
TERMINATED:鉤子方法執行完畢

注意這裡的狀態指的是執行緒池的狀態,並不是指執行緒池中的執行緒狀態。
執行shutdown方法,可以使執行緒池的狀態由RUNNING轉為SHUTDOWN;
執行shutdownNow方法,可以使執行緒池的狀態由RUNNING轉為STOP
SHUTDOWN和STOP都會先轉為DITYING,再轉為TERMINATED.

以下成員對執行緒池的效能有很大影響,放在構造器裡詳說。

    private final BlockingQueue<Runnable> workQueue;
    private volatile int maximumPoolSize;
    private volatile long keepAliveTime;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;

//執行緒池建立至今 最大的執行緒數
private int largestPoolSize;
//已完成的任務數
private long completedTaskCount;
//是否允許在空閒時銷燬核心執行緒
private volatile boolean allowCoreThreadTimeOut;
//工作執行緒的集合
private final HashSet<Worker> workers = new HashSet<Worker>();

ThreadPoolExecutor 建構函式

//所有建構函式最終都呼叫到這個構造器,來看下他的引數
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

corePoolSize:執行緒池中核心執行緒數的最大值
maximumPoolSize :執行緒池中最多的執行緒數
workQueue:用來快取任務的阻塞佇列
用一個新增新任務的場景來描述上面三者的關係
1、如果沒有空閒執行緒且執行緒數小於核心執行緒數,就建立一個新的執行緒執行
2、如果沒有空閒執行緒且執行緒數等於核心執行緒數,就把任務快取到阻塞佇列
3、如果沒有空閒執行緒且執行緒數小於最大執行緒數且阻塞佇列已滿,則建立一個新的執行緒執行
4、如果沒有空閒執行緒且執行緒數大於最大執行緒數且阻塞佇列已滿,則根據建構函式傳入的拒絕策略做出相應操作。

keepAliveTime:空閒超過這個時間的執行緒會被銷燬【maximumPoolSize為true時,核心執行緒也由這個時間控制銷燬】
unit:上面超時時間的單位
threadFactory:建立執行緒的工廠
handler:拒絕策略
拒絕策略有以下四種
1、AbortPolicy:直接丟擲異常
2、CallerRunsPolicy:用執行緒池提交任務的執行緒去執行 直接run
3、DiscardOldestPolicy:丟棄最早進入佇列沒有執行的執行緒
4、DiscardPolicy:丟棄這個執行緒

執行緒池的提交

執行緒池的submit execute最終都執行下面的方法來提交執行緒

 public void execute(Runnable command) {
 //判空
        if (command == null)
            throw new NullPointerException();
    
        int c = ctl.get();
        //當工作執行緒數小於核心執行緒時
        if (workerCountOf(c) < corePoolSize) {
        //新增新的工作執行緒
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
	//判斷執行緒池時執行中狀態, 並加入到等待佇列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //判斷是否依舊執行中狀態,如果不是,就移除這個執行緒
            if (! isRunning(recheck) && remove(command))
                reject(command);
                //如果這時候工作執行緒為0,就建立一個執行緒去阻塞佇列獲取任務去執行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //新增核心執行緒以外的工作執行緒來執行。失敗就呼叫拒絕策略。
        else if (!addWorker(command, false))
            reject(command);
    }
-------
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //這個判斷看了好久。。注意,,如果你一定要把兩個判斷條件一起看,真的不太好理解
            //我是這麼理解的
            //if裡面兩個條件:
            //1、rs >= SHUTDOWN
            //2、  ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty())
            //中間用&連線,也就是1不成立或者2不成立才會不進if裡面,往下走
	   // 也就是  rs<SHUTDOWN為true 即 執行中
	   //或者 rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()  即SHUTDOWN狀態,並且當前沒有提交新任務,並且等待佇列非空
//連起來就得知 新建一個工作執行緒的 條件是   執行中 或者 (SHUTDOWN狀態,等待佇列非空,不提交新任務)
//滿足這些條件,才能往下走

            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //判斷邊界條件
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                    //cas新增工作執行緒數
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //狀態有變化,從外層迴圈再來一遍
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
			
			//如果執行中 或者  已經SHUTDOWN,但是沒有提交新任務,,這個寫法就友好多了,一目瞭然
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                            //新增到工作執行緒集合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                //執行
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

執行緒池的執行

執行緒池的執行看Work的run方法,run呼叫到runWork

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        //work裡的firstTask獲取佇列裡的task不為空,就往下去執行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                //鉤子方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    //執行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    //鉤子方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

看一下 getTask方法如何從佇列中得到任務呢

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //允許核心執行緒 超時銷燬  或者  (當前執行緒數超過核心執行緒數)
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            (
		//當前執行緒數 大於最大執行緒數 或者
		//已經超時
		)
		並且
		(
                當前工作執行緒數大於1 
                獲取等待佇列已經空了
               )
               就嘗試縮減工人
	
		
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

          //如果設定超時了,就要poll超時獲取
          //否則用take,無限期阻塞
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

執行緒池的關閉

 public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 更新狀態
            advanceRunState(SHUTDOWN);
            //嘗試中斷worker,呼叫interruptIdleWorkers傳入false;
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 如果執行緒還沒有中斷 並且能夠獲得Worker 的鎖,說明已經執行完了,就可以中斷到
//奇怪,不知道為啥這個worker沒有去繼承可重入鎖,而是寫了一模一樣的程式碼進去。。。。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

後記

明天來研究下Executors給我們提供的幾個預設的ThreadPoolExecutor