在上文已經說明,委託是構造執行緒安全類的一個最有效策略,也就是讓現有的執行緒安全類管理所有的狀態即可。以下將介紹這些基礎構建模組。

同步容器類

同步容器類包括Vector和Hashtable以及由Collections.synchronizedXxx等工廠方法建立的同步封裝器類。這些類實現執行緒安全的方式是:將它們的狀態封裝起來,並對每個公有方法都進行同步,使得每次只有一個執行緒能訪問容器的狀態。同步容器對所有容器狀態的訪問都序列化,嚴重降低了併發性;當多個執行緒競爭鎖時,吞吐量嚴重下降。

同步容器類存在的問題

同步容器類都是執行緒安全的,但是在某些情況下可能需要額外的客戶端加鎖來保護複合操作。

比如,在Vecotr中,getLast()和deleteLast()操作,如果是在多執行緒的環境下執行,如果不加鎖,會產生異常情況。一個執行緒在getLast()後,另一個執行緒deleteLast(),然後該執行緒繼續執行,進行deleteLast()操作,此時會丟擲下標越界的異常。

又比如,在迭代的過程中,使用get(index)的操作,如果有多個執行緒執行,可能會刪除其中元素,同樣會造成異常。

對於如上的情況,我們需要通過客戶端加鎖來解決執行緒安全的問題。如在迭代時加鎖:

synchronized(vector){
    for(int i=0;i<vector.size();i++){
        vector.get(i);
    }
}
複製程式碼

迭代器

在迭代或者for-each迴圈語法時,對容器類進行迭代的標準方式都是使用Iterator。然而,在設計同步容器類的迭代器時並沒有考慮到併發修改的問題,並且它們表現出的行為時“及時失敗”的,也就是當它們發現容器在迭代過程中被修改時,就會丟擲ConcurrentModificationException。

如果在迭代期間,對容器加鎖,首先會降低效率,提高執行緒的等待時間;然後還可能會產生死鎖;降低了吞吐量和CPU的利用率。

如果不希望在迭代期間加鎖,可以使用克隆容器的方法,並在克隆副本上進行迭代。

加鎖可以防止迭代器丟擲ConcurrentModificationException,但是要在所有對容器進行迭代的地方都要加鎖。如hashCode,equals,containsAll,removeAll,retainAll等方法,在以容器為引數時,都會對容器進行迭代。這些間接的迭代操作可能丟擲ConcurrentModificationException。

併發容器

Java 5.0提供了多種併發容器類來改進同步容器的效能。同步容器對所有容器狀態的訪問都序列化,嚴重降低了併發性;當多個執行緒競爭鎖時,吞吐量嚴重下降。

併發容器是針對多個執行緒併發訪問設計的。通過併發容器來替代同步容器,可以極大地提高伸縮性並降低風險。併發容器包括ConcurrentHashMap(替代Map),CopyOnWriteArrayList(替代List),ConcurrentLinkedQueue,BlockingQueue等等。

ConcurrentHashMap

同步容器類在執行每個操作期間都持有一個鎖。ConcurrentHashMap採用了不同的加鎖策略來提供更高的併發性和伸縮性。它並不是將每個方法都在同一個鎖上同步,而是使用一種粒度更細的加鎖機制來實現更大程度的共享,這種機制稱為分段鎖。

分段鎖機制使得任意數量的讀取執行緒可以併發訪問Map,執行讀取操作的執行緒和執行寫入操作的執行緒可以併發訪問Map,並且一定數量的寫入執行緒可以併發地修改Map,因此提高了併發訪問的吞吐量。

併發容器增強了同步容器類,它們提供的迭代器不會丟擲ConcurrentModificationException,因此不需要在迭代過程中對容器加鎖。其迭代器具有弱一致性,可以容忍併發的修改,在建立迭代器時會遍歷已有元素,並可以(但是不保證)在迭代器被構造後將修改操作反映給容器。size(),isEmpty()等方法返回的是一個近似值。

由於ConcurrentHashMap與Hashtable和synchronizedMap有更多的優勢,因此大多數情況應該使用併發容器類,至於當需要對整個容器加鎖進行獨佔訪問時,才應該放棄使用併發容器。

注意,此時不能再通過客戶端加鎖新建新的原子操作了,客戶端只能對併發容器自身加鎖,但併發容器內部使用的並不是自身鎖。

CopyOnWriteArrayList

寫入時複製容器,在每次修改時都會加鎖並建立和重新發佈一個新的容器副本,直接修改容器引用,從而實現可見性。 寫操作在一個複製的陣列上進行,讀操作還是在原始陣列中進行,讀寫分離,互不影響。寫操作需要加鎖,防止併發寫入時導致寫入資料丟失。寫操作結束之後需要把原始陣列指向新的複製陣列。

CopyOnWriteArrayList 在寫操作的同時允許讀操作,大大提高了讀操作的效能,因此很適合讀多寫少的應用場景。 但是 CopyOnWriteArrayList 有其缺陷:

  • 記憶體佔用:在寫操作時需要複製一個新的陣列,使得記憶體佔用為原來的兩倍左右;
  • 資料不一致:讀操作不能讀取實時性的資料,因為部分寫操作的資料還未同步到讀陣列中。

阻塞佇列

阻塞佇列支援生產者-消費者模式。簡化了開發過程,消除了生產者和消費者之間的程式碼依賴性。阻塞佇列簡化了生產者-消費者設計的實現過程。一種常見的生產者-消費者設計模式就是執行緒池與工作佇列的組合。

阻塞佇列提供了四種處理方法:

  1. 丟擲異常,使用add(e)插入,remove()刪除,element()查詢。當阻塞佇列滿時,插入元素;當佇列空,刪除元素都會丟擲異常。
  2. 返回特殊值,使用offer(e)插入,poll()刪除,peek()查詢。插入時,如果成功返回true,移除時,如果沒有對應的元素返回null。
  3. 阻塞,使用put(e)插入,take()刪除。佇列滿,插入元素時會阻塞;佇列空,取元素會阻塞。
  4. 超時退出:使用offer(e,time,unit)插入,poll(time,unit)刪除。當佇列滿時,會阻塞,超過一定的時間,執行緒會退出。

阻塞佇列有多種實現。

  • ArrayBlokcingQueue和LinkedBlockingQueue分別是陣列和連結串列結構組成的有界的FIFO阻塞佇列。
  • PriorityBlockingQueue是一個支援優先順序排序的無界阻塞佇列。
  • SynchronousQueue是一個不儲存元素的阻塞佇列,它不會為佇列中元素維護儲存空間。
  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

雙端佇列與工作密取

Java 6提供了Dqueue和BlockingDeque,是雙端佇列,實現了在佇列頭和佇列尾的高效插入和移除。雙端佇列適用於工作密取模式。在工作密取中,每個消費者都有各自的雙端佇列。如果一個消費者完成了自己的雙端佇列的全部工作,可以從其他消費者雙端佇列末尾祕密的獲取工作。因為工作者執行緒不會再單個共享的任務佇列上發生競爭。適用於既是生產者又是消費者問題。

阻塞方法與中斷方法

執行緒會阻塞或暫停執行。被阻塞的執行緒必須等待某個不受它控制的事件發生後才能繼續執行。當在程式碼中呼叫一個可以丟擲InterruptedException的方法時,自己的方法就程式設計了阻塞方法,必須處理中斷的響應。如果這個方法被中斷,那麼它將努力提前結束狀態。

處理中斷的響應有兩種基本選擇:

  1. 傳遞InterruptedException,把該異常丟擲給方法的呼叫者。
  2. 恢復中斷,捕獲異常,並呼叫當前執行緒的interrupt方法恢復中斷,引發更高層的程式碼中斷。
public void run(){
    try{
        something();
    }catch(InterruptedException e){
        Thread.currentThread().interrupt();
    }
}
複製程式碼

同步工具類

同步工具類可以是任何一個物件,只要它根據其自身的狀態來協調執行緒的控制流。包括阻塞佇列,訊號量,柵欄以及閉鎖。

閉鎖

閉鎖用來確保某些活動直到其他活動都完成了才繼續執行。如果有多個執行緒,其中一個執行緒需要等到其他所有執行緒活動結束後才繼續執行,使用閉鎖。

CountDownLatch是一種閉鎖的實現,可以使得一個或者多個執行緒等待一組事情發生。包括一個計數器,表示需要等待的事件數量;countDown方法用來遞減計數器,表示有一個事件已經發生了;await方法等待計數器為0,表示所有需要等待的事情已經發生。

// 初始化閉鎖,並設定資源個數
CountDownLatch latch = new CountDownLatch(2);

Thread t1 = new Thread( new Runnable(){
    public void run(){
        // 載入資源1
        載入資源的程式碼……
        // 本資源載入完後,閉鎖-1
        latch.countDown();
    }
} ).start();

Thread t2 = new Thread( new Runnable(){
    public void run(){
        // 載入資源2
        資源載入程式碼……
        // 本資源載入完後,閉鎖-1
        latch.countDown();
    }
} ).start();

Thread t3 = new Thread( new Runnable(){
    public void run(){
        // 本執行緒必須等待所有資源載入完後才能執行
        latch.await();
        // 當閉鎖數量為0時,await返回,執行接下來的任務
        任務程式碼……
    }
} ).start();
複製程式碼

柵欄(同步屏障)

閉鎖是一次性物件,一旦進入終止狀態,就不能被重置。柵欄類似於閉鎖,能阻塞一組程序直到某個時間發生。柵欄與閉鎖的區別在於,所有執行緒必須同時到達柵欄位置,才能繼續執行。

若有多條執行緒,他們到達屏障時將會被阻塞,只有當所有執行緒都到達屏障時才能開啟屏障,所有執行緒同時執行,若有這樣的需求可以使用同步屏障。此外,當屏障開啟的同時還能指定執行的任務。

閉鎖只會阻塞一條執行緒,目的是為了讓該條任務執行緒滿足條件後執行; 而同步屏障會阻塞所有執行緒,目的是為了讓所有執行緒同時執行(實際上並不會同時執行,而是儘量把執行緒啟動的時間間隔降為最少)。

// 建立同步屏障物件,並制定需要等待的執行緒個數 和 開啟屏障時需要執行的任務
CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){
    public void run(){
        //當所有執行緒準備完畢後觸發此任務
    }
});

// 啟動三條執行緒
for( int i=0; i<3; i++ ){
    new Thread( new Runnable(){
        public void run(){
            // 等待,(每執行一次barrier.await,同步屏障數量-1,直到為0時,開啟屏障)
            barrier.await();
            // 任務
            任務程式碼……
        }
    } ).start();
}
複製程式碼

訊號量

訊號量用於控制同時訪問某個特定資源的運算元量,或者執行某個指定操作的數量。計數訊號量還可以用來實現某種資源池,或者對容器施加邊界。

訊號量可以用於實現資源池,也可以用於將容器變為有界阻塞容器。訊號量管理著一組虛擬的許可,在執行操作時首先獲取許可,並在使用以後釋放許可。如果沒有許可,將阻塞直到有許可或被中斷,超時。

訊號量的使用場景是,有m個資源,n個執行緒,且n>m,同一時刻只能允許m條執行緒訪問資源。

// 建立訊號量物件,並給予3個資源
Semaphore semaphore = new Semaphore(3);

// 開啟10條執行緒
for ( int i=0; i<10; i++ ) {
    new Thread( new Runnbale(){
        public void run(){
            // 獲取資源,若此時資源被用光,則阻塞,直到有執行緒歸還資源
            semaphore.acquire();
            // 任務程式碼
            ……
            // 釋放資源
            semaphore.release();
        }
    } ).start();
}
複製程式碼

FutureTask

可以用作閉鎖,是一種可以生成結果的Runnable,可以處於以下三種狀態:等待執行,正在執行和執行完成。當FutureTask進入完成狀態後,它會停止在這個狀態上。

FutureTask在Executor框架中表示非同步任務,此外還可以用來表示一些時間較長的運算,這些計算可以在使用計算結構之前啟動。

實戰:構建快取

首先,使用HashMap和同步機制來初始化快取。

public interface Computable<A,V> {
    V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunc implements Computable<String,BigInteger> {

    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        return new BigInteger(arg);
    }
}
public class Memoizer1<A,V> implements Computable<A,V> {
    private final Map<A,V> cache=new HashMap<>();
    private final Computable<A,V> c;

    public Memoizer1(Computable<A,V> c){
        this.c=c;
    }

    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result=cache.get(arg);
        if(result==null){
            result=c.compute(arg);
            cache.put(arg,result);
        }
        return result;
    }
}
複製程式碼

在這種實現方法中,使用HashMap儲存之前計算的結果。首先檢查需要的結果是否已經在快取中,如果存在則返回之前計算,否則將計算結果快取到HashMap再返回。

為了確保執行緒安全,將整個compute方法進行同步。但是這樣伸縮性差,快取的效能並沒有得到提升。

下面使用ConcurrentHashMap替換HashMap。但是,這種方法存在一些不足,當兩個執行緒同時呼叫compute時,可能會導致計算得到相同的值。這樣是低效的,因為快取的作用就是避免相同的資料被計算多次。其問題在於,如果某個執行緒啟動了一個計算,而其他執行緒並不知道這個計算正在進行,很可能會重複這個計算。

針對如上問題,我們考慮可以使用FutureTask來解決。使用該類來表示計算的過程,如果有結果可用,則返回結果,否則一直阻塞。

public class Memo2 <A,V> implements Computable<A,V>{
    private final Map<A,Future<V>> cache=new ConcurrentHashMap<>();
    private final Computable<A,V>c;
    public Memo2(Computable<A,V>c){
        this.c=c;
    }

    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> future=cache.get(arg);
        if(future==null){
            Callable<V> eval=new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft=new FutureTask<>(eval);
            future=cache.putIfAbsent(arg,ft);
            if(future==null){
                future=ft;
                ft.run();
            }
        }
        try{
            return future.get();
        }catch (ExecutionException e){
            e.printStackTrace();
        }
        return null;
    }

}
複製程式碼

參考資料