1. 程式人生 > >5.2探究執行器(Executors)

5.2探究執行器(Executors)

 你可以通過java提供的執行緒API來執行任務,像建立一個java.lang.Thread(newRunnableTask()).start();這個應用在可以執行多工的機器表現更為明顯(運行當前執行緒,建立執行緒和線上程池中隨意選擇執行執行緒)。

Note  一個任務就是一個物件,它會繼承java.lang.Runnable的介面(執行的任務)或者繼承java.util.concurrent.Callable的介面(立刻執行的任務)。

     併發工具包括高水平的執行器(executors),取代低水平的執行緒任務執行應用的工具。一個執行物件如果直接或間接繼承java.util.concurrent.Executor介面,那麼它就會從任務執行的器中解耦。

Note  這個執行器框架的運用介面,使其從任務執行中解耦任務,就像是集合框架運用介面從list、sets、queues和maps的介面中解耦是一樣的。解耦的好處是程式碼靈活的容易維護。

     使用Excutor時,我們需要宣告它的唯一方法execute(Runnable runnable),那麼就會執行runnable命名的任務。如果runnable為nul將會報錯:java.lang.NullPointExcetion,如果不能執行runnable的任務,那麼會報錯:java.util.concurrent.RejectedExecutionException。

Note   如果一個執行緒被強迫停止或不想接收新有任務時,將會丟擲RejectedExecutionException.當然,如果執行器沒有足夠的空間去儲存任務時,也會報出這個錯誤。(可能執行器使用一個有界的阻塞佇列去儲存任務和為個佇列滿了。)

     下面例子使用Executor,與前面講的new Thread(new RunnableTask()).start()的方法是一樣的。如:

      Executor executor = ···;//···代表著相同的執行器,建立executor.execute(new //RunnableTask());

      儘管Executor是很容易被使用的,但是這個介面存如下的限制:

        Executor的焦點是在Runnable上。因為Runnable的run()方法不能夠返回任何值,這裡也沒有任務方式讓一個執行的任務返回值給它的請求者。

        Executor沒有提供任何的方法,讓一個正在執行任務的執行緒取消或終止。

        Executor不能夠執行一個集合的執行任務。

        Executor不能夠讓一個應用強迫停止應用。

     以上的這些限制可以使用java.util.concurrent.ExcutorService的介面來解決,這個介面繼承Executor和它的應用是一個執行緒池。

表5-1 ExecutorService的方法

方法

說明

boolean awaitTermination(long timeout, TimeUnit unit)

阻塞直到所有任務的執行緒都完成。如果執行緒超時或當前執行緒強迫停止,那麼執行緒將會停止請求。當executor正常結束,那麼將會返回true,否則線上程執行期間超時,將返回false。當執行緒被打斷,那麼將會報錯:java.lang.InterruptedException.

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

執行每一個即時響應的任務集合時,將會返回java.util.concurrent.Future的java.util.List。List的集合中包含著任務正常結束狀態或由於錯誤而報出異常的狀態。Futures的List是在相同的有序佇列中的,這些任務佇列是通過任務的迭代器(iterator)返回的。如果執行緒在等待時被打斷或任務任務沒有完成就被取消,那麼這個方法將會丟擲InterruptedException的錯誤。如果tasks為空或任務一個元素為空,那麼將會丟擲NullPointException。如果一個任務不能被排序來執行,那麼將會丟擲RejectedExecutionException。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

執行每一個即時響應的任務集合時,將會返回java.util.concurrent.Future的java.util.List。List的集合中包含著任務正常結束狀態或由於錯誤而報出異常的狀態,或超時終止。任務如果取消了,那麼將會終止。Futures的List是在相同的有序佇列中的,這些任務佇列是通過任務的迭代器(iterator)返回的。如果執行緒在等待時被打斷或任務任務沒有完成就被取消,那麼這個方法將會丟擲InterruptedException的錯誤。如果tasks為空或任務一個元素為空,或unit為空,那麼將會丟擲NullPointException。如果一個任務不能被排序來執行,那麼將會丟擲RejectedExecutionException。

<T> T invokeAny(Collection<? extends Callable<T>> tasks)

執行執行緒所給的任務,返回任何任務的成功執行。也就是說執行的任務不能報錯。如果執行的任務不正常或報錯,那麼任務(tasks)將不成功或不能正常結束。如果執行緒在等待時候被打斷,那麼將會丟擲InterruptedException;如果tasks或任何元素為空,那麼將會丟擲NullPointerException;當tasks中沒有元素,那麼將會丟擲java.lang.IllegalArgumentException.當沒有任務成功執行,將丟擲ExecutionException; 如果一個任務不能被排序來執行,那麼將會丟擲RejectedExecutionException。

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

執行執行緒所給的任務,返回任何任務的成功執行。也就是說執行的任務不能報錯。如果執行的任務不正常或報錯或超時,那麼任務(tasks)將不成功或不能正常結束。如果執行緒在等待時候被打斷,那麼將會丟擲InterruptedException;如果tasks或任何元素為null或unit為null,那麼將會丟擲NullPointerException;當tasks中沒有元素,那麼將會丟擲java.lang.IllegalArgumentException.當沒有任務成功執行,將丟擲ExecutionException; 如果一個任務不能被排序來執行,那麼將會丟擲RejectedExecutionException。當任務成功執行之前卻超時了,那麼將會丟擲TimeoutException。

boolean isShutdown()

當執行器被關閉,那麼將返回true,其它情況將返回false.

boolean isTerminated()

當所有任務都執行成功並且關閉,返回true;其它情況返回false。如果執行shutdown()或shutdownNow()方法,那麼這個方法將不會返回true.

void shutdown()

在所有的任務都提交之後,就開始的序的關閉任務,這期間將不會接受任何新有的任務。如果執行器(excutors)被關閉了,那麼執行這個方法是沒有任何作用的。如果之間的任務已經成功執行並提交了,那麼這個方法將不會等待,而是立即執行。如果需要等待,那麼就使用awaitTermination()的方法。

List<Runnable> shutdownNow()

嘗試去停止正在執行的任務,或停止正在等待的任務的執行緒,那麼將會返回的是等待執行任務的集合。這裡不能夠保證可以順利停止每一個任務,比如,應用被Thread.interrupt()取消了,那麼任務請求去停止,是沒有作用的。

<T> Future<T> submit(Callable<T> task)

提交一個即時響應的任務,將會返回Future的例項,代表著等待任務的處理結果返回。Future的例項中的get()方法,返回任務的成功執行情況。當任務(task)不能有序執行時,將會丟擲RejectedExecutionException.如果你想更快速的阻塞一個等待的任務時,那麼你可以用這個方法result = exec.submit(aCallable).get()

Future<?> submit(Runnable task)

提交一個執行的任務去執行,直到有結果返回。Future 的例項中的get()方法返回任務成功執行與否。如果這個任務不能有序的執行,那麼將會丟擲RejectedExecutionException;如果引數task為null,那麼將會丟擲NullPointerException.

<T> Future<T> submit(Runnable task, T result)

提交一個執行的任務去執行,直到有結果返回。Future 的例項中的get()方法返回任務成功執行與否,通過result來表現。如果這個任務不能有序的執行,那麼將會丟擲RejectedExecutionException;如果引數task為null,那麼將會丟擲NullPointerException.

表格5-1涉及到java.util.concurrent.TimeUnit,這個工具類中包含執行時間期間的粒度,其中包含的列舉有:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS、和SECONDS。與此同時,TimeUnit宣告時間轉換的工具(如:long toHours(long duration)),和執行和延遲的操作(如:voidsleep(long timeout))。

表格5-1涉及到即時響應的任務(callabletasks)。它不像Runnable中的run()方法,不能夠返回值和不能夠丟擲異常。Callable<V> 的 V call()方法將會返回值和丟擲異常的,因為它聲明瞭throws Exception。

最後,表格5-1涉及到了Future的介面,這個代表著同步計算的結果。它返回的結果是當作一個future,通常它是無效的,除非的相同的時刻的future.Future的屬性是Future<V>,提供方法去取消一個任務,因為它可以返回值和確保任務是否完成。表格5-2描述了Future的方法。

表格5-2 Future的方法

方法

說明

boolean cancel(boolean mayInterrupIfRunning)

嘗試取消執行的任務,和取消一個任務返回true;其它情況將返回false(這個任務在cancel()之前不能夠正常取消)。當一個任務已經完成、取消或其它原因不能被取消,那麼呼叫這個方法將會失敗。如果已經成功或任務尚未開始,那麼這個任務將不會執行。如果任務已經開始,那麼可以通過mayInterruptIfRunning宣告true或false,設定是否將任務停止或打斷。在執行之後,呼叫isDone()方法返回true,當呼叫cancel()方法返回true,那麼isCancelled()總是返回true。

V get()

如果需要任務成功執行然後返回結果,那麼可以呼叫此方法。當這個方法在取消之前,任務先取消了,那麼將會丟擲CancellaException.當任務的異常時,就會丟擲ExecutionException.如果當前的執行緒在等待期間被打斷,那麼將會丟擲InterruptedException

V get(long timeout, TimeUnit nuit)

等待timeout的時間,等待任務成功執行或有需要的話返回結果。當這個方法在取消之前,任務先取消了,那麼將會丟擲CancellaException.當任務的異常時,就會丟擲ExecutionException.如果當前的執行緒在等待期間被打斷,那麼將會丟擲InterruptedException如果方法超時了,那麼將要丟擲TimeoutException.

boolean isCancelled()

如果任務正常執行之前被取消了,那麼這個方法將會返回true,其它情況將會返回false。

boolean isDone()

如果任務結束,那麼返回true,其它情況返回false。結束情況,可能是非正常結束,異常或取消。這些情況,方法將返回true.

想像一下,你試圖寫這樣的一個應用,提供圖形表格讓使用者輸入單詞。使用者輸入單詞之後,就可以通過線上的詞典來查詢這個單詞,並將查詢的結果顯示給使用者。

因為線上的應用可能會因為網速而比較慢,而且使用者的互動需要請求和響應。你需要將請求的單詞(這個任務)放到執行器中執行,而且返回這個執行緒執行的結果。下面的例子將會使用ExecutorService、Callable和Future來實現這個功能。

ExecutorService executor = ...; // ... represents some executor creation
Future<String[]> taskFuture =
executor.submit(new Callable<String[]>()
{
@Override
public String[] call()
{
String[] entries = ...;
// Access online dictionaries
// with search word and populate
// entries with their resulting
// entries.
return entries;
}
});
// Do stuff.
String entries = taskFuture.get();


之後在相同的方法中包含一個執行器(executor),這個例子執行緒提交一個即時響應的任務給執行器(executor)。為了去控制任務的執行和獲取結果,submit()方法將會更快速地返回一個介面給Future物件。 

Note   java.util.concurrent.ScheduleExecutorService的介面繼承ExecutorService。和描述一個執行器讓你的任務列表執行,或執行緒被延遲了,那麼將會一直迴圈執行。

儘管你可以建立自己的Excutor、ExecutorService和ScheduledExcutorService的實現工具(例如,DirectExcutorimplements Excutor{@override public void execute(Runnable r){r.run();}} ----執行這個執行器(executor)將會呼叫執行緒)。這個簡單的例子也可以用java.concurrent.Executors來替換。

Note  如果你想要建立自己的ExcutorService的應用工具,那你java.util.concurrent.AbstractExecutorService和java.util.concurrent.FutureTask的類是非常有用的。

這個Executors的作用聲明瞭幾個類的方法可以返回多個的例項,如ExecutorService和ScheduledExecutorService的應用工具(或者其它的例項應用)。這個類的方法可以有如下的功效:

        建立和返回一個ExecutorService的例項,這個構造一個普通的應用或構造設定。

        建立和返回一個ScheduledExecutorSevice的例項,這個構造一個普通的應用或構造設定。

        建立和返回一個“擦除(wrapped)”的ExcutorService 或ScheduledExecutorService的例項;這個對於具體應用方法是難以見效的。

        建立和返回一個java.util.concurrent.ThreadFactory的例項(這個類是繼承ThreadFactory的介面),為了建立一個執行緒物件。

        建立和返回一個Callable的例項而不是一個關閉的物件,以至於執行這個方法可以請求Callable的引數(例如,ExecutorService 的submit(Callable)方法)。

例如,staticExecutorService newFixedThreadPool(int nThreads)建立一個執行緒池,這個執行緒池會在沒的邊界的佇列中使用固定的引數。在大多數情況下,nThread執行緒是活躍的程序任務。當所有執行緒都在執行時,如果有新的執行緒提交任務,那麼將處於等待狀態。

如果一個執行器(executor)關閉,那麼當前執行緒在執行期間就會失敗,所以當前執行的執行緒也就會終止。這個的話,執行緒池就會讓出這個空間給其它需要的執行緒來執行。執行緒明確停止了,但是執行緒池還是存在的。這個時候就會丟擲IllegalArgumentException,因為nThreads為‘0’或無效的值。

Note   建立一個執行緒而且要經常提交任務,那麼執行緒池作用就是用來消除這樣的高開銷。執行緒的建立不是低開銷的,因為建立多個執行緒會影響到系統的效能。

     你可能經常運用executors、runnable、callables和futures在檔案和網路的輸入/輸出的應用中。執行一個長的計算,你可能也會用到。下面的例子給出了運用executor、callable、future在尤拉公式(Euler)的計算。

package com.owen.thread.chapter5;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CalculateE
{

	final static int LASTITER = 17;

	public static void main(String[] args)
	{
		ExecutorService executor = Executors.newFixedThreadPool(1);
		Callable<BigDecimal> callable;
		callable = new Callable<BigDecimal>()
		{
			@Override
			public BigDecimal call()
			{
				MathContext mc = new MathContext(100, RoundingMode.HALF_UP);
				BigDecimal result = BigDecimal.ZERO;
				for (int i = 0; i <= LASTITER; i++)
				{
					BigDecimal factorial = factorial(new BigDecimal(i));
					BigDecimal res = BigDecimal.ONE.divide(factorial, mc);
					result = result.add(res);
				}
				return result;
			}

			public BigDecimal factorial(BigDecimal n)
			{
				if (n.equals(BigDecimal.ZERO))
					return BigDecimal.ONE;
				else
					return n.multiply(factorial(n.subtract(BigDecimal.ONE)));
			}
		};
		Future<BigDecimal> taskFuture = executor.submit(callable);
		try
		{
			while (!taskFuture.isDone())
				System.out.println("waiting");
			System.out.println(taskFuture.get());
		} catch (ExecutionException ee)
		{
			System.err.println("task threw an exception");
			System.err.println(ee);
		} catch (InterruptedException ie)
		{
			System.err.println("interrupted while waiting");
		}
		executor.shutdownNow();
	}
}



     預設的主執行緒執行main()方法,這個方法包含一個執行器(executor),通過Excutors的newFixedThreadPool()方法來呼叫。在例項化Callable的介面時,它包含了一個匿名內部類,和通過提交任務給執行器(executor),接收一個Future例項的請求。 

     提交任務之後,一個執行緒將會執行相同的工作直到它的請求等到任務的結果返回。     我模擬這樣的工作,是主執行緒會一直處於等待的狀態,直到Future的例項屬性isDone()方法返回true.(在實際的應用中,我們應該避免這個迴圈。)這裡最關鍵的是,主執行緒呼叫get()的方法去獲取結果,然後輸出。之後,主執行緒會停止執行器(executor)。

Caution  當一個應用執行完之後,停止這個執行器是非常關鍵的。否則這個應用可能不能終止。在上面的例子中呼叫shutdownNow()方法(你也可以用shutdown()方法)。

     callable的call()方法計算的公式是e = 1/0! + 1/1! + 1/2!...這個一系列的計算可以寫為1/n!,n的範圍是從0到無窮大。

     在call()的方法中,我們例項化java.math.MathContext,這個封裝了一個計算的精確度和舍入的方式。我選擇了100為最高的限制 e的精確度,和我選擇HALF_UP為舍入方式。

Tip   提高精確度,最終的結果將會起來起接近於e的值。

     call()方法中例項化一個java.math.BigDecimal的區域性變數result的例項值是BigDecimal.ZERO。它將放入到一個計算階梯因子的迴圈中,通過階梯因子分隔BigDecimal.ONE,和新增分隔結果給result。

     divide()方法將MathContext例項作為第二個引數去明確舍入的資訊。(如果我指定作為精確度,那麼將會出現nonterminating decimal expansion,也就是說商數分隔的結果是不正確的,如0.33333…。實際會丟擲java.lang.ArithmeticException。執行器(executor)還會再次丟擲ExecutionException)。

     執行上面的程式碼,輸入的結果可能是:

waiting

waiting

waiting

waiting

waiting

waiting

waiting

waiting

2.718281828459045070516047795848605061178979635251032698900735004065225042504843314055887974344245741730039454062711