1. 程式人生 > >java多執行緒管理 concurrent包用法詳解

java多執行緒管理 concurrent包用法詳解

 

    我們都知道,在JDK1.5之前,Java中要進行業務併發時,通常需要有程式設計師獨立完成程式碼實現,當然也有一些開源的框架提供了這些功能,但是這些依然沒有JDK自帶的功能使用起來方便。而當針對高質量Java多執行緒併發程式設計時,為防止死蹦等現象的出現,比如使用java之前的wait()、notify()和synchronized等,每每需要考慮效能、死鎖、公平性、資源管理以及如何避免執行緒安全性方面帶來的危害等諸多因素,往往會採用一些較為複雜的安全策略,加重了程式設計師的開發負擔.萬幸的是,在JDK1.5出現之後,Sun大神(Doug Lea)終於為我們這些可憐的小程式設計師推出了java.util.concurrent工具包以簡化併發完成。開發者們藉助於此,將有效的減少競爭條件(race conditions)和死鎖執行緒。concurrent包很好的解決了這些問題,為我們提供了更實用的併發程式模型。

Executor                  :具體Runnable任務的執行者。
ExecutorService           :一個執行緒池管理者,其實現類有多種,我會介紹一部分。我們能把Runnable,Callable提交到池中讓其排程。
Semaphore                 :一個計數訊號量
ReentrantLock             :一個可重入的互斥鎖定 Lock,功能類似synchronized,但要強大的多。
Future                    :是與Runnable,Callable進行互動的介面,比如一個執行緒執行結束後取返回的結果等等,還提供了cancel終止執行緒。
BlockingQueue             :阻塞佇列。
CompletionService         : ExecutorService的擴充套件,可以獲得執行緒執行結果的
CountDownLatch            :一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。 
CyclicBarrier             :一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點 
Future                    :Future 表示非同步計算的結果。
ScheduledExecutorService :一個 ExecutorService,可安排在給定的延遲後執行或定期執行的命令。
接下來逐一介紹
Executors主要方法說明
newFixedThreadPool(固定大小執行緒池)
建立一個可重用固定執行緒集合的執行緒池,以共享的無界佇列方式來執行這些執行緒(只有要請求的過來,就會在一個佇列裡等待執行)。如果在關閉前的執行期間由於失敗而導致任何執行緒終止,那麼一個新執行緒將代替它執行後續的任務(如果需要)。
newCachedThreadPool(無界執行緒池,可以進行自動執行緒回收)
建立一個可根據需要建立新執行緒的執行緒池,但是在以前構造的執行緒可用時將重用它們。對於執行很多短期非同步任務的程式而言,這些執行緒池通常可提高程式效能。呼叫 execute 將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新執行緒並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。因此,長時間保持空閒的執行緒池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構造方法建立具有類似屬性但細節不同(例如超時引數)的執行緒池。
newSingleThreadExecutor(單個後臺執行緒)
建立一個使用單個 worker 執行緒的 Executor,以無界佇列方式來執行該執行緒。(注意,如果因為在關閉前的執行期間出現失敗而終止了此單個執行緒,那麼如果需要,一個新執行緒將代替它執行後續的任務)。可保證順序地執行各個任務,並且在任意給定的時間不會有多個執行緒是活動的。與其他等效的 newFixedThreadPool(1) 不同,可保證無需重新配置此方法所返回的執行程式即可使用其他的執行緒。
這些方法返回的都是ExecutorService物件,這個物件可以理解為就是一個執行緒池。
這個執行緒池的功能還是比較完善的。可以提交任務submit()可以結束執行緒池shutdown()。
01
import java.util.concurrent.ExecutorService;
02 import java.util.concurrent.Executors;
03 public
 class MyExecutor extends Thread {
04 private int index;
05 public MyExecutor(
int i){
06     this.index=i;
07 }
08 public void run(){
09     try{
10      System.out.println("["+this.index+"] start....");
11      Thread.sleep((int)(Math.random()*1000));
12      System.out.println("["+this.index+"] end.");
13     }
14     catch(Exception e){
15      e.printStackTrace();
16     }
17 }
18 public static void main(String args[]){
19     ExecutorService service=Executors.newFixedThreadPool(4);
20     for(int i=0;i<10;i++){
21      service.execute(new MyExecutor(i));
22      //service.submit(new MyExecutor(i));
23     }
24     System.out.println("submit finish");
25     service.shutdown();
26 }
27 }

雖然列印了一些資訊,但是看的不是非常清晰,這個執行緒池是如何工作的,我們來將休眠的時間調長10倍。
Thread.sleep((int)(Math.random()*10000));
再來看,會清楚看到只能執行4個執行緒。當執行完一個執行緒後,才會又執行一個新的執行緒,也就是說,我們將所有的執行緒提交後,執行緒池會等待執行完最後shutdown。我們也會發現,提交的執行緒被放到一個“無界佇列裡”。這是一個有序佇列(BlockingQueue,這個下面會說到)。
另外它使用了Executors的靜態函式生成一個固定的執行緒池,顧名思義,執行緒池的執行緒是不會釋放的,即使它是Idle。
這就會產生效能問題,比如如果執行緒池的大小為200,當全部使用完畢後,所有的執行緒會繼續留在池中,相應的記憶體和執行緒切換(while(true)+sleep迴圈)都會增加。
如果要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。可以像通用的執行緒池一樣設定“最大執行緒數”、“最小執行緒數”和“空閒執行緒keepAlive的時間”。

這個就是執行緒池基本用法。
Semaphore
一個計數訊號量。從概念上講,訊號量維護了一個許可集合。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。
Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的執行緒數目。例如,下面的類使用訊號量控制對內容池的訪問:
這裡是一個實際的情況,大家排隊上廁所,廁所只有兩個位置,來了10個人需要排隊。
01 import java.util.concurrent.ExecutorService;
02 import java.util.concurrent.Executors;
03 import java.util.concurrent.Semaphore;
04 public class MySemaphore extends Thread {
05 Semaphore position;
06 private int id;
07 public MySemaphore(int i,Semaphore s){
08     this.id=i;
09     this.position=s;
10 }
11 public void run(){
12     try{
13      if(position.availablePermits()>0){
14       System.out.println("顧客["+this.id+"]進入廁所,有空位");
15      }
16      else{
17       System.out.println("顧客["+this.id+"]進入廁所,沒空位,排隊");
18      }
19      position.acquire();
20      System.out.println("顧客["+this.id+"]獲得坑位");
21      Thread.sleep((int)(Math.random()*1000));
22      System.out.println("顧客["+this.id+"]使用完畢");
23      position.release();
24     }
25     catch(Exception e){
26      e.printStackTrace();
27     }
28 }
29 public static void main(String args[]){
30     ExecutorService list=Executors.newCachedThreadPool();
31     Semaphore position=new Semaphore(2);
32     for(int i=0;i<10;i++){
33      list.submit(new MySemaphore(i+1,position));
34     }
35     list.shutdown();
36     position.acquireUninterruptibly(2);
37     System.out.println("使用完畢,需要清掃了");
38     position.release(2);
39 }
40 }
ReentrantLock
一個可重入的互斥鎖定 Lock,它具有與使用 synchronized 方法和語句所訪問的隱式監視器鎖定相同的一些基本行為和語義,但功能更強大。
ReentrantLock 將由最近成功獲得鎖定,並且還沒有釋放該鎖定的執行緒所擁有。當鎖定沒有被另一個執行緒所擁有時,呼叫 lock 的執行緒將成功獲取該鎖定並返回。如果當前執行緒已經擁有該鎖定,此方法將立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法來檢查此情況是否發生。
此類的構造方法接受一個可選的公平引數。
當設定為 true時,在多個執行緒的爭用下,這些鎖定傾向於將訪問權授予等待時間最長的執行緒。否則此鎖定將無法保證任何特定訪問順序。
與採用預設設定(使用不公平鎖定)相比,使用公平鎖定的程式在許多執行緒訪問時表現為很低的總體吞吐量(即速度很慢,常常極其慢),但是在獲得鎖定和保證鎖定分配的均衡性時差異較小。不過要注意的是,公平鎖定不能保證執行緒排程的公平性。因此,使用公平鎖定的眾多執行緒中的一員可能獲得多倍的成功機會,這種情況發生在其他活動執行緒沒有被處理並且目前並未持有鎖定時。還要注意的是,未定時的 tryLock 方法並沒有使用公平設定。因為即使其他執行緒正在等待,只要該鎖定是可用的,此方法就可以獲得成功。
建議總是 立即實踐,使用 try 塊來呼叫 lock,在之前/之後的構造中,最典型的程式碼如下: 
01 class X {
02     private final ReentrantLock lock = new ReentrantLock();
03     // ...
04     public void m() {
05       lock.lock(); // block until condition holds
06       try {
07         // ... method body
08       } finally {
09         lock.unlock()
10       }
11     }
12 }
我的例子:
01 import java.util.concurrent.ExecutorService;
02 import java.util.concurrent.Executors;
03 import java.util.concurrent.locks.ReentrantLock;
04 public class MyReentrantLock extends Thread{
05 TestReentrantLock lock;
06 private int id;
07 public MyReentrantLock(int i,TestReentrantLock test){
08     this.id=i;
09     this.lock=test;
10 }
11 public void run(){
12     lock.print(id);
13 }
14 public static void main(String args[]){
15     ExecutorService service=Executors.newCachedThreadPool();
16     TestReentrantLock lock=new TestReentrantLock();
17     for(int i=0;i<10;i++){
18      service.submit(new MyReentrantLock(i,lock));
19     }
20     service.shutdown();
21 }
22 }
23 class TestReentrantLock{
24 private ReentrantLock lock=new ReentrantLock();
25 public void print(int str){
26     try{
27      lock.lock();
28      System.out.println(str+"獲得");
29      Thread.sleep((int)(Math.random()*1000));
30     }
31     catch(Exception e){
32      e.printStackTrace();
33     }
34     finally{
35      System.out.println(str+"釋放");
36      lock.unlock();
37     }
38 }
39 }
BlockingQueue
支援兩個附加操作的 Queue,這兩個操作是:檢索元素時等待佇列變為非空,以及儲存元素時等待空間變得可用。
BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會丟擲 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。
BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 額外的元素。
沒有任何內部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩餘容量。
BlockingQueue 實現主要用於生產者-使用者佇列,但它另外還支援 Collection 介面。因此,舉例來說,使用 remove(x) 從佇列中移除任意一個元素是有可能的。
然而,這種操作通常不 會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。
BlockingQueue 實現是執行緒安全的。所有排隊方法都可以使用內部鎖定或其他形式的併發控制來自動達到它們的目的。
然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執行,除非在實現中特別說明。
因此,舉例來說,在只添加了 c 中的一些元素後,addAll(c) 有可能失敗(丟擲一個異常)。
BlockingQueue 實質上不 支援使用任何一種“close”或“shutdown”操作來指示不再新增任何項。
這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 物件,並根據使用者獲取這些物件的時間來對它們進行解釋。
下面的例子演示了這個阻塞佇列的基本功能。
01 import java.util.concurrent.BlockingQueue;
02 import java.util.concurrent.ExecutorService;
03 import java.util.concurrent.Executors;
04 import java.util.concurrent.LinkedBlockingQueue;
05 public class MyBlockingQueue extends Thread {
06 public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
07 private int index;
08 public MyBlockingQueue(int i) {
09    this.index = i;
10 }
11 public void run() {
12    try {
13     queue.put(String.valueOf(this.index));
14     System.out.println("{" + this.index + "} in queue!");
15    } catch (Exception e) {
16     e.printStackTrace();
17    }
18 }
19 public static void main(String args[]) {
20