1. 程式人生 > >java併發程式設計--Executor框架以及Executors類的建立執行緒池方法

java併發程式設計--Executor框架以及Executors類的建立執行緒池方法

       Eexecutor作為靈活且強大的非同步執行框架,其支援多種不同型別的任務執行策略,提供了一種標準的方法將任務的提交過程和執行過程解耦開發,基於生產者-消費者模式,其提交任務的執行緒相當於生產者,執行任務的執行緒相當於消費者,並用Runnable來表示任務,Executor的實現還提供了對生命週期的支援,以及統計資訊收集,應用程式管理機制和效能監視等機制。

1.Exexctor簡介

Executor的UML圖:(常用的幾個介面和子類)

Executor:一個介面,其定義了一個接收Runnable物件的方法executor,其方法簽名為executor(Runnable command),

ExecutorService:是一個比Executor使用更廣泛的子類介面,其提供了生命週期管理的方法,以及可跟蹤一個或多個非同步任務執行狀況返回Future的方法

AbstractExecutorService:ExecutorService執行方法的預設實現

ScheduledExecutorService:一個可定時排程任務的介面

ScheduledThreadPoolExecutor:ScheduledExecutorService的實現,一個可定時排程任務的執行緒池

ThreadPoolExecutor:執行緒池,可以通過呼叫Executors以下靜態工廠方法來建立執行緒池並返回一個ExecutorService物件:

2.ThreadPoolExecutor建構函式的各個引數說明

ThreadPoolExecutor方法簽名:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) //後兩個引數為可選引數

引數說明:

corePoolSize:核心執行緒數,如果執行的執行緒少於corePoolSize,則建立新執行緒來執行新任務,即使執行緒池中的其他執行緒是空閒的

maximumPoolSize:最大執行緒數,可允許建立的執行緒數,corePoolSize和maximumPoolSize設定的邊界自動調整池大小:

corePoolSize <執行的執行緒數< maximumPoolSize:僅當佇列滿時才建立新執行緒

corePoolSize=執行的執行緒數= maximumPoolSize:建立固定大小的執行緒池

keepAliveTime:如果執行緒數多於corePoolSize,則這些多餘的執行緒的空閒時間超過keepAliveTime時將被終止

unit:keepAliveTime引數的時間單位

workQueue:儲存任務的阻塞佇列,與執行緒池的大小有關:

  當執行的執行緒數少於corePoolSize時,在有新任務時直接建立新執行緒來執行任務而無需再進佇列

  當執行的執行緒數等於或多於corePoolSize,在有新任務新增時則選加入佇列,不直接建立執行緒

  當佇列滿時,在有新任務時就建立新執行緒

threadFactory:使用ThreadFactory建立新執行緒,預設使用defaultThreadFactory建立執行緒

handle:定義處理被拒絕任務的策略,預設使用ThreadPoolExecutor.AbortPolicy,任務被拒絕時將丟擲RejectExecutorException

3.Executors:提供了一系列靜態工廠方法用於建立各種執行緒池

   newFixedThreadPool:建立可重用且固定執行緒數的執行緒池,核心執行緒數和最大執行緒數是一致的,採用LinkedBlockingQueue作為阻塞等待佇列。如果執行緒池中的所有執行緒(核心執行緒數量)都處於活動狀態,此時再提交任務就在佇列中等待,直到有可用執行緒;如果執行緒池中的某個執行緒由於異常而結束時,執行緒池就會再補充一條新執行緒。

方法簽名:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  //使用一個基於FIFO排序的阻塞佇列,在所有corePoolSize執行緒都忙時新任務將在佇列中等待
                                  new LinkedBlockingQueue<Runnable>());
}

    newSingleThreadExecutor:建立一個單執行緒的Executor,如果該執行緒因為異常而結束就新建一條執行緒來繼續執行後續的任務,由於LinkedBlockingQueue是FIFO的,所以能夠保證一定按照順序完成任務。

原文是這麼說的:使用一個工作執行緒操作一個無界佇列。(但是要注意,如果這個執行緒在關閉之前的執行過程中出現故障而終止,如果需要執行後續任務,則會有一個新的執行緒代替它)任務被保證按順序執行,並且在任何給定的時間內不會有超過一個任務處於活動狀態。與其他等效的{@code newFixedThreadPool(1)}不同的是,返回的執行器保證不能重新配置以使用其他執行緒。

方法簽名:
public static ExecutorService newSingleThreadExecutor() {
   return new FinalizableDelegatedExecutorService
                     //corePoolSize和maximumPoolSize都等於,表示固定執行緒池大小為1
                        (new ThreadPoolExecutor(1, 1,
                                                0L, TimeUnit.MILLISECONDS,
                                                new LinkedBlockingQueue<Runnable>()));
}

   newScheduledThreadPool:建立一個可延遲執行或定期執行的執行緒池,採用的是DelayedWorkQueue延遲佇列。

方法簽名:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

例1:(使用newScheduledThreadPool來模擬心跳機制)

public class HeartBeat {
     public static void main(String[] args) {
         ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
         Runnable task = new Runnable() {
             public void run() {
                 System.out.println("HeartBeat.........................");
             }
        };
         executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS);   //5秒後第一次執行,之後每隔3秒執行一次
     }
}

輸出:

HeartBeat....................... //5秒後第一次輸出
HeartBeat....................... //每隔3秒輸出一個

   newCachedThreadPool:建立可快取的執行緒池,如果執行緒池中的執行緒在60秒未被使用就將被移除,在執行新的任務時,當執行緒池中有之前建立的可用執行緒就重用可用執行緒,否則就新建一條執行緒。同時SynchronousQueue佇列是同步移交,並不會因為任務多就進行等待。

方法簽名:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  //使用同步佇列,將任務直接提交給執行緒
                                  new SynchronousQueue<Runnable>());
}

例2:

public class ThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {

     ExecutorService threadPool = Executors.newCachedThreadPool();//執行緒池裡面的執行緒數會動態變化,並可線上程被移除前重用

        for (int i = 1; i <= 3; i ++) {

            final  int task = i;   //10個任務

            //TimeUnit.SECONDS.sleep(1);

            threadPool.execute(new Runnable() {    //接受一個Runnable例項

                public void run() {

                        System.out.println("執行緒名字: " + Thread.currentThread().getName() +  "  任務名為: "+task);

                }

            });

        }

    }

}

 

輸出:(為每個任務新建一條執行緒,共建立了3條執行緒)

執行緒名字: pool-1-thread-1 任務名為: 1
執行緒名字: pool-1-thread-2 任務名為: 2
執行緒名字: pool-1-thread-3 任務名為: 3

去掉第6行的註釋其輸出如下:(始終重複利用一條執行緒,因為newCachedThreadPool能重用可用執行緒)

執行緒名字: pool-1-thread-1 任務名為: 1
執行緒名字: pool-1-thread-1 任務名為: 2
執行緒名字: pool-1-thread-1 任務名為: 3

通過使用Executor可以很輕易的實現各種調優  管理  監視  記錄日誌和錯誤報告等待。

newWorkStealingPool:建立一個執行緒池,該執行緒池維護足夠的執行緒以支援給定的並行級別,底層使用的是支援任務分解的執行緒池ForkJoinPool,並可以使用多個佇列來減少爭用。並行級別對應於積極參與或可用來參與任務處理的執行緒的最大數量。執行緒的實際數量可以動態地增加和減少並保證提交的任務執行的順序。並行級別決定了同一時刻最多有多少個執行緒在執行,如不傳並行級別引數,將預設為當前系統的CPU個數。

方法簽名:

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

new ForkJoinPool:是jdk1.8引入,是一種支援任務分解的執行緒池,當提交給他的任務“過大”,他就會按照預先定義的規則將大任務分解成小任務,多執行緒併發執行。 一般要配合可分解任務介面ForkJoinTask來使用,ForkJoinTask有兩個實現它的抽象類:RecursiveAction和RecursiveTask,其區別是前者沒有返回值,後者有返回值。
方法簽名:

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}

例3:

public static void main(String[] args)throws InterruptedException{
    ForkJoinPool pool =  new ForkJoinPool();
    Task task = new Task(60);
        pool.submit(task);
    pool.awaitTermination(6000,TimeUnit.MILLISECONDS);
    pool.shutdown();

}

public static class Task extends RecursiveAction{

    private static final long serialVersionUID = 1L;
    //定義一個分解任務的閾值——50,即一個任務最多承擔50個工作量
    int THRESHOLD=50;
    //任務量
    int task_Num=0;
    Task(int Num){
        this.task_Num=Num;
    }
    
    @Override
    protected void compute() {
        if(task_Num<=THRESHOLD){
            System.out.println(Thread.currentThread().getName()+"承擔了"+task_Num+"份工作");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            //隨機解成兩個任務
            Random m=new Random();
            int x=m.nextInt(50);

            Task left=new Task(x);
            Task right=new Task(task_Num-x);

            left.fork();
            right.fork();
        }
    }
}

 

4.Executor的生命週期

ExecutorService提供了管理Eecutor生命週期的方法,ExecutorService的生命週期包括了:執行  關閉和終止三種狀態。

ExecutorService在初始化建立時處於執行狀態。

shutdown方法等待提交的任務執行完成並不再接受新任務,在完成全部提交的任務後關閉

shutdownNow方法將強制終止所有執行中的任務並不再允許提交新任務

可以將一個Runnable(如例2)或Callable(如例3)提交給ExecutorService的submit方法執行,最終返回一上Futire用來獲得任務的執行結果或取消任務

例3:(任務執行完成後並返回執行結果)

public class CallableAndFuture {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<String> future = executor.submit(new Callable<String>() {   //接受一上callable例項

            public String call() throws Exception {

                return "MOBIN";

            }

        });

        System.out.println("任務的執行結果:"+future.get());

    }

}

 

輸出:

任務的執行結果:MOBIN

ExecutorCompletionService:實現了CompletionService,將執行完成的任務放到阻塞佇列中,通過take或poll方法來獲得執行結果

例4:(啟動10條執行緒,誰先執行完成就返回誰)

public class CompletionServiceTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executor = Executors.newFixedThreadPool(10);        //建立含10.條執行緒的執行緒池

        CompletionService completionService = new ExecutorCompletionService(executor);

        for (int i =1; i <=10; i ++) {

            final  int result = i;

            completionService.submit(new Callable() {

                public Object call() throws Exception {

                    Thread.sleep(new Random().nextInt(5000));   //讓當前執行緒隨機休眠一段時間

                    return result;

                }

            });

        }

        System.out.println(completionService.take().get());   //獲取執行結果

    }

}

 

輸出結果可能每次都不同(在1到10之間)

3

通過Executor來設計應用程式可以簡化開發過程,提高開發效率,並有助於實現併發,在開發中如果需要建立執行緒可優先考慮使用Executor