1. 程式人生 > >執行緒池工廠類Executors程式設計的藝術

執行緒池工廠類Executors程式設計的藝術

  Executors是一個執行緒池的工廠類,提供各種有用的執行緒池的建立,使用得當,將會使我們併發程式設計變得簡單!今天就來聊聊這個工廠類的藝術吧!

  Executors只是Executor框架的主要成員元件之一,為java的非同步任務排程執行提供了重要的入口!

  在說Executors之前,還需要說一下另一個Executor框架的重要成員,ThreadPoolExecutor。

  ThreadPoolExecutor 實現了ExecutorService介面,提供了執行緒池的排程功能!成為純種池技術的基石!

  ThreadPoolExecutor 的主要建構函式如下:

    /**
* Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool *
@param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {
@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

  咱們此處不是要去分析其原始碼,只是想看一下怎樣使用他!

  從構造方法可以看出 ThreadPoolExecutor 的主要引數 7 個,在其註釋上也有說明功能,咱們翻譯下每個引數的功能:

    corePoolSize: 執行緒池核心執行緒數(平時保留的執行緒數),使用時機: 在初始時刻,每次請求進來都會建立一個執行緒直到達到該size
    maximumPoolSize: 執行緒池最大執行緒數,使用時機: 當workQueue都放不下時,啟動新執行緒,直到最大執行緒數,此時到達執行緒池的極限
    keepAliveTime/unit: 超出corePoolSize數量的執行緒的保留時間,unit為時間單位
    workQueue: 阻塞佇列,當核心執行緒數達到或者超出後,會先嚐試將任務放入該佇列由各執行緒自行消費;  
        ArrayBlockingQueue: 建構函式一定要傳大小
        LinkedBlockingQueue: 建構函式不傳大小會預設為65536(Integer.MAX_VALUE ),當大量請求任務時,容易造成 記憶體耗盡。
        SynchronousQueue: 同步佇列,一個沒有儲存空間的阻塞佇列 ,將任務同步交付給工作執行緒。
        PriorityBlockingQueue: 優先佇列
    threadFactory:執行緒工廠,用於執行緒需要建立時,呼叫其newThread()生產新執行緒使用
    handler: 飽和策略,當佇列已放不下任務,且建立的執行緒已達到 maximum 後,則不能再處理任務,直接將任務交給飽和策略
        AbortPolicy: 直接拋棄(預設)
        CallerRunsPolicy: 用呼叫者的執行緒執行任務
        DiscardOldestPolicy: 拋棄佇列中最久的任務
        DiscardPolicy: 拋棄當前任務

  所以,雖然引數有點多,但是其實仔細閱讀以上的註釋,基本就能很好地使用執行緒池了,不過咱們還可以通過一個流程圖來說明這一切,對於喜歡看圖說話的同學來說是件好事!

 

  好了,ThreadPoolExecutor 介紹完後,什麼感覺呢? 說是簡單,其實還是挺麻煩的,所以,是 Executors 工廠出場了!

   作為一個工廠類,Executors 簡化了各種引數,只忘文生義即可明白其意思!Executors 主要提供4種類型的執行緒池!

1. FixedThreadPool 建立一個指定數量的執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。

  其實現方式如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

  使用固定執行緒數的worker和無界佇列儲存多餘的task,所以建立的執行緒數不會超過corePoolSize,所以maxPoolSize是一個無效引數,所以keepAliveTime是一個無效引數,所以不會呼叫RejectedExecutionHandler.rejectedExecution()方法,即無拒絕策略可用;從而在外部表現來看,就是固定執行緒,在執行無限的佇列!

2. SingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

  其實現方式如下:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

  與 FixedThreadPool 原理類似,不過這裡只會使用一個執行緒在執行。使用一個執行緒可以保證取任務時的順序一致性,從而表現出先到先得的效果!在要求有執行順序要求的場景,剛好派上用場!

3. CachedThreadPool, 建立一個可快取的無限執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。

  其實現方式如下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

  使用SyncronousQueue作為執行緒池的工作佇列,這是特殊的佇列,它是個沒容量的阻塞佇列(至於為什麼要用這個特性,據說是效能比 LinkedBlockingQueue 更高,詳解他),每個插入操作必須有一個執行緒對應的移除操作,反之一樣;當提交一個cached任務時,執行SyncronousQueue.offer(x),如果有等待的空閒執行緒,則匹配成功,exec立即返回;如果沒有匹配,則會建立一個新執行緒執行任務;任務執行完後使用poll()等待keep即60秒,一直嘗試從佇列中獲取任務,超時後自動釋放執行緒;

  使用這種執行緒池把握好節奏,因為看起來執行緒池動不動就會建立系列執行緒池,而且動不動就會釋放執行緒,完全是不可控的,不可監控的。

 4. ScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。

  其實現如下:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

  看起來,這個在引數方面沒有太多的解說,只是依賴於 ScheduledThreadPoolExecutor 的實現了!所以,我們來看一下 ScheduledThreadPoolExecutor是怎麼實現的?

    public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor    {}
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

  ok, 如上兩個,就夠了!ScheduledThreadPoolExecutor 也是繼承了 ThreadPoolExecutor, 其構造方法也是依賴於父類,只是將 佇列實現改為 DelayedWorkQueue,從而實現延時執行或者定期執行的效果!

  而相比於 Timer 來說,ScheduledThreadPool會有更好的執行效果,因為Timer只會有一個執行緒來執行任務,會有侷限性。而且 ScheduledThreadPool 提供其他很多的可操作方法!

  Executors 的出現,使執行緒池的使用難度大大降低,使開發更方便,在合適的場合使用相應的工廠方法,定能讓開發事半功倍!