1. 程式人生 > >Java併發程式設計:執行緒池的使用(轉載)

Java併發程式設計:執行緒池的使用(轉載)

轉載自:https://www.cnblogs.com/dolphin0520/p/3932921.html

Java併發程式設計:執行緒池的使用

  在前面的文章中,我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便,但是就會有一個問題:

  如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。

  那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?

  在Java中可以通過執行緒池來達到這樣的效果。今天我們就來詳細講解一下Java的執行緒池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然後再講述它的實現原理,接著給出了它的使用示例,最後討論了一下如何合理配置執行緒池的大小。

  以下是本文的目錄大綱:

  一.Java中的ThreadPoolExecutor類

  二.深入剖析執行緒池實現原理

  三.使用示例

  四.如何合理配置執行緒池的大小 

  若有不正之處請多多諒解,並歡迎批評指正。

  請尊重作者勞動成果,轉載請標明原文連結:

  http://www.cnblogs.com/dolphin0520/p/3932921.html

 

一.Java中的ThreadPoolExecutor類

  java.uitl.concurrent.ThreadPoolExecutor類是執行緒池中最核心的一個類,因此如果要透徹地瞭解Java中的執行緒池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現原始碼。

  在ThreadPoolExecutor類中提供了四個構造方法:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  class  ThreadPoolExecutor  extends  AbstractExecutorService {      .....      public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,              BlockingQueue<Runnable> workQueue);        public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,              BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);        public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,              BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);        public  ThreadPoolExecutor( int  corePoolSize, int  maximumPoolSize, long  keepAliveTime,TimeUnit unit,          BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);      ... }

   從上面的程式碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的原始碼具體實現,發現前面三個構造器都是呼叫的第四個構造器進行的初始化工作。

   下面解釋下一下構造器中各個引數的含義:

  • corePoolSize:核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;
  • maximumPoolSize:執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒;
  • keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0;
  • unit:引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
複製程式碼
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
複製程式碼
  • workQueue:一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,這裡的阻塞佇列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。

  • threadFactory:執行緒工廠,主要用來建立執行緒;
  • handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務 

   具體引數的配置與執行緒池的關係將在下一節講述。

  從上面給出的ThreadPoolExecutor類的程式碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public  abstract  class  AbstractExecutorService  implements  ExecutorService {              protected  <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };      protected  <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };      public  Future<?> submit(Runnable task) {};      public  <T> Future<T> submit(Runnable task, T result) { };      public  <T> Future<T> submit(Callable<T> task) { };      private  <T> T doInvokeAny(Collection<?  extends  Callable<T>> tasks,                              boolean  timed,  long  nanos)          throws  InterruptedException, ExecutionException, TimeoutException {      };      public  <T> T invokeAny(Collection<?  extends  Callable<T>> tasks)          throws  InterruptedException, ExecutionException {      };      public  <T> T invokeAny(Collection<?  extends  Callable<T>> tasks,                             long  timeout, TimeUnit unit)          throws  InterruptedException, ExecutionException, TimeoutException {      };      public  <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks)          throws  InterruptedException {      };      public  <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks,                                           long  timeout, TimeUnit unit)          throws  InterruptedException {      }; }

   AbstractExecutorService是一個抽象類,它實現了ExecutorService介面。

  我們接著看ExecutorService介面的實現:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public  interface  ExecutorService  extends  Executor {        void  shutdown();      boolean  isShutdown();      boolean  isTerminated();      boolean  awaitTermination( long  timeout, TimeUnit unit)          throws  InterruptedException;      <T> Future<T> submit(Callable<T> task);      <T> Future<T> submit(Runnable task, T result);      Future<?> submit(Runnable task);      <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks)          throws  InterruptedException;      <T> List<Future<T>> invokeAll(Collection<?  extends  Callable<T>> tasks,                                    long  timeout, TimeUnit unit)          throws  InterruptedException;        <T> T invokeAny(Collection<?  extends  Callable<T>> tasks)          throws  InterruptedException, ExecutionException;      <T> T invokeAny(Collection<?  extends  Callable<T>> tasks,                      long  timeout, TimeUnit unit)          throws  InterruptedException, ExecutionException, TimeoutException; }

   而ExecutorService又是繼承了Executor介面,我們看一下Executor介面的實現:

1 2 3 public  interface  Executor {      void  execute(Runnable command); }

   到這裡,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關係了。

  Executor是一個頂層介面,在它裡面只聲明瞭一個方法execute(Runnable),返回值為void,引數為Runnable型別,從字面意思可以理解,就是用來執行傳進去的任務的;

  然後ExecutorService介面繼承了Executor介面,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象類AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中宣告的所有方法;

  然後ThreadPoolExecutor繼承了類AbstractExecutorService。

  在ThreadPoolExecutor類中有幾個非常重要的方法:

1 2 3 4 execute() submit() shutdown() shutdownNow()

   execute()方法實際上是Executor中宣告的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向執行緒池提交一個任務,交由執行緒池去執行。

  submit()方法是在ExecutorService中宣告的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向執行緒池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是呼叫的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。

  shutdown()和shutdownNow()是用來關閉執行緒池的。

  還有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與執行緒池相關屬性的方法,有興趣的朋友可以自行查閱API。

二.深入剖析執行緒池實現原理

  在上一節我們從巨集觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下執行緒池的具體實現原理,將從下面幾個方面講解:

  1.執行緒池狀態

  2.任務的執行

  3.執行緒池中的執行緒初始化

  4.任務快取佇列及排隊策略

  5.任務拒絕策略

  6.執行緒池的關閉

  7.執行緒池容量的動態調整

 

1.執行緒池狀態

  在ThreadPoolExecutor中定義了一個volatile變數,另外定義了幾個static final變量表示執行緒池的各個狀態:

1 2 3 4 5 volatile  int  runState; static  final  int  RUNNING    =  0 ; static  final  int  SHUTDOWN   =  1 ; static  final  int  STOP       =  2 ; static  final  int  TERMINATED =  3 ;

   runState表示當前執行緒池的狀態,它是一個volatile變數用來保證執行緒之間的可見性;

  下面的幾個static final變量表示runState可能的幾個取值。

  當建立執行緒池後,初始時,執行緒池處於RUNNING狀態;

  如果呼叫了shutdown()方法,則執行緒池處於SHUTDOWN狀態,此時執行緒池不能夠接受新的任務,它會等待所有任務執行完畢;

  如果呼叫了shutdownNow()方法,則執行緒池處於STOP狀態,此時執行緒池不能接受新的任務,並且會去嘗試終止正在執行的任務;

  當執行緒池處於SHUTDOWN或STOP狀態,並且所有工作執行緒已經銷燬,任務快取佇列已經清空或執行結束後,執行緒池被設定為TERMINATED狀態。

2.任務的執行

  在瞭解將任務提交給執行緒池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變數:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private  final  BlockingQueue<Runnable> workQueue;               //任務快取佇列,用來存放等待執行的任務 private  final  ReentrantLock mainLock =  new  ReentrantLock();    //執行緒池的主要狀態鎖,對執行緒池狀態(比如執行緒池大小                                                                //、runState等)的改變都要使用這個鎖 private  final  HashSet<Worker> workers =  new  HashSet<Worker>();   //用來存放工作集   private  volatile  long   keepAliveTime;     //執行緒存貨時間    private  volatile  boolean  allowCoreThreadTimeOut;    //是否允許為核心執行緒設定存活時間 private  volatile  int    corePoolSize;      //核心池的大小(即執行緒池中的執行緒數目大於這個引數時,提交的任務會被放進任務快取佇列) private  volatile  int    maximumPoolSize;    //執行緒池最大能容忍的執行緒數   private  volatile  int    poolSize;        //執行緒池中當前的執行緒數   private  volatile  RejectedExecutionHandler handler;  //任務拒絕策略   private  volatile  ThreadFactory threadFactory;    //執行緒工廠,用來建立執行緒   private  int  largestPoolSize;    //用來記錄執行緒池中曾經出現過的最大執行緒數   private  long  completedTaskCount;    //用來記錄已經執行完畢的任務個數

   每個變數的作用都已經標明出來了,這裡要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變數。

  corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是執行緒池的大小。舉個簡單的例子:

  假如有一個工廠,工廠裡面有10個工人,每個工人同時只能做一件任務。

  因此只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人做;

  當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;

  如果說新任務數目增長的速度遠遠大於工人做任務的速度,那麼此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;

  然後就將任務也分配給這4個臨時工人做;

  如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。

  當這14個工人當中有人空閒時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。

 

  這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是說corePoolSize就是執行緒池大小,maximumPoolSize在我看來是執行緒池的一種補救措施,即任務量突然過大時的一種補救措施。

  不過為了方便理解,在本文後面還是將corePoolSize翻譯成核心池大小。

  largestPoolSize只是一個用來起記錄作用的變數,用來記錄執行緒池中曾經有過的最大執行緒數目,跟執行緒池的容量沒有任何關係。

 

  下面我們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。

  在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裡面最終呼叫的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:

1 2 3 4 5 6 7 8 9 10 11 12 public  void  execute(Runnable command) {      if  (command ==  null )          throw  new  NullPointerException();      if  (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {          if  (runState == RUNNING && workQueue.offer(command)) {              if  (runState != RUNNING || poolSize ==  0 )                  ensureQueuedTaskHandled(command);          }          else  if  (!addIfUnderMaximumPoolSize(command))              reject(command);  // is shutdown or saturated      } }

   上面的程式碼可能看起來不是那麼容易理解,下面我們一句一句解釋:

  首先,判斷提交的任務command是否為null,若是null,則丟擲空指標異常;

  接著是這句,這句要好好理解一下:

1 if  (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

   由於是或條件運算子,所以先計算前半部分的值,如果執行緒池中當前執行緒數不小於核心池大小,那麼就會直接進入下面的if語句塊了。

  如果執行緒池中當前執行緒數小於核心池大小,則接著執行後半部分,也就是執行

1 addIfUnderCorePoolSize(command)

  如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。

  如果執行完addIfUnderCorePoolSize這個方法返回false,然後接著判斷:

1 if  (runState == RUNNING && workQueue.offer(command))

   如果當前執行緒池處於RUNNING狀態,則將任務放入任務快取佇列;如果當前執行緒池不處於RUNNING狀態或者任務放入快取佇列失敗,則執行:

1 addIfUnderMaximumPoolSize(command)

  如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。

  回到前面:

1 if  (runState == RUNNING && workQueue.offer(command))

   這句的執行,如果說當前執行緒池處於RUNNING狀態且將任務放入任務快取佇列成功,則繼續進行判斷:

1 if  (runState != RUNNING || poolSize ==  0 )

   這句判斷是為了防止在將此任務新增進任務快取佇列的同時其他執行緒突然呼叫shutdown或者shutdownNow方法關閉了執行緒池的一種應急措施。如果是這樣就執行:

1 ensureQueuedTaskHandled(command)

   進行應急處理,從名字可以看出是保證 新增到任務快取佇列中的任務得到處理。

  我們接著看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private  boolean  addIfUnderCorePoolSize(Runnable firstTask) {      Thread t =  null ;      final  ReentrantLock mainLock =  this .mainLock;      mainLock.lock();      try  {          if  (poolSize < corePoolSize && runState == RUNNING)              t = addThread(firstTask);         //建立執行緒去執行firstTask任務             finally  {          mainLock.unlock();      }      if  (t ==  null )          return  false ;      t.start();      return  true ; <