1. 程式人生 > >java並發編程實戰:第六章----任務執行

java並發編程實戰:第六章----任務執行

獨立 設定 結構 conn web服務器 系統資源 AR 決定 pri

任務:通常是一些抽象的且離散的工作單元。大多數並發應用程序都是圍繞"任務執行"來構造的,把程序的工作分給多個任務,可以簡化程序的組織結構便於維護

一、在線程中執行任務

任務的獨立性:任務並不依賴於其他任務的狀態,結果和邊緣效應。獨立的任務可以實現並行執行

1、串行的執行任務

所有的任務放在單個線程中串行執行,程序簡單,安全性高,不涉及同步等情況,缺點也顯而易見,無法提高吞吐量和響應速度,適合任務數量很少並且執行時間很長時,或者只為單個用戶使用,並且該用戶每次只發出一個請求。

2、顯示的創建線程

為每一個請求創建一個線程,將任務的處理從主線程中分離出來,多個任務可以並行處理,充分利用了系統資源,提高吞吐量和相應速度,要求處理代碼必須是線程安全的

3、無限創建線程的不足

線程生命周期的開銷非常高;太多線程會消耗系統資源,空閑線程的內存空間占用,大量線程競爭CPU時產生其他性能開銷;穩定性:破壞這些限制底層操作系統對線程的限制很可能拋出OutOfMemoryError異常

總結:在一定範圍內,增加線程有助於提高吞吐量,但是再多就可能導致性能下降。

二、Executor框架

任務是一組邏輯單元,而線程是使任務異步執行的機制

  • Executor簡化了線程的管理工作,並且java.util.concurrent提供了一種靈活的線程池作為Executor框架的一部分。
  • Executor基於生產者—消費者設計模式,提交任務的操作單元相當於生產者(生成待完成的工作單元),執行任務的線程相當於消費者(執行完這些工作單元)
  • 將提交過程和執行過程解耦,用Runnable表示執行任務

1、基於Executor的web服務器

 1 public class ThreadPerTaskWebServer {
 2     private static final int NTHREADS = 100;
 3     /**
 4      * 創建固定線程數量的線程池
 5      */
 6     private static final Executor exec = 
 7             Executors.newFixedThreadPool(NTHREADS);
 8     public static void main(String[] args) throws IOException {
 9         ServerSocket server = new ServerSocket(80);
10         boolean listening = true;
11         while (listening){
12             final Socket connection = server.accept(); //阻塞等待客戶端連接請求
13             Runnable task = new Runnable() {
14                 @Override
15                 public void run() {
16                     handlerRequest(connection);
17                 }
18             };
19             exec.execute(task);
20         }
21         server.close();
22     }
23        ...
24 }

Executor創建了含有100個線程的線程池來處理任務

若想更改任務的處理方式,只需要使用不用的Executor實現

2、執行策略

  • 根據可用的資源和對服務質量的要求制定合理的執行策略
  • 將任務的提交與任務的執行解耦,有助於在部署階段選擇與硬件最匹配的執行策略

3、線程池:管理一組同構工作線程的資源池。

線程池vs工作隊列:工作者線程來自線程池,從工作隊列獲取任務,執行完畢回到線程池

優點:不僅可以在處理多個請求時分攤在線程創建和銷毀過程中產生的巨大開銷,另外一個好處就是當請求到達時,工作線程通常已經存在,因此不會由於等待線程創建而延遲任務的執行

  • newFixedThreadPool:固定長度的線程池,即線程池的規模有上限。
  • newCachedThreadPool:可緩存的線程池,如果線程池的當前規模超過了處理需求時,將回收空閑的線程,而當需求增加時,則可以添加新的線程,註意線程池的規模不存在任何限制。
  • newSingleThreadExecutor:單線程的Executor,通過創建單個工作者線程來串行的執行任務,如果此線程異常結束,Executor會創建另一個線程來代替。註意此模式能確保依照任務在隊列中的順序來串行執行(例如FIFO、LIFO、優先級)。
  • newScheduledThreadPool:創建固定長度的線程池,而且以延遲或者定時的方式來執行任務。

4、Executor的生命周期

newXXXThreadPool都是返回的ExecutorService

ExecutorService的生命周期主要有三種狀態:運行、關閉和已終止。

為了解決執行服務的生命周期問題,ExecutorService擴展了Executor接口,添加了管理生命周期的方法。

  • shutdown:關閉線程池,不再接受新任務,等待已經提交的任務完成
  • shutdownNow:強制立即關閉線程池,返回的等待執行的任務列表,執行中的任務拋出中斷異常
  • isShutdown:是否處於正在關閉狀態
  • isTerminated:是否結束
  • awaitTerminated:阻塞等待關閉完成

5、延遲任務和周期任務

通過ScheduledThreadPoolExecutor來代替Timer,TimerTask。

  • Timer基於絕對時間,ScheduledThreadPoolExecutor基於相對時間。
  • Timer執行所有定時任務只能創建一個線程,若某個任務執行時間過長,容易破壞其他TimerTask的定時精確性。
  • Timer不捕獲異常,Timetask拋出未檢查的異常會終止定時器線程,已經調度但未執行的TimerTask將不會再執行,新的任務也不會被調度,出現"線程泄漏"
 1 public class OutOfTime {
 2     public static void main(String[] args) throws InterruptedException {
 3         Timer timer = new Timer();
 4         timer.schedule(new ThrowTask(), 1); //第一個任務拋出異常
 5         Thread.sleep(1000); 
 6         timer.schedule(new ThrowTask(), 1); //第二個任務將不能再執行, 並拋出異常Timer already cancelled.
 7         Thread.sleep(5000);
 8         System.out.println("end.");
 9     }
10     
11     static class ThrowTask extends TimerTask{
12 
13         @Override
14         public void run() {
15             throw new RuntimeException("test timer‘s error behaviour");
16         }
17     }
18 }

三、找出可利用的並行性

1、攜帶結果的任務Callable與Future

  Runnable的缺陷:不能返回一個值,或拋出一個異常

  Callable和Runnable都描述抽象的計算任務,Callable可以返回一個值,並可以拋出一個異常

Executor執行任務的4個生命周期:創建,提交,開始,完成。Executor框架中,可以取消已提交但未開始執行的任務,對於已經開始執行的任務,只能當他們能響應中斷時,才能取消,取消已經完成的任務不會有影響。

Future表示了一個任務的生命周期,提供了相應的方法判斷是否完成或被取消以及獲取執行結果

  • get方法:若任務完成,返回結果或拋出ExecutionException;若任務取消,拋出CancellationException;若任務沒完成,阻塞等待結果
  • ExecutorService的submit方法提交一個Callable任務,並返回一個Future來判斷執行狀態並獲取執行結果
  • 安全發布過程:將任務從提交線程穿個執行線程,結果從計算線程到調用get方法的線程

2、異構任務並行化中存在的局限:當異構任務之間的執行效率懸殊很大時,對於整體的性能提升來看並不是很有效。

3、完成服務CompletionService(Executor+BlockingQueue)

  使用BlockingQueue保存計算結果(Future),使用take和poll獲取,計算部分同樣委托給Executor

4、為任務設定時限:如果超出期望執行時間,將不要其結果

小結:通過圍繞任務執行來設計應用程序,可以簡化開發過程,並有助於實現並發。Executor框架將任務提交與執行策略解耦開來,同時還支持多種不同類型的執行策略。當需要創建線程來執行任務時,可以考慮使用Executor。要想在將應用程序分解為不同的任務時獲得最大的好處,必須定義清晰的任務邊界。某些應用程序中存在著比較明顯的任務邊界,而在其他一些程序中則需要進一步分析才能揭示出粒度更細的並行性。

方法小結

Future的get、cancel、isCancelled、isDone方法

get:在任務完成前一直阻塞。會拋出三種異常:CancellationException - 如果計算被取消、ExecutionException - 如果計算拋出異常、InterruptedException - 如果當前的線程在等待時被中斷。

get(long timeout, TimeUnit unit):在超時之前且任務未完成則一直阻塞。除拋出以上三種異常

cancel(boolean mayInterruptIfRunning):試圖取消對此任務的執行。如果任務已完成、或已取消,或者由於某些其他原因而無法取消,則此嘗試將失敗。當調用cancel時,如果調用成功,而此任務尚未啟動,則此任務將永不運行。如果任務已經啟動,則mayInterruptIfRunning參數決定了是否調用運行任務的線程的interrupt操作。

isCancelled:如果在任務正常完成前將其取消,則返回true

isDone:正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回 true
ExecutorService的submit、invokeAll、invokeAny方法

ExecutorService的有三個重載的submit方法:

1、 可以接收Runnable或Callable類型的任務,返回Future<?>類型的Future的get返回null。
2、 這三個方法都將提交的任務轉換成了Future的實現類FutureTask實例,並作為submit的返回實例。
3、 另外調用這三個方法不會阻塞,不像invokeAll那樣要等到所有任務完成後才返回,與不像invokeAny那樣要等到有一個任務完成後才返回Future。
4、 這個三方法會調用Executor的execute來完成,因為Executor的execute會拋出RejectedExecutionException - 如果不能接受執行此任務、NullPointerException - 如果命令為 null這兩個運行進異常,所以這三個方法也會拋出這兩個異常。

T invokeAny(Collection<Callable<T>> tasks):
1、 只要某個任務已成功完成(也就是未拋出異常,這與任務完成概念不一樣:任務完成是指定Future的isDone返回true,有可能是拋出異常後進行完成狀態),才返回這個結果。一旦正常或異常返回後,則取消尚未完成的任務(即任務所運行的線程處理中斷狀態,一旦在它上面出現可中斷阻塞的方法調用,則會拋出中斷異常)。
2、 此方法會阻塞到有一個任務完成為止(正常完成或異常退出)。
3、 也是調用Executor的execute來完成
4、 調用get不會阻塞

invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit):
1、 只要在給定的超時期滿前某個任務已成功完成(也就是invokeAny方法不能拋出異常,包括Future.get所拋的異常),則返回其結果。一旦正常或異常返回後,則取消尚未完成的任務。
2、 此方法會阻塞到有一個任務完成為止(正常完成或異常退出)。
3、 也是調用Executor的execute來完成
4、 調用get不會阻塞

List<Future<T>> invokeAll(Collection<Callable<T>> tasks):
1、 只有當所有任務完成時,才返回保持任務狀態和結果的 Future 列表。返回列表的所有元素的 Future.isDone() 為 true。註意,可以正常地或通過拋出異常來已完成任務。
2、 此方法會阻塞到所有任務完成為止(正常完成或異常退出)。
3、 也是調用Executor的execute來完成,如果任務執行過程中拋出了其他異常,則方法會異常退出,且取消所有其他還未執行完成的任務。
4、 返回的列表中的Future都是已經完成的任務,get時不會再阻塞

invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit):
1、 當所有任務完成或超時期滿時(無論哪個首先發生),返回保持任務狀態和結果的 Future 列表(如果是超時返回的列表,則列表中的會包括這些還未執行完的任務,使用get獲取結果時可能會拋出CancellationException異常)。返回列表的所有元素的 Future.isDone() 為 true。一旦返回後,即取消尚未完成的任務。註意,可以正常地或通過拋出異常來完成任務。
2、 此方法會阻塞到所有任務完成為止(正常完成或異常退出或超時)。
3、 也是調用Executor的execute來完成,如果任務執行過程中拋出了其他異常,則方法會異常退出,且取消所有其他還未執行完成的任務。
4、 返回的列表中的Future中會有因超時執行任務時異常而未執行完的任務,get時會拋出CancellationException或ExecutionException,當然所有的Future的get也不會阻塞。

java並發編程實戰:第六章----任務執行