1. 程式人生 > >java執行緒池ThreadPoolExecutor和阻塞佇列BlockingQueue,Executor, ExecutorService

java執行緒池ThreadPoolExecutor和阻塞佇列BlockingQueue,Executor, ExecutorService

ThreadPoolExecutor

引數最全的建構函式

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

解釋: corePoolSize 核心執行緒池大小, maximumPoolSize 大小,BlockingQueue 阻塞佇列,多餘的任務會放在這, ThreadFactory 生產執行緒池的執行緒, RejectedExecutionHandler 拒絕策略

  1. 執行緒池的工作流程, 當執行緒數小於corePoolSize時,每有新任務要做,則新建立一個執行緒, 當執行緒數達到corePoolSize時,就把任務放到workQueue中,如果人物佇列滿了,就再增加執行緒數,直到達到maximumPoolSize。
  2. 執行緒池的原理就是,建立執行緒後,這個執行緒不會因為執行玩任務而被關閉回收。 那麼具體的原理就是,執行緒池中的執行緒執行的是一個while迴圈,每次從任務佇列中取出任務,然後執行任務。核心程式碼: while (task != null || (task = getTask()) != null)
    的條件為真,第一次判斷時task為firstTask,即執行的第一個任務,那麼要點就成了getTask()必須不能為空

getTask()中有個核心邏輯是:

Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

這裡的workQueue就是阻塞佇列,timed表示是否會超時釋放,keepAliveTime是非核心執行緒允許的空閒時間;如果不超時,則呼叫BlockingQueue.take(),如果取不到值,就會一直阻塞直到程式提交了一個任務。所以,阻塞佇列的作用是控制執行緒池中執行緒的生命週期。 所以阻塞佇列保證了,執行緒池中的執行緒不會被回收,達到執行緒複用的目的。

  1. 為什麼要使用拒絕策略? 有了阻塞佇列後,為什麼還會用拒絕策略呢,拒絕策略的場景就是就是執行緒數最大了,任務佇列也滿了,這個時候就需要拒絕策略了。 阻塞佇列滿了,就會阻塞了,為什麼需要用拒絕策略呢?

因為新增任務呼叫的並不是阻塞的put()方法,而是非阻塞的offer()方法。 所以需要用拒絕策略。

所以執行緒池具有是生產者不阻塞,而消費者阻塞的特點。

BlockingQueue

  • 有界佇列: ArrayBlockingQueue

  • 無界佇列: LinkedBlockingQueue Executors.newFixedThreadPool() 採用的阻塞佇列,因為是無界的,某些情況下會導致佇列容量擴大,記憶體飆升 同步移交佇列:

  • SynchronousQueue 不儲存元素,只是移交

  • 優先順序佇列: PriorityBlockingQueue 具有優先順序

  • 延遲佇列 DelayedWorkQueue

4種不同的拒絕策略

4種策略都做為靜態內部類在ThreadPoolExcutor中進行實現。

  • AbortPolicy中止策略 該策略是預設飽和策略。
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                    throw new RejectedExecutionException("Task " + r.toString() +
                                                         " rejected from " +
                                                         e.toString());
         } 

使用該策略時在飽和時會丟擲RejectedExecutionException(繼承自RuntimeException),呼叫者可捕獲該異常自行處理。

  • DiscardPolicy拋棄策略
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }

如程式碼所示,不做任何處理直接拋棄任務

  • DiscardOldestPolicy拋棄舊任務策略

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  if (!e.isShutdown()) {
                      e.getQueue().poll();
                      e.execute(r);
                  }
      } 
    

如程式碼,先將阻塞佇列中的頭元素出隊拋棄,再嘗試提交任務。如果此時阻塞佇列使用PriorityBlockingQueue優先順序佇列,將會導致優先順序最高的任務被拋棄,因此不建議將該種策略配合優先順序佇列使用。

  • CallerRunsPolicy呼叫者執行

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }

既不拋棄任務也不丟擲異常,直接執行任務的run方法,換言之將任務回退給呼叫者來直接執行。使用該策略時執行緒池飽和後將由呼叫執行緒池的主執行緒自己來執行任務,因此在執行任務的這段時間裡主執行緒無法再提交新任務,從而使執行緒池中工作執行緒有時間將正在處理的任務處理完成。

java 自帶的4中執行緒池

都在Executors下面

  • newFixedThreadPool
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • newCachedThreadPool
 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
  • newScheduledThreadPool 使用的DelayedWorkQueue
public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
//
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

  • newSingleThreadExecutor 也就是核心容量為1且最大容量為1的FixedThreadPool
public static ExecutorService newSingleThreadExecutor() {
       return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
   }

Executor, ExecutorService, Executors的區別聯絡

下面列出了 Executor 和 ExecutorService 的區別:

Executor | ExecutorService Executor 是 Java 執行緒池的核心介面,用來併發執行提交的任務 ExecutorService 是 Executor 介面的擴充套件,提供了非同步執行和關閉執行緒池的方法 Executor提供execute()方法用來提交任務 ExecutorService提供submit()方法用來提交任務 Executor execute()方法無返回值 ExecutorService submit()方法返回Future物件,可用來獲取任務執行結果 Executor 不能取消任務 ExecutorService 可以通過Future.cancel()取消pending中的任務 Executor沒有提供和關閉執行緒池有關的方法 ExecutorService 提供了關閉執行緒池的方法

而Executors是一個類,工具類。Executors和Executor類似於Collections和Collection的關係。