1. 程式人生 > >Java執行緒池ThreadPoolExecutor詳解

Java執行緒池ThreadPoolExecutor詳解

 

1、執行緒池的工作原理?

 

  1. 執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面有任務,執行緒池也不會馬上執行它們。
  2. 當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:

    • 如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;
    • 如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列。
    • 如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要建立執行緒執行這個任務;
    • 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會丟擲異常,告訴呼叫者“我不能再接受任務了”。

3,當一個執行緒完成任務時,它會從佇列中取下一個任務來執行。 
4,當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,如果當前執行的執行緒數大於 corePoolSize,那麼這個執行緒就被停掉。所以執行緒池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。

這樣的過程說明,並不是先加入任務就一定會先執行。假設佇列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那麼當加入 20 個任務時,執行的順序就是這樣的:首先執行任務 1、2、3,然後任務 4~13 被放入佇列。這時候佇列滿了,任務 14、15、16 會被馬上執行,而任務 17~20 則會丟擲異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。

2、執行緒池有哪些配置項?

執行緒池可以使用java.util.concurrent.ThreadPoolExecutor來建立,在該類中包含最全引數的建構函式如下:

相應的入參就是執行緒池可以配置的引數:

 /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  •  

corePoolSize :核心池的大小,如果呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,會直接預先建立corePoolSize的執行緒,否則當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;這樣做的好處是,如果任務量很小,那麼甚至就不需要快取任務,corePoolSize的執行緒就可以應對;

maximumPoolSize:執行緒池最大執行緒數,表示線上程池中最多能建立多少個執行緒,如果執行中的執行緒超過了這個數字,那麼相當於執行緒池已滿,新來的任務會使用RejectedExecutionHandler 進行處理;

keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止,然後執行緒池的數目維持在corePoolSize 大小;

unit:引數keepAliveTime的時間單位;

workQueue:一個阻塞佇列,用來儲存等待執行的任務,如果當前對執行緒的需求超過了corePoolSize大小,才會放在這裡;

threadFactory:執行緒工廠,主要用來建立執行緒,比如可以指定執行緒的名字;

handler:如果執行緒池已滿,新的任務的處理方式

3、執行緒池的阻塞佇列包含哪幾種選擇?

如果執行緒數超過了corePoolSize,則開始把執行緒先放到阻塞佇列裡,相當於生產者消費者的一個數據通道,有以下一些阻塞佇列可供選擇:

  1. ArrayBlockingQueue
  2. DelayQueue
  3. LinkedBlockingQueue
  4. PriorityBlockingQueue
  5. SynchronousQueue

ArrayBlockingQueue是一個有邊界的阻塞佇列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。

DelayQueue阻塞的是其內部元素,DelayQueue中的元素必須實現 java.util.concurrent.Delayed介面,該介面只有一個方法就是long getDelay(TimeUnit unit),返回值就是佇列元素被釋放前的保持時間,如果返回0或者一個負值,就意味著該元素已經到期需要被釋放,此時DelayedQueue會通過其take()方法釋放此物件,DelayQueue可應用於定時關閉連線、快取物件,超時處理等各種場景;

LinkedBlockingQueue阻塞佇列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是採用了預設大小為Integer.MAX_VALUE的容量 。它的內部實現是一個連結串列。

PriorityBlockingQueue是一個沒有邊界的佇列,它的排序規則和 java.util.PriorityQueue一樣。需要注意,PriorityBlockingQueue中允許插入null物件。所有插入PriorityBlockingQueue的物件必須實現 java.lang.Comparable介面,佇列優先順序的排序規則就是按照我們對這個介面的實現來定義的。

SynchronousQueue佇列內部僅允許容納一個元素。當一個執行緒插入一個元素後會被阻塞,除非這個元素被另一個執行緒消費。

使用的最多的應該是LinkedBlockingQueue,注意一般情況下要配置一下佇列大小,設定成有界佇列,否則JVM記憶體會被撐爆!

4、如果執行緒池已經滿了可是還有新的任務提交怎麼辦?

執行緒池已滿的定義,是指執行執行緒數==maximumPoolSize,並且workQueue是有界佇列並且已滿(如果是無界隊列當然永遠不會滿);

這時候再提交任務怎麼辦呢?執行緒池會將任務傳遞給最後一個引數RejectedExecutionHandler來處理,比如列印報錯日誌、丟擲異常、儲存到Mysql/redis用於後續處理等等,執行緒池預設也提供了幾種處理方式見第5條目;

5、有哪些飽和策略可以使用?

飽和策略指的就是執行緒池已滿情況下任務的處理策略,預設有以下幾種:

在預設的 ThreadPoolExecutor.AbortPolicy 中,處理程式遭到拒絕將丟擲執行時RejectedExecutionException。 
在 ThreadPoolExecutor.CallerRunsPolicy 中,執行緒呼叫執行該任務的execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。 
在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。 
在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程) 
當然也可以自己實現處理策略類,繼承RejectedExecutionHandler介面即可,該介面只有一個方法: 
void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

6、怎樣優化執行緒池的配置?

如何合理配置執行緒池大小,僅供參考。

一般需要根據任務的型別來配置執行緒池大小:

如果是CPU密集型任務,就需要儘量壓榨CPU,參考值可以設為 NCPU+1

如果是IO密集型任務,參考值可以設定為2*NCPU

當然,這只是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先將執行緒池大小設定為參考值,

再觀察任務執行情況和系統負載、資源利用率來進行適當調整。

其中NCPU的指的是CPU的核心數,可以使用Runtime.getRuntime().availableProcessors()來獲取;

參考文章:

java多執行緒詳解(7)-執行緒池的使用 http://www.cnblogs.com/weiguo21/p/4813678.html 
ThreadPoolExecutor中策略的選擇與工作佇列的選擇(java執行緒池) http://blog.csdn.net/longeremmy/article/details/8231184 
【Java併發之】BlockingQueue http://blog.csdn.net/suifeng3051/article/details/48807423 

7. ThreadPoolExecutor 構造方法

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  •  

引數: 
corePoolSize 核心執行緒池大小

maximumPoolSize 最大執行緒池大小

keepAliveTime 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間;可以 
allowCoreThreadTimeOut(true)使得核心執行緒有效時間

unit 時間單位

workQueue 儲存任務的阻塞佇列

threadFactory 建立執行緒的工廠

handler 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理

1.當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。

2.當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行

3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務

4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理

5.當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒

6.當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉

 

這裡寫圖片描述

 

8. ThreadPoolExecutor 預設有四個拒絕策略:

1、ThreadPoolExecutor.AbortPolicy 直接丟擲異常RejectedExecutionException

/**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
  •  

2、ThreadPoolExecutor.CallerRunsPolicy 直接呼叫run方法並且阻塞執行

/**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
  •  

3、ThreadPoolExecutor.DiscardPolicy 直接丟棄後來的任務

/**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
  •  

4、ThreadPoolExecutor.DiscardOldestPolicy 丟棄在佇列中隊首的任務

 /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
  •  

3. Executors提供的執行緒池配置方案

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  •  

構造一個固定執行緒數目的執行緒池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,因此多餘的任務將存在再阻塞佇列,不會由RejectedExecutionHandler處理

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  •  

構造一個緩衝功能的執行緒池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個無容量的阻塞佇列 SynchronousQueue,因此任務提交之後,將會建立新的執行緒執行;執行緒空閒超過60s將會銷燬

 /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  •  

構造一個只支援一個執行緒的執行緒池,配置corePoolSize=maximumPoolSize=1,無界阻塞佇列LinkedBlockingQueue;保證任務由一個執行緒序列執行

/**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
  •  

構造有定時功能的執行緒池,配置corePoolSize,無界延遲阻塞佇列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由於DelayedWorkQueue是無界佇列,所以這個值是沒有意義的