1. 程式人生 > >Java併發包:ExecutorService和ThreadPoolExecutor

Java併發包:ExecutorService和ThreadPoolExecutor

ExecutorService

Java.util.concurrent.ExecutorService介面代表一種非同步執行機制,它能夠在後臺執行任務。因此ExecutorService與thread pool是非常相似的。事實上,在java.util.package包中ExecutorService的具體實現就是一個執行緒池的具體實現。

ExcutorService 例子

下面是一個簡單的例子

ExecutorService executorService = Executors.newFixedThreadPool(10);

executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

executorService.shutdown();

首先,通過newFixedThreadPool()工廠方法建立一個ExecutorService的例項。這個方法建立了一個可以有10個執行緒執行任務的執行緒池。

第二,Runnable介面的匿名實現類作為引數被傳遞給execute()方法。Runable將會被ExecutorService中的一個執行緒來執行。

任務委託(Task Delegation)

下面的圖片說明了一個執行緒委託一個任務給ExecutorService進行非同步執行:
這裡寫圖片描述
一旦,執行緒委託任務給ExecutorService,執行緒會獨立任務的執行而繼續自己之後的操作。

ExcutorService的使用說明

下面是委託任務給ExecutorService的一些不同的方式:

  • execute(Runnable)
  • submit(Runnable)
  • submit(Callable)
  • invokeAny(…)
  • invokeAll(…)

下面來逐個看看這些方法。

  • execute(Runnable)

execute(Runnable) 方法接受一個java.lang.Runable物件的例項,並非同步執行之。下面是一個使用ExecutorService執行Runnable的例子:

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

executorService.shutdown();

這種方式不能獲得Runnable執行的結果,如果有這種需要,你將要使用Callable。

  • submit(Runnable)

submit(Runnable) 方法也接收一個Runnable介面的具體實現,並返回一個Future物件。Future物件可以用來檢測Runable是否執行完成。

Future future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

future.get();  //returns null if the task has finished correctly.
  • submit(Callable)

submit(Callable)方法與submit(Runnable)方法相似,除了接收的引數有所不同。Callable例項非常類似於Runnable,不同的是call方法可以返回一個結果,Runnable.run()方法不能返回一個結果。

可以通過submit(Callable)方法返回的Future物件獲取Callable的結果。下面是一個使用Callable的例子:

Future future = executorService.submit(new Callable(){
    public Object call() throws Exception {
        System.out.println("Asynchronous Callable");
        return "Callable Result";
    }
});

System.out.println("future.get() = " + future.get());

上面程式碼的輸出結果是:

Asynchronous Callable
future.get() = Callable Result
  • invokeAny(…)

invokeAny()方法接收一個Callable物件或者Callable的子介面例項的集合作為引數,這個方法不會返回Future,但會返回集合中某一個Callable的結果。你不能確定你得到是哪個Callable的結果。只是已執行完成的Callable中的一個。

如果一個任務已經完成(或者丟擲了異常),剩餘的Callable任務將被取消。
下面是示例程式碼:

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

String result = executorService.invokeAny(callables);

System.out.println("result = " + result);

executorService.shutdown();

示例程式碼將會列印給定的Callable集合中一個Callable任務返回的結果。我嘗試執行了多次,結果是變化的。有時候是“Task1”,有時候是“Task 2”等。

  • invokeAll(…)

invokeAll()接收一個Callable物件的集合作為引數,該方法會呼叫你傳給他的集合中的所有Callable物件。invokeAll()會返回一個Future物件的列表,通過這個列表你可以獲取每一個Callable執行的結果。

記住一個任務可能會因為一個異常而結束,因此這時任務並不是真正意義上執行成功了。這在Future上是沒有辦法來判斷的。

下面是示例程式碼:

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
    System.out.println("future.get = " + future.get());
}

executorService.shutdown();

ExecutorService Shutdown

當你是使用完ExecutorService後,你應該關閉它,使得執行緒不能持續執行。例如,你的應用程式從main()方法開始並且你的主執行緒退出應用程式,這時如果存在啟用狀態的ExecutorService,你的應用程式將仍然會保持執行。ExecutorService中啟用的執行緒會阻止JVM關閉。

為了終止ExecutorService中的執行緒,你需要呼叫shutdown()方法。ExecutorService不會立即關閉,但是它也不會接受新的任務,直到它裡面的所有執行緒都執行完畢,ExecutorService才會關閉。所有提交到ExecutorService中的任務會在呼叫shutdown()方法之前被執行。

如果你想立即關閉ExecutorService,你可以呼叫shutdownNow()方法。這將會嘗試立即停止所有正在執行的任務,並且忽略所有提交的但未被處理的任務。對於正在執行的任務是不能確定的,也許它們停止了,也行它們執行直到結束。

ThreadPoolExecutor

Java.util.concurrent.ThreadPoolExecutor類是ExecutorSerivce介面的具體實現。ThreadPoolExecutor使用執行緒池中的一個執行緒來執行給定的任務(Runnable或者Runnable)。

ThreadPoolExecutor內部的執行緒池包含不定數量的執行緒。池中執行緒的數量由下面的這些變數決定:

  • corePoolSize
  • maximumPoolSize

當一個任務委託給執行緒池執行,此時如果池執行緒中執行緒數少於corePoolSize,即使池中有空閒的執行緒,執行緒池中也會建立一個新的執行緒。

如果任務佇列是滿的,corePoolSize個執行緒或者更多的且少於maximumPoolSize的執行緒正在執行,也會建立一個新的執行緒來執行任務。

下面圖釋ThreadPoolExecutor這種原理:
這裡寫圖片描述

建立ThreadPoolExecutor

ThreadPoolExecutor有多種建構函式。例如:

int  corePoolSize  =    5;
int  maxPoolSize   =   10;
long keepAliveTime = 5000;

ExecutorService threadPoolExecutor =
        new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()
                );

除非你需要顯示的給ThreadPoolExecutor指定這些引數,通常使用java.util.concurrent.Executor類中的工廠方法來建立例項。