1. 程式人生 > >線程池:對ThreadPoolExecutor的理解和源碼探索

線程池:對ThreadPoolExecutor的理解和源碼探索

cep star 等待 啟動 running 占用 效率 tee 創建線程

對線程池的理解

在沒有引入線程池之前,如果去創建多線程,就會出現這幾種情況:第一,創建現場本身就占用CPU資源,給CPU帶來壓力;第二,線程本身也要占用內存空間,大量的線程會占用內存資源並且可能會導致Out of Memory。第三,線程調用結束後,大量的線程回收也會給GC帶來很大的壓力。第四,頻繁的創建和銷毀線程會降低系統的效率。

這個時候,線程池就應運而生,為了避免重復的創建線程,線程池的出現可以讓線程進行復用。

ThreadPoolExecutor的核心字段

private final BlockingQueue<Runnable> workQueue:一個阻塞隊列,用來存儲等待執行的任務,當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。通過workQueue,線程池實現了阻塞功能。

private final HashSet<Worker> workers:用來存儲在線程池中的所有Worker的集合。

private int largestPoolSize:線程池最大可實現的線程數量。

private volatile RejectedExecutionHandler handler:表示當拒絕處理任務時的策略,saturated 或者 shutdown被執行時被調用。

private volatile long keepAliveTime:表示線程沒有任務時最多保持多久然後停止。默認情況下,只有線程池中線程數大於corePoolSize 時,keepAliveTime 才會起作用。換句話說,當線程池中的線程數大於corePoolSize,並且一個線程空閑時間達到了keepAliveTime,那麽就是shutdown。

private volatile int corePoolSize:線程池核心線程數量。可以理解為允許線程池中允許同時運行的最大線程數。

private volatile int maximumPoolSize:線程池最大可容線程數量。

private final ReentrantLock mainLock:線程池可重入鎖。

private static final RejectedExecutionHandler defaultHandler:線程池拒絕策略。

線程池的狀態以及Bit值

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

最核心的execute()方法

  /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn‘t, by returning false.
         *
      如果線程池中的線程數量小於corePollSize,則線程池會嘗試去創建一個新的線程,並把這個command作為第一個任務來執行。
      在調用addWorker方法時,會對runState和workerCount做原子性檢查。如果檢查失敗,addWorker方法會阻止添加線程,並返回false。
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
      
如果一個任務成功進入阻塞隊列,那麽我們需要進行一個雙重檢查來確保是我們已經添加一個線程(因為存在著一些線程在上次檢查後他已經死亡)
      或者當我們進入該方法時,該線程池已經關閉。
      所以,我們將重新檢查狀態,線程池關閉的情況下則回滾入隊列,線程池沒有線程的情況則創建一個新的線程。
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
     
如果任務無法入隊列(隊列滿了),那麽我們將嘗試新開啟一個線程(從corepoolsize到擴充到maximum),如果失敗了,那麽可以確定原因,要麽是
     線程池關閉了或者飽和了(達到maximum),所以我們執行拒絕策略
      */


     //AtomicInteger ctl,為原子操作類,在多線程下實現安全的自增自劍,排除了多線程下的可見性和指令重排序問題。 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true))
        //執行成功,則返回
return; c = ctl.get(); }
    //檢查線程池狀態,如果是運行狀態,則進入阻塞隊列
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
    //進行到這裏有兩種情況,1,線程池狀態不是RUNNING,2,線程池workerCount >= corePoolSize並且workQueue已滿,需要擴容corePoolSize到maximumPoolSize
else if (!addWorker(command, false)) reject(command); }

excute()方法執行的邏輯入下圖

技術分享圖片

現在看看addWorker方法的具體實現

/**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread#start), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry://外層循環,判斷線程池狀態
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
        //內層循環,主要是對worker數量加一
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    //主要是把Runnable封裝到Worker中,並添加到WorkerSet集合中。
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
            //啟動線程 t.start(); workerStarted
= true; } } } finally {
      //添加失敗處理
if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

線程池:對ThreadPoolExecutor的理解和源碼探索