1. 程式人生 > >轉:【Java並發編程】之十九:並發新特性—Executor框架與線程池(含代碼)

轉:【Java並發編程】之十九:並發新特性—Executor框架與線程池(含代碼)

接口類 容易 20px 了解 大小 執行c 生命周期 schedule p s

Executor框架簡介

在Java 5之後並發編程引入了一堆新的啟動、調度和管理線程的API。Executor框架便是Java 5中引入的,其內部使用了線程池機制,它在java.util.cocurrent 包下,通過該框架來控制線程的啟動、執行和關閉,可以簡化並發編程的操作。因此,在Java 5之後,通過Executor來啟動線程比使用Thread的start方法更好,除了更易管理,效率更好(用線程池實現,節約開銷)外,還有關鍵的一點:有助於避免this逃逸問題——如果我們在構造器中啟動一個線程,因為另一個任務可能會在構造器結束之前開始執行,此時可能會訪問到初始化了一半的對象用Executor在構造器中。

Executor框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

Executor接口中之定義了一個方法execute(Runnable command),該方法接收一個Runable實例,它用來執行一個任務,任務即一個實現了Runnable接口的類。ExecutorService接口繼承自Executor接口,它提供了更豐富的實現多線程的方法,比如,ExecutorService提供了關閉自己的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成 Future 的方法。 可以調用ExecutorService的shutdown()方法來平滑地關閉 ExecutorService,調用該方法後,將導致ExecutorService停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢後將會關閉ExecutorService。因此我們一般用該接口來實現和管理多線程。

ExecutorService的生命周期包括三種狀態:運行、關閉、終止。創建後便進入運行狀態,當調用了shutdown()方法時,便進入關閉狀態,此時意味著ExecutorService不再接受新的任務,但它還在執行已經提交了的任務,當素有已經提交了的任務執行完後,便到達終止狀態。如果不調用shutdown()方法,ExecutorService會一直處在運行狀態,不斷接收新的任務,執行新的任務,服務器端一般不需要關閉它,保持一直運行即可。

Executors提供了

public static ExecutorService newFixedThreadPool(int nThreads)

創建固定數目線程的線程池。

public static ExecutorService newCachedThreadPool()

創建一個可緩存的線程池,調用execute將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線 程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。

public static ExecutorService newSingleThreadExecutor()

創建一個單線程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

創建一個支持定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。

這四種方法都是用的Executors中的


newCachedThreadPool()

-緩存型池子,先查看池中有沒有以前建立的線程,如果有,就 reuse.如果沒有,就建一個新的線程加入池中
-緩存型池子通常用於執行一些生存期很短的異步型任務
因此在一些面向連接的daemon型SERVER中用得不多。但對於生存期短的異步任務,它是Executor的首選。
-能reuse的線程,必須是timeout IDLE內的池中線程,缺省 timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。
註意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT不活動,其會自動被終止。



newFixedThreadPool(int)

-newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的線程
-其獨特之處:任意時間點,最多只能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待直到當前的線程中某個線程終止直接被移出池子
-和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很穩定很固定的正規並發線程,多用於服務器
-從方法的源代碼看,cache池和fixed 池調用的是同一個底層 池,只不過參數不同:
fixed池線程數固定,並且是0秒IDLE(無IDLE)
cache池線程數支持0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE

newScheduledThreadPool(int)

-調度型線程池
-這個池子裏的線程可以按schedule依次delay執行,或周期執行

SingleThreadExecutor()

-單例線程,任意時間池中只能有一個線程
-用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE)


一般來說,CachedTheadPool在程序執行過程中通常會創建與所需數量相同的線程,然後在它回收舊線程時停止創建新線程,因此它是合理的Executor的首選,只有當這種方式會引發問題時(比如需要大量長時間面向連接的線程時),才需要考慮用FixedThreadPool。(該段話摘自《Thinking in Java》第四版)

Executor執行Runnable任務

ExecutorService實例,而後調用該實例的execute(Runnable command)方法即可。一旦Runnable任務傳遞到execute()方法,該方法便會自動在一個線程上執行。下面是是Executor執行Runnable任務的示例代碼:

[java] view plain copy
  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class TestCachedThreadPool{
  4. public static void main(String[] args){
  5. ExecutorService executorService = Executors.newCachedThreadPool();
  6. // ExecutorService executorService = Executors.newFixedThreadPool(5);
  7. // ExecutorService executorService = Executors.newSingleThreadExecutor();
  8. for (int i = 0; i < 5; i++){
  9. executorService.execute(new TestRunnable());
  10. System.out.println("************* a" + i + " *************");
  11. }
  12. executorService.shutdown();
  13. }
  14. }
  15. class TestRunnable implements Runnable{
  16. public void run(){
  17. System.out.println(Thread.currentThread().getName() + "線程被調用了。");
  18. }
  19. }

某次執行後的結果如下:

技術分享

從結果中可以看出,pool-1-thread-1和pool-1-thread-2均被調用了兩次,這是隨機的,execute會首先在線程池中選擇一個已有空閑線程來執行任務,如果線程池中沒有空閑線程,它便會創建一個新的線程來執行任務。

Executor執行Callable任務

在Java 5之後,任務分兩類:一類是實現了Runnable接口的類,一類是實現了Callable接口的類。兩者都可以被ExecutorService執行,但是Runnable任務沒有返回值,而Callable任務有返回值。並且Callable的call()方法只能通過ExecutorService的submit(Callable<T> task) 方法來執行,並且返回一個 <T>Future<T>,是表示任務等待完成的 Future。

Callable接口類似於Runnable,兩者都是為那些其實例可能被另一個線程執行的類設計的。但是 Runnable 不會返回結果,並且無法拋出經過檢查的異常而Callable又返回結果,而且當獲取返回結果時可能會拋出異常。Callable中的call()方法類似Runnable的run()方法,區別同樣是有返回值,後者沒有。

當將一個Callable的對象傳遞給ExecutorService的submit方法,則該call方法自動在一個線程上執行,並且會返回執行結果Future對象。同樣,將Runnable的對象傳遞給ExecutorService的submit方法,則該run方法自動在一個線程上執行,並且會返回執行結果Future對象,但是在該Future對象上調用get方法,將返回null。

下面給出一個Executor執行Callable任務的示例代碼:

[java] view plain copy
  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.*;
  4. public class CallableDemo{
  5. public static void main(String[] args){
  6. ExecutorService executorService = Executors.newCachedThreadPool();
  7. List<Future<String>> resultList = new ArrayList<Future<String>>();
  8. //創建10個任務並執行
  9. for (int i = 0; i < 10; i++){
  10. //使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中
  11. Future<String> future = executorService.submit(new TaskWithResult(i));
  12. //將任務執行結果存儲到List中
  13. resultList.add(future);
  14. }
  15. //遍歷任務的結果
  16. for (Future<String> fs : resultList){
  17. try{
  18. while(!fs.isDone);//Future返回如果沒有完成,則一直循環等待,直到Future返回完成
  19. System.out.println(fs.get()); //打印各個線程(任務)執行的結果
  20. }catch(InterruptedException e){
  21. e.printStackTrace();
  22. }catch(ExecutionException e){
  23. e.printStackTrace();
  24. }finally{
  25. //啟動一次順序關閉,執行以前提交的任務,但不接受新任務
  26. executorService.shutdown();
  27. }
  28. }
  29. }
  30. }
  31. class TaskWithResult implements Callable<String>{
  32. private int id;
  33. public TaskWithResult(int id){
  34. this.id = id;
  35. }
  36. /**
  37. * 任務的具體過程,一旦任務傳給ExecutorService的submit方法,
  38. * 則該方法自動在一個線程上執行
  39. */
  40. public String call() throws Exception {
  41. System.out.println("call()方法被自動調用!!! " + Thread.currentThread().getName());
  42. //該返回結果將被Future的get方法得到
  43. return "call()方法被自動調用,任務返回的結果是:" + id + " " + Thread.currentThread().getName();
  44. }
  45. }

某次執行結果如下:

技術分享

從結果中可以同樣可以看出,submit也是首先選擇空閑線程來執行任務,如果沒有,才會創建新的線程來執行任務。另外,需要註意:如果Future的返回尚未完成,則get()方法會阻塞等待,直到Future完成返回,可以通過調用isDone()方法判斷Future是否完成了返回。

自定義線程池

自定義線程池,可以用ThreadPoolExecutor類創建,它有多個構造方法來創建線程池,用該類很容易實現自定義的線程池,這裏先貼上示例程序:

[java] view plain copy
  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. public class ThreadPoolTest{
  6. public static void main(String[] args){
  7. //創建等待隊列
  8. BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
  9. //創建線程池,池中保存的線程數為3,允許的最大線程數為5
  10. ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
  11. //創建七個任務
  12. Runnable t1 = new MyThread();
  13. Runnable t2 = new MyThread();
  14. Runnable t3 = new MyThread();
  15. Runnable t4 = new MyThread();
  16. Runnable t5 = new MyThread();
  17. Runnable t6 = new MyThread();
  18. Runnable t7 = new MyThread();
  19. //每個任務會在一個線程上執行
  20. pool.execute(t1);
  21. pool.execute(t2);
  22. pool.execute(t3);
  23. pool.execute(t4);
  24. pool.execute(t5);
  25. pool.execute(t6);
  26. pool.execute(t7);
  27. //關閉線程池
  28. pool.shutdown();
  29. }
  30. }
  31. class MyThread implements Runnable{
  32. @Override
  33. public void run(){
  34. System.out.println(Thread.currentThread().getName() + "正在執行。。。");
  35. try{
  36. Thread.sleep(100);
  37. }catch(InterruptedException e){
  38. e.printStackTrace();
  39. }
  40. }
  41. }

運行結果如下:

技術分享

從結果中可以看出,七個任務是在線程池的三個線程上執行的。這裏簡要說明下用到的ThreadPoolExecuror類的構造方法中各個參數的含義。

public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

maximumPoolSize:池中允許的最大線程數。

keepAliveTime:線程池中的空閑線程所能持續的最長時間。

unit:持續時間的單位。

workQueue:任務執行前保存任務的隊列,僅保存由execute方法提交的Runnable任務。

根據ThreadPoolExecutor源碼前面大段的註釋,我們可以看出,當試圖通過excute方法講一個Runnable任務添加到線程池中時,按照如下順序來處理:

1、如果線程池中的線程數量少於corePoolSize,即使線程池中有空閑線程,也會創建一個新的線程來執行新添加的任務;

2、如果線程池中的線程數量大於等於corePoolSize,但緩沖隊列workQueue未滿,則將新添加的任務放到workQueue中,按照FIFO的原則依次等待執行(線程池中有線程空閑出來後依次將緩沖隊列中的任務交付給空閑的線程執行);

3、如果線程池中的線程數量大於等於corePoolSize,且緩沖隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會創建新的線程來處理被添加的任務;

4、如果線程池中的線程數量等於了maximumPoolSize,有4種才處理方式(該構造方法調用了含有5個參數的構造方法,並將最後一個構造方法為RejectedExecutionHandler類型,它在處理線程溢出時有4種方式,這裏不再細說,要了解的,自己可以閱讀下源碼)。

總結起來,也即是說,當有新的任務要處理時,先看線程池中的線程數量是否大於corePoolSize,再看緩沖隊列workQueue是否滿,最後看線程池中的線程數量是否大於maximumPoolSize。

另外,當線程池中的線程數量大於corePoolSize時,如果裏面有線程的空閑時間超過了keepAliveTime,就將其移除線程池,這樣,可以動態地調整線程池中線程的數量。

我們大致來看下Executors的源碼,newCachedThreadPool的不帶RejectedExecutionHandler參數(即第五個參數,線程數量超過maximumPoolSize時,指定處理方式)的構造方法如下:

[java] view plain copy
  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

它將corePoolSize設定為0,而將maximumPoolSize設定為了Integer的最大值,線程空閑超過60秒,將會從線程池中移除。由於核心線程數為0,因此每次添加任務,都會先從線程池中找空閑線程,如果沒有就會創建一個線程(SynchronousQueue<Runnalbe>決定的,後面會說)來執行新的任務,並將該線程加入到線程池中,而最大允許的線程數為Integer的最大值,因此這個線程池理論上可以不斷擴大。

再來看newFixedThreadPool的不帶RejectedExecutionHandler參數的構造方法,如下:

[java] view plain copy
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

它將corePoolSize和maximumPoolSize都設定為了nThreads,這樣便實現了線程池的大小的固定,不會動態地擴大,另外,keepAliveTime設定為了0,也就是說線程只要空閑下來,就會被移除線程池,敢於LinkedBlockingQueue下面會說。

下面說說幾種排隊的策略:

1、直接提交。緩沖隊列采用 SynchronousQueue,它將任務直接交給線程處理而不保持它們。如果不存在可用於立即運行任務的線程(即線程池中的線程都在工作),則試圖把任務加入緩沖隊列將會失敗,因此會構造一個新的線程來處理新添加的任務,並將其加入到線程池中。直接提交通常要求無界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒絕新提交的任務。newCachedThreadPool采用的便是這種策略。

2、無界隊列。使用無界隊列(典型的便是采用預定義容量的 LinkedBlockingQueue,理論上是該緩沖隊列可以對無限多的任務排隊)將導致在所有 corePoolSize 線程都工作的情況下將新任務加入到緩沖隊列中。這樣,創建的線程就不會超過 corePoolSize,也因此,maximumPoolSize 的值也就無效了。當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列。newFixedThreadPool采用的便是這種策略。

3、有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(一般緩沖隊列使用ArrayBlockingQueue,並制定隊列的最大長度)有助於防止資源耗盡,但是可能較難調整和控制,隊列大小和最大池大小需要相互折衷,需要設定合理的參數。

轉:【Java並發編程】之十九:並發新特性—Executor框架與線程池(含代碼)