定製併發類(二)定製ThreadPoolExecutor類
宣告:本文是《 Java 7 Concurrency Cookbook 》的第七章,作者: Javier Fernández González 譯者:許巧輝
定製ThreadPoolExecutor類
執行者框架(Executor framework)是一種機制,允許你將執行緒的建立與執行分離。它是基於Executor和ExecutorService介面及其實現這兩個介面的ThreadPoolExecutor類。它有一個內部的執行緒池和提供允許你提交兩種任務給執行緒池執行的方法。這些任務是:
- Runnable介面,實現沒有返回結果的任務
- Callable介面,實現返回結果的任務
在這兩種情況下,你只有提交任務給執行者。這個執行者使用執行緒池中的執行緒或建立一個新的執行緒來執行這些任務。執行者同樣決定任務被執行的時刻。
在這個指南中,你將學習如何覆蓋ThreadPoolExecutor類的一些方法,計算你在執行者中執行的任務的執行時間,並且將關於執行者完成它的執行的統計資訊寫入到控制檯。
準備工作
這個指南的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,開啟它並建立一個新的Java專案。
如何做…
按以下步驟來實現的這個例子:
1.建立MyExecutor類,並指定它繼承ThreadPoolExecutor類。
public class MyExecutor extends ThreadPoolExecutor {
2.宣告一個私有的、ConcurrentHashMap型別的屬性,並引數化為String和Date類,名為startTimes。
private ConcurrentHashMap<String, Date> startTimes;
3.實現這個類的構造器,使用super關鍵字呼叫父類的構造器,並初始化startTime屬性。
public MyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); startTimes=new ConcurrentHashMap<>(); }
4.覆蓋shutdown()方法。將關於已執行的任務,正在執行的任務和待處理的任務資訊寫入到控制檯。然後,使用super關鍵字呼叫父類的shutdown()方法。
@Override public void shutdown() { System.out.printf("MyExecutor: Going to shutdown.\n"); System.out.printf("MyExecutor: Executed tasks: %d\n",getCompletedTaskCount()); System.out.printf("MyExecutor: Running tasks: %d\n",getActiveCount()); System.out.printf("MyExecutor: Pending tasks: %d\n",getQueue().size()); super.shutdown(); }
5.覆蓋shutdownNow()方法。將關於已執行的任務,正在執行的任務和待處理的任務資訊寫入到控制檯。然後,使用super關鍵字呼叫父類的shutdownNow()方法。
@Override public List<Runnable> shutdownNow() { System.out.printf("MyExecutor: Going to immediately shutdown.\n"); System.out.printf("MyExecutor: Executed tasks: %d\n",getCompletedTaskCount()); System.out.printf("MyExecutor: Running tasks: %d\n",getActiveCount()); System.out.printf("MyExecutor: Pending tasks: %d\n",getQueue().size()); return super.shutdownNow(); }
6.覆蓋beforeExecute()方法。寫入一條資訊(將要執行任務的執行緒名和任務的雜湊編碼)到控制檯。在HashMap中,使用這個任務的雜湊編碼作為key,儲存開始日期。
@Override protected void beforeExecute(Thread t, Runnable r) { System.out.printf("MyExecutor: A task is beginning: %s : %s\n",t.getName(),r.hashCode()); startTimes.put(String.valueOf(r.hashCode()), new Date()); }
7.覆蓋afterExecute()方法。將任務的結果和計算任務的執行時間(將當前時間減去儲存在HashMap中的任務的開始時間)的資訊寫入到控制檯。
@Override protected void afterExecute(Runnable r, Throwable t) { Future<?> result=(Future<?>)r; try { System.out.printf("*********************************\n"); System.out.printf("MyExecutor: A task is finishing.\n"); System.out.printf("MyExecutor: Result: %s\n",result.get()); Date startDate=startTimes.remove(String.valueOf(r. hashCode())); Date finishDate=new Date(); long diff=finishDate.getTime()-startDate.getTime(); System.out.printf("MyExecutor: Duration: %d\n",diff); System.out.printf("*********************************\n"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
8.建立一個SleepTwoSecondsTask類,它實現引數化為String類的Callable介面。實現call()方法。令當前執行緒睡眠2秒,返回轉換為String型別的當前時間。
public class SleepTwoSecondsTask implements Callable<String> { public String call() throws Exception { TimeUnit.SECONDS.sleep(2); return new Date().toString(); } }
9.實現這個例子的主類,通過建立Main類,並實現main()方法。
public class Main { public static void main(String[] args) {
10.建立一個MyExecutor物件,名為myExecutor。
MyExecutor myExecutor=new MyExecutor(2, 4, 1000, TimeUnit. MILLISECONDS, new LinkedBlockingDeque<Runnable>());
11.建立一個引數化為String類的Future物件的數列,用於儲存你將提交給執行者的任務的結果物件。
List<Future<String>> results=new ArrayList<>();¡;
12.提交10個Task物件。
for (int i=0; i<10; i++) { SleepTwoSecondsTask task=new SleepTwoSecondsTask(); Future<String> result=myExecutor.submit(task); results.add(result); }
13.使用get()方法,獲取前5個任務的執行結果。將這些資訊寫入到控制檯。
for (int i=0; i<5; i++){ try { String result=results.get(i).get(); System.out.printf("Main: Result for Task %d : %s\n",i,result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
14.使用shutdown()方法結束這個執行者的執行。
myExecutor.shutdown();
15.使用get()方法,獲取後5個任務的執行結果。將這些資訊寫入到控制檯。
for (int i=5; i<10; i++){ try { String result=results.get(i).get(); System.out.printf("Main: Result for Task %d : %s\n",i,result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
16.使用awaitTermination()方法等待這個執行者的完成。
try { myExecutor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); }
17.寫入一條資訊表明這個程式執行的結束。
System.out.printf("Main: End of the program.\n");
它是如何工作的…
在這個指南中,我們已經通過繼承ThreadPoolExecutor類和覆蓋它的4個方法來實現我們自己定製的執行者。我們用beforeExecute()和afterExecute()方法來計算任務的執行時間。beforeExecute()方法是在任務執行之前被執行的。在這種情況下,我們使用HashMap來儲存任務的開始(執行)時間。afterExecute()方法是在任務執行之後被執行的。你可以從HashMap中獲取已完成任務的startTime(開始執行時間),然後,計算實際時間和那個時間(startTime)的差異來獲取任務的執行時間。你也覆蓋了shutdown()和shutdownNow()方法,將關於在執行者中已執行的任務的統計資訊寫入到控制檯:
- 對於已執行的任務,使用getCompletedTaskCount()方法(獲取)。
- 對於正在執行的任務,使用getActiveCount()方法(獲取)。
對於待處理任務,使用執行者儲存待處理任務的阻塞佇列的size()方法(獲取)。SleepTwoSecondsTask類,實現Callable介面,令它的執行執行緒睡眠2秒。Main類,使用它向你的執行者提交10個任務和演示其他類的特性。
執行這個程式,你將看到這個程式如何顯示正在執行的每個任務的時間跨度,和根據呼叫shutdown()方法統計執行者。
參見
- 在第7章,定製併發類中的在一個Executor物件中使用我們的ThreadFactory指南