1. 程式人生 > >多執行緒學習(6)執行緒池 ThreadPool

多執行緒學習(6)執行緒池 ThreadPool

threadpool模型:

呼叫方通過呼叫api將任務,裝進queue裡,然後會有一個機制監事queue裡有沒有task,如果有task,就分配給某個worker去執行。workers代表執行緒池的話.worker就是某條執行緒了。

執行緒池的構造方法:

Executor框架最核心的類是ThreadPoolExecutor,他是執行緒池的實現類,主要由下列7個元件構成。

package java.util.concurrent;

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

int corePoolSize,   //  執行緒池可使用執行緒數的最小值


int maximumPoolSize,  // 執行緒池容量的最大值

maximumPoolSize:是一個靜態變數,在變數初始化的時候,有建構函式指定.

long keepAliveTime, //  當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime為多餘的空閒執行緒等待新任務的最長時間,超過這個時間後多餘的執行緒將被終止。這裡把keepAliveTime設定為0L,意味著多餘的空閒執行緒會被立即終止。

TimeUnit unit, //  執行緒的阻塞時間單位,它的執行方法是TimeUnit.unit

.Sleep(keepAliveTime);

內部呼叫了Thread.sleep()方法。但是它和Thread.sleep()方法的區別是,Thread.Sleep只能設定毫秒數,而TimeUnit.unit.Sleep()中的unit可以換成時間單位,比如DAYS、HOURS、MINUTES,SECONDS、MILLISECONDS和NANOSECONDS。

TimeUnit.MINUTES.sleep(4);  // sleeping for 4 minutes

BlockingQueue<Runnable> workQueue,  // 阻塞佇列,裡面是Runnable型別,執行緒的任務
ThreadFactory threadFactory,  // 建立執行緒,併為執行緒指定queue裡面的runnable,執行緒池的構造方法,支援自定義threadFactory傳入,我們可以自己編寫newThread()方法,來實現自定義的執行緒建立邏輯。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}


RejectedExecutionHandler handler  // 當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和時(達到了最大執行緒池大小且工作佇列已滿),execute()方法將要呼叫的Handler。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

並且這些成員變數,都是volatile修飾的

    private volatile ThreadFactory threadFactory;

    private volatile RejectedExecutionHandler handler;

    private volatile long keepAliveTime;

    private volatile boolean allowCoreThreadTimeOut;

    private volatile int corePoolSize;

    private volatile int maximumPoolSize;

執行緒池成員屬性和api方法介紹

largestPoolSize: 是一個動態變數,是記錄執行緒曾經達到的最高值,也就是 largestPoolSize<= maximumPoolSize.

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

completedtaskcount:

返回已完成執行的近似任務總數。因為在計算期間任務和執行緒狀態可能動態改變,所以返回值只是一個近似值,但是該值在整個連續呼叫過程中不會減少。

當一個執行緒在workers容器中,準備remove時,執行緒會將自己的completedtaskcount賦值給執行緒池的completedtaskcount。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

TaskCount 執行緒池執行的總任務數,包括已經執行完的任務數和任務佇列中目前還需要執行的任務數

    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

getActiveCount();Thread.activeCount()  得到是存活的執行緒數  返回值是int型別

    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

常見執行緒池型別:

singletenthreadPool:

SingleThreadExecutor是使用單個worker執行緒的Executor。下面是SingleThreadExecutor的原始碼實現。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadExecutor的corePoolSize和maximumPoolSize被設定為1。其他引數與FixedThreadPool相同。SingleThreadExecutor使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為Integer.MAX_VALUE)。SingleThreadExecutor使用無界佇列作為工作佇列對執行緒池帶來的影響與FixedThreadPool相同,這裡就不贅述了。

  1. 如果當前執行的執行緒數少於corePoolSize(即執行緒池中無執行的執行緒),則建立一個新執行緒來執行任務。
  2. 線上程池完成預熱之後(當前執行緒池中有一個執行的執行緒),將任務加入LinkedBlockingQueue。
  3. 執行緒執行完1中的任務後,會在一個無限迴圈中反覆從LinkedBlockingQueue獲取任務來執行。

fixedthreadpool:

package java.util.concurrent;
public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
}

FixedThreadPool的corePoolSize和maximumPoolSize都被設定為建立FixedThreadPool時指定的引數nThreads。

當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime為多餘的空閒執行緒等待新任務的最長時間,超過這個時間後多餘的執行緒將被終止。這裡把keepAliveTime設定為0L,意味著多餘的空閒執行緒會被立即終止。
FixedThreadPool的execute()方法的執行示意圖如下所示。

對上圖的說明如下。

  1. 如果當前執行得執行緒數少於corePoolSize,則建立執行緒來執行任務。
  2. 線上程池完成預熱之後(當前執行的執行緒數等於corePoolSize),將任務加入LinkedBlockingQueue。
  3. 執行緒執行完1中的任務後,會在迴圈中反覆從LinkedBlockingQueue獲取任務來執行。

FixedThreadPool使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為Integer.MAX_VALUE)。使用無界佇列作為工作佇列會對執行緒池帶來如下影響。

  1. 當執行緒池中的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池中的執行緒數不會超過corePoolSize。
  2. 由於1,使用無界佇列時maximumPoolSize將是一個無效引數。
  3. 由於1和2,使用無界佇列時keepAliveTime將是一個無效引數。
  4. 由於使用無界佇列,執行中的FixedThreadPool(未執行方法shutdown()或shutdownNow())不會拒絕任務(不會呼叫RejectedExecutionHandler.rejectedExecution方法)。

cachethreadpool:

CacheThreadPool是一個會根據需要建立新執行緒的執行緒池。下面是建立CacheThreadPool的原始碼。


    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CacheThreadPool的corePoolSize被設定為0,即corePool為空;maximumPoolSize被設定為Integer.MAX_VALUE,即maximumPool是無界的。這裡把keepAliveTime設定為60L,意味著CacheThreadPool中的空閒執行緒等待新任務的最長時間為60秒,空閒執行緒超過60秒後將會被終止。

FixedThreadPool和SingleThreadExecutor使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列。CacheThreadPool使用沒有容量的SynchronousQueue作為執行緒池的工作佇列,但CacheThreadPool的maximumPool是無界的。這意味著,如果主執行緒提交任務的速度高於maximumPool中執行緒處理任務的速度時,CacheThreadPool會不斷建立新執行緒。極端情況下,CacheThreadPool會因為建立過多執行緒而耗盡CPU和記憶體資源。

對上圖的說明如下。

  1. 首先執行SynchronousQueue.offer(Runnable task)。如果當前maximumPool中有空閒執行緒正在執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),那麼主執行緒執行offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行,execute()方法執行完成;否則執行下面的步驟2。
  2. 當初始maximumPool為空,或者maximumPool中當前沒有空閒執行緒時,將沒有執行緒執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)。這種情況下,步驟1將失敗。此時CachedThreadPool會建立一個新執行緒執行任務,execute()方法執行完成。
  3. 在步驟2中新建立的執行緒將任務執行完後,會執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)。這個poll操作會讓空閒執行緒最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內主執行緒提交了一個新任務(主執行緒執行步驟1),那麼這個空閒執行緒將執行主執行緒提交的新任務;否則,這個空閒執行緒將終止。由於空閒60秒的空閒執行緒會被終止,因此長時間保持空閒的CachedThreadPool不會使用任務資源。

前面提到過,SynchronousQueue是一個沒有容量的阻塞佇列。每個插入操作必須等待另一個執行緒的對應移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主執行緒提交的任務傳遞給空閒執行緒執行。CachedThreadPool中任務傳遞的示意圖如下所示。

ScheduledThreadPool

執行定時任務的執行緒池

建立執行緒池的四種方式

這四種方式,都實現了RejectedExecutionHandler介面

Abortpolicy 

會丟擲異常,導致當前執行緒退出

當我們建立執行緒池時,不指定rejectedExecutionHandler時,就會預設使用AbortPolicy,當我們通過executor.execute(runnable)任務時,可能會發生異常,並將異常直接返回給了呼叫者。

 

       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

CallerRunsPolicy

當執行緒池的存活執行緒數,達到了最大值,此時又有新的請求過來,執行緒池會呼叫rejectedExecutionHandler這個介面的實現類的rejectedExecution的方法,此時該實現類正好是CallerRunsPolicy,它會讓新請求,在自己的執行緒上執行run方法,如果run方法消耗時間長,它會阻塞web容器的請求,影響web容器處理其他請求的效能。

當有外部請求訪問web服務端時,tomcat會分配一條執行緒(tomcat預設有150個執行緒,可以配置最大的為1500個執行緒來接收處理請求,且這些執行緒之間具有隔離性不會互相影響對方)來處理這個請求,當這個請求要用到執行緒池,且我們的執行緒池是基於CallerRunsPolicy來建立的,那麼CallerRunsPolicy會,使用當前請求的執行緒,來執行run方法。而當這個run方法執行時間過長時,tomcat的請求就會被佔用不放,導致無法拿出空閒的執行緒去處理其他請求,就會影響到服務端的效能。

應用場景:當我們希望執行緒池滿了之後,進行阻塞,就使用CallerRunsPolicy,阻塞的是呼叫方的,不會往queue裡放任務了。

 

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

看上面的rejectedExecution方法體,很有意思,它執行執行緒的方式,是r.run()而不是start()方法,這很耐人尋味,原因有兩個

我們在main方法中,準備啟動一個執行緒時,如果在程式碼中我們使用thread.star()方法,jvm在執行到這行時,實際上會建立一個新的執行緒,來執行執行緒物件中的run方法,此時在執行run方法的執行緒,與執行main方法的執行緒,是兩條執行緒,沒有關聯。而上面呼叫了runnable介面例項的run方法,jvm在執行時,根本不會建立新執行緒去執行,而是就在當前的請求(執行緒)裡之心run方法,此時的run方法,根本不需要開闢或分配新執行緒來執行,而是當做一個普通方法來執行了。所以此時run方法卡住了,他就會卡住當前的請求,就會卡住web容器的請求。影響web容器處理其他請求的效能。

DiscardOldestPolicy  在我的佇列裡面,踢出出最老的一個任務

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

DiscardPolicy

不做任何處理

ThreadPool的三個階段

Workers容器

0<active<coresize 

當一個task準備分配給workers容器,希望呼叫一個執行緒去執行它時,如果此時容器中存活的執行緒數小於coresize指定執行緒數時,會一次性建立一條新執行緒來執行任務,而且新執行緒也會駐留在記憶體中。而當執行緒執行完任務,並不會收回,而是變成等待狀態了。

問題:什麼時候出現activesize會超過coresize?

當coreSize向maxsize變遷的時候,不是由workers決定的,而是由queue決定的。queue裡面的task數量達到最大值的時候,coreSize就會向maxsize變遷了。我們在建立執行緒池的時候。執行緒池的構造方法會有一個BlockingQueue<Runnable> workQueue,然後我們初始化執行緒池時會指定這個queue的size,那麼呼叫者一邊往queue裡裝task,task也會一邊分配給workers去執行。只有當queue裡面的任務數,size達到了設定的最大size時,wokers才會去建立更多的執行緒,來處理任務,建立新執行緒的數量,不能超過maxsize。

core<active<maxsize

條件:任務queue滿了,會新建立執行緒去處理任務

active == maxsize

跟rejectHandlerPolicy有關係,配置了CallerRunsPolicy就會阻塞請求方,拒絕接受任務;配置了abortPolicy就會返回異常,意思是執行緒數已經創夠了,不能繼續建立了;配置了discardOldPolicy就會刪除最老任務,配置了discardPolicy就什麼都不做。

 

 

本文章參考了:https://blog.csdn.net/en_joker/article/details/84973420   《併發:Thread