1. 程式人生 > >java 執行緒池 Executors 及 ThreadPoolExecutor

java 執行緒池 Executors 及 ThreadPoolExecutor

Java通過Executors提供四種執行緒池,分別為: 
newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。 
newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。 
newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。 
newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

(1) newCachedThreadPool

 
建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。示例程式碼如下:

package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
 public static void main(String[] args) {
  ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  for (int i = 0; i < 10; i++) {
   final int index = i;
   try {
    Thread.sleep(index * 1000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   cachedThreadPool.execute(new Runnable() {
    public void run() {
     System.out.println(index);
    }
   });
  }
 }
}

執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。 
  
(2) newFixedThreadPool 
建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。示例程式碼如下:

package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
 public static void main(String[] args) {
  ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
  for (int i = 0; i < 10; i++) {
   final int index = i;
   fixedThreadPool.execute(new Runnable() {
    public void run() {
     try {
      System.out.println(index);
      Thread.sleep(2000);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   });
  }
 }
}

  
因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字。 
定長執行緒池的大小最好根據系統資源進行設定。如Runtime.getRuntime().availableProcessors()

(3)  newScheduledThreadPool 
建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行示例程式碼如下:

package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
 public static void main(String[] args) {
  ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
  scheduledThreadPool.schedule(new Runnable() {
   public void run() {
    System.out.println("delay 3 seconds");
   }
  }, 3, TimeUnit.SECONDS);
 }
}

  
表示延遲3秒執行。

定期執行示例程式碼如下:

package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
 public static void main(String[] args) {
  ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
  scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
   public void run() {
    System.out.println("delay 1 seconds, and excute every 3 seconds");
   }
  }, 1, 3, TimeUnit.SECONDS);
 }
}

  
表示延遲1秒後每3秒執行一次。

(4) newSingleThreadExecutor 
建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。示例程式碼如下:

package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
 public static void main(String[] args) {
  ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  for (int i = 0; i < 10; i++) {
   final int index = i;
   singleThreadExecutor.execute(new Runnable() {
    public void run() {
     try {
      System.out.println(index);
      Thread.sleep(2000);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   });
  }
 }
}

  
結果依次輸出,相當於順序執行各個任務。

你可以使用JDK自帶的監控工具來監控我們建立的執行緒數量,執行一個不終止的執行緒,建立指定量的執行緒,來觀察: 
工具目錄: C:\Program Files\Java\jdk1.6.0_06\bin\jconsole.exe 
執行程式做稍微修改:

package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
 public static void main(String[] args) {
  ExecutorService singleThreadExecutor = Executors.newCachedThreadPool();
  for (int i = 0; i < 100; i++) {
   final int index = i;
   singleThreadExecutor.execute(new Runnable() {
    public void run() {
     try {
      while(true) {
       System.out.println(index);
       Thread.sleep(10 * 1000);
      }
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   });
   try {
    Thread.sleep(500);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }
}
ThreadPoolExecutor機制    轉自http://825635381.iteye.com/blog/2184680
一、概述 
1、ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現,以內部執行緒池的形式對外提供管理任務執行,執行緒排程,執行緒池管理等等服務; 
2、Executors方法提供的執行緒服務,都是通過引數設定來實現不同的執行緒池機制。 
3、先來了解其執行緒池管理的機制,有助於正確使用,避免錯誤使用導致嚴重故障。同時可以根據自己的需求實現自己的執行緒池
 

二、核心構造方法講解 
下面是ThreadPoolExecutor最核心的構造方法 
Java程式碼  收藏程式碼
  1. public ThreadPoolExecutor(int corePoolSize,  
  2.                               int maximumPoolSize,  
  3.                               long keepAliveTime,  
  4.                               TimeUnit unit,  
  5.                               BlockingQueue<Runnable> workQueue,  
  6.                               ThreadFactory threadFactory,  
  7.                               RejectedExecutionHandler handler) {  
  8.         if (corePoolSize < 0 ||  
  9.             maximumPoolSize <= 0 ||  
  10.             maximumPoolSize < corePoolSize ||  
  11.             keepAliveTime < 0)  
  12.             throw new IllegalArgumentException();  
  13.         if (workQueue == null || threadFactory == null || handler == null)  
  14.             throw new NullPointerException();  
  15.         this.corePoolSize = corePoolSize;  
  16.         this.maximumPoolSize = maximumPoolSize;  
  17.         this.workQueue = workQueue;  
  18.         this.keepAliveTime = unit.toNanos(keepAliveTime);  
  19.         this.threadFactory = threadFactory;  
  20.         this.handler = handler;  
  21.     }  

構造方法引數講解 
引數名 作用
corePoolSize 核心執行緒池大小
maximumPoolSize 最大執行緒池大小
keepAliveTime 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間;可以allowCoreThreadTimeOut(true)使得核心執行緒有效時間
TimeUnit keepAliveTime時間單位
workQueue 阻塞任務佇列
threadFactory 新建執行緒工廠
RejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理


重點講解: 
其中比較容易讓人誤解的是:corePoolSize,maximumPoolSize,workQueue之間關係。 

1.當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。 
2.當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行 
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務 
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理 
5.當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒 
6.當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉
 

執行緒管理機制圖示: 


三、Executors提供的執行緒池配置方案 

1、構造一個固定執行緒數目的執行緒池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,因此多餘的任務將存在再阻塞佇列,不會由RejectedExecutionHandler處理
 
Java程式碼  收藏程式碼
  1. public static ExecutorService newFixedThreadPool(int nThreads) {  
  2.         return new ThreadPoolExecutor(nThreads, nThreads,  
  3.                                       0L, TimeUnit.MILLISECONDS,  
  4.                                       new LinkedBlockingQueue<Runnable>());  
  5.     }  

2、構造一個緩衝功能的執行緒池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個無容量的阻塞佇列 SynchronousQueue,因此任務提交之後,將會建立新的執行緒執行;執行緒空閒超過60s將會銷燬 
Java程式碼  收藏程式碼
  1. public static ExecutorService newCachedThreadPool() {  
  2.         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  3.                                       60L, TimeUnit.SECONDS,  
  4.                                       new SynchronousQueue<Runnable>());  
  5.     }  

3、構造一個只支援一個執行緒的執行緒池,配置corePoolSize=maximumPoolSize=1,無界阻塞佇列LinkedBlockingQueue;保證任務由一個執行緒序列執行 
Java程式碼  收藏程式碼
  1. public static ExecutorService newSingleThreadExecutor() {  
  2.         return new FinalizableDelegatedExecutorService  
  3.             (new ThreadPoolExecutor(11,  
  4.                                     0L, TimeUnit.MILLISECONDS,  
  5.                                     new LinkedBlockingQueue<Runnable>()));  
  6.     }  

4、構造有定時功能的執行緒池,配置corePoolSize,無界延遲阻塞佇列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由於DelayedWorkQueue是無界佇列,所以這個值是沒有意義的 
Java程式碼  收藏程式碼
  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
  2.         return new ScheduledThreadPoolExecutor(corePoolSize);  
  3.     }  
  4. public static ScheduledExecutorService newScheduledThreadPool(  
  5.             int corePoolSize, ThreadFactory threadFactory) {  
  6.         return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);  
  7.     }  
  8. public ScheduledThreadPoolExecutor(int corePoolSize,  
  9.                              ThreadFactory threadFactory) {  
  10.         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,  
  11.               new DelayedWorkQueue(), threadFactory);  
  12.     }  


四、定製屬於自己的執行緒池 
Java程式碼  收藏程式碼
  1. import java.util.concurrent.ArrayBlockingQueue;  
  2. import java.util.concurrent.ExecutorService;  
  3. import java.util.concurrent.RejectedExecutionHandler;  
  4. import java.util.concurrent.ThreadFactory;  
  5. import java.util.concurrent.ThreadPoolExecutor;  
  6. import java.util.concurrent.TimeUnit;  
  7. import java.util.concurrent.atomic.AtomicInteger;  
  8. public class CustomThreadPoolExecutor {  
  9.     private ThreadPoolExecutor pool = null;  
  10.     /** 
  11.      * 執行緒池初始化方法 
  12.      *  
  13.      * corePoolSize 核心執行緒池大小----10 
  14.      * maximumPoolSize 最大執行緒池大小----30 
  15.      * keepAliveTime 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間----30+單位TimeUnit 
  16.      * TimeUnit keepAliveTime時間單位----TimeUnit.MINUTES 
  17.      * workQueue 阻塞佇列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞佇列 
  18.      * threadFactory 新建執行緒工廠----new CustomThreadFactory()====定製的執行緒工廠 
  19.      * rejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時, 
  20.      *                          即當提交第41個任務時(前面執行緒都沒有執行完,此測試方法中用sleep(100)), 
  21.      *                                任務會交給RejectedExecutionHandler來處理 
  22.      */  
  23.     public void init() {  
  24.         pool = new ThreadPoolExecutor(  
  25.                 10,  
  26.                 30,  
  27.                 30,  
  28.                 TimeUnit.MINUTES,  
  29.                 new ArrayBlockingQueue<Runnable>(10),  
  30.                 new CustomThreadFactory(),  
  31.                 new CustomRejectedExecutionHandler());  
  32.     }  
  33.     public void destory() {  
  34.         if(pool != null) {  
  35.             pool.shutdownNow();  
  36.         }  
  37.     }  
  38.     public ExecutorService getCustomThreadPoolExecutor() {  
  39.         return this.pool;  
  40.     }  
  41.     private class CustomThreadFactory implements ThreadFactory {  
  42.         private AtomicInteger count = new AtomicInteger(0);  
  43.         @Override  
  44.         public Thread newThread(Runnable r) {  
  45.             Thread t = new Thread(r);  
  46.             String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);  
  47.             System.out.println(threadName);  
  48.             t.setName(threadName);  
  49.             return t;  
  50.         }  
  51.     }  
  52.     private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {  
  53.         @Override  
  54.         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
  55.             // 記錄異常  
  56.             // 報警處理等  
  57.             System.out.println("error.............");  
  58.         }  
  59.     }  
  60.     // 測試構造的執行緒池  
  61.     public static void main(String[] args) {  
  62.         CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();  
  63.         // 1.初始化  
  64.         exec.init();  
  65.         ExecutorService pool = exec.getCustomThreadPoolExecutor();  
  66.         for(int i=1; i<100; i++) {  
  67.             System.out.println("提交第" + i + "個任務!");  
  68.             pool.execute(new Runnable() {  
  69.                 @Override  
  70.                 public void run() {  
  71.                     try {  
  72.                         Thread.sleep(300);  
  73.                     } catch (InterruptedException e) {  
  74.                         e.printStackTrace();  
  75.                     }  
  76.                     System.out.println("running=====");  
  77.                 }  
  78.             });  
  79.         }  
  80.         // 2.銷燬----此處不能銷燬,因為任務沒有提交執行完,如果銷燬執行緒池,任務也就無法執行了  
  81.         // exec.destory();  
  82.         try {  
  83.             Thread.sleep(10000);  
  84.         } catch (InterruptedException e) {  
  85.             e.printStackTrace();  
  86.         }  
  87.     }  
  88. }  

方法中建立一個核心執行緒數為30個,緩衝佇列有10個的執行緒池。每個執行緒任務,執行時會先睡眠0.1秒,保證提交40個任務時沒有任務被執行完,這樣提交第41個任務是,會交給CustomRejectedExecutionHandler 類來處理。