1. 程式人生 > >java併發程式設計實踐學習(4)構建塊

java併發程式設計實踐學習(4)構建塊

一.同步容器

同步容器包括倆部分Vector和HashTable,這些類由Collection.synchronizedxxx工廠方法建立,這些類通過封裝他們的狀態,並對每一個公共方法進行同步而實現了執行緒的安全,這樣一次只能有一個執行緒訪問容器的狀態。

1.同步容器中的問題

同步容器都是執行緒安全的。但是對於複合操作有時你可能需要使用額外的客戶端加鎖進行保護。這些複合操作即使沒有客戶端加鎖技術上是執行緒安全的,但是有其它執行緒能併發修改容器的時候就不能按期望的方式執行。
操作Vector的複合操作可能導致混亂的結果

public static Object getLast(Vector list
){ int lastIndex = list.size() - 1; return list.get(lastIndex); } public static void deleteLast(Vector list){ int lastIndex = list.size() - 1; list.remove(lastIndex); }

這裡寫圖片描述

getLast和deleteLast交替發生,丟擲ArrayIndexOutOfBoundsException
使用客戶端加鎖,對Vector進行復合操作

public static Object getLast(Vector list
){ synchronized(list){ int lastIndex = list.size() - 1; return list.get(lastIndex); } } public static void deleteLast(Vector list){ synchronized(list){ int lastIndex = list.size() - 1; list.remove(lastIndex); } }

迭代的過程中也可能丟擲 ArrayIndexOutOfBoundsException

for(int i = 0;i < vector.size(); i++){
    doSomething(vector.get(i));
}

雖然Vector是執行緒安全的但是在迭代過程中可能由於其他執行緒修改vector。
這種造成迭代不可靠的問題可以通過客戶端加鎖解決,但是帶迭代期間其他執行緒無法訪問,這削弱了併發性。

2.迭代器和ConcurrentModificationException

對容器的標準迭代時使用Iterator,無論是顯示的使用還是隱式的通過foreach迴圈。
在設計同步容器返回的迭代器時,沒有考慮到併發修改的問題,它們是及時失敗的:當它們察覺到容器在迭代期間被修改會丟擲一個未檢查的ConcurrentModificationException。不過這樣的檢查在沒有同步帶情況下進行,所以可能存在風險:看到過期資料,卻沒有發現修改。
解決迭代不可靠的方法是加鎖,但是有時我們不願意在迭代期間加鎖。當其他執行緒需要訪問容器的時候必須等待,直到迭代結束:如果容器很大,或者每一個元素執行的任務耗時比較長,他們可能需要等待很長時間。如果doSomething時還要持有其他鎖,這是一個產生死鎖的風險。
保持鎖的時間越長,對鎖的競爭就越激烈,如果很多執行緒在等待時阻塞,吞吐量和CPU的效能都會受到影響。

3.隱藏迭代器

很多時候迭代器是隱藏的,例如字串的拼接操作經過編譯轉換為呼叫StringBuilder.append(Object)完成,它會呼叫容器的toString方法,標準容器的toString方法會迭代容器中的每個元素。
addTenThings,println,hashCode和equals方法也會間接呼叫迭代,類似的containsAll,removeAll,retainAll方法以及把容器作為引數的建構函式都會對容器進行迭代會丟擲ConcurrentModificationException。

二.併發容器

同步容器是對容器所有的狀態進行序列訪問,這樣會削弱併發性,降低吞吐量。
併發容器是為多執行緒訪問設計,用併發容器替換同步容器可以用很小的風險取得可擴充套件性顯著的提高。
java5.0添加了Queue和BlockingQueue.
Queue操作不會阻塞,如果佇列為空,那麼獲取元素會返回空值。
BlockingQueue如果佇列為空,獲取操作會一直阻塞到佇列中存在可用元素。

1.ConcurrentHashMap

ConcurrentHashMap以前,程式使用一個公共的鎖同步每一個方法,並嚴格限制只能有一個執行緒訪問容器。而ConcurrentHashMap使用了分離鎖,這個機制允許任意數量的讀執行緒可以併發訪問Map,讀者寫者也可以併發訪問Map,並且有限的寫執行緒可以併發修改Map。這樣併發訪問帶來更高的吞吐量,同時幾乎沒有損失單個執行緒訪問的效能。
ConcurrentHashMap和一起併發容器返回的迭代器有弱一致性而非就“及時失敗”的。弱一致性迭代器允許併發修改,當迭代器被建立時,它會遍歷已有元素,並且可以(但是不能保證)感應到迭代器被建立後,對容器修改。

2.Map附加的原子操作

ConcurrentHashMap不能在獨佔訪問中被加鎖,我們不能使用客戶端加鎖來建立新的原子操作。但常見的“缺少即加入”,“相等便移除”,“相等便替換”被實現為原子操作。

3.CoopyOnWriteArrayList

CopyOnWriteArrayList是同步List的一個併發代替品,有更好的併發性,避免了在迭代期間對容器的加鎖和複製。
“寫入時複製”容器來源於只要有效的不可變物件被正確的釋出,那麼訪問它將不需要更多的同步。在每次需要修改時會建立並從新發布新的容器拷貝來實現可變性。
“寫入時複製”迭代器保留一個底層陣列的引用。這個陣列作為迭代器的起點,永遠不修改,對它的同步是為了確保陣列內容的可見性。因此,多個執行緒可以對這個容器迭代,並且不都其他執行緒干涉。
每次複製容器需要一定的開銷。所以常用在容器的迭代操作遠遠高於修改的頻率。

三.阻塞佇列和生產者-消費者模式

阻塞佇列(BlockingQueue)提供了可阻塞的put和take。如果佇列已經滿了,put方法會阻塞知道空間可用。如果佇列為空,take方法會阻塞直到有元素可用。佇列長度可以有限可以無限。
阻塞佇列支援生產者-消費者設計模式。
該模式不會發現一個工作便立即處理,而是把工作置入一個任務清單中以備後期處理。該模式簡化了開發,因為它解除了生產者類和消費者類之間互相依賴的程式碼。
最常見的的實現是執行緒池和工作佇列的結合(後面細講).
阻塞佇列提供了offer方法,如果條目不能被加入到佇列裡會返回失敗狀態。這使你能建立更靈活的策略來處理超負荷工作。
生產者和消費者模式可以使生產者消費者程式碼互相解耦,但是它們的行為還是通過共享佇列耦合在一起。如果阻塞佇列不符合你的要求,也可以使用訊號量來建立其他阻塞資料結構。
BlockingQueue的實現

  • LinkedBlockingQueue和ArrayBlockingQueue是FIFO佇列
  • PriorityBlockingQueue是按優先順序順序排序的,它可以使元素本身的自然順序,也可以使用一個Comparator實現。
  • SynchronousQueue不是真正的佇列,它沒有為元素維護儲存空間,但它維護一個排隊執行緒清單,這些執行緒等待把元素加入或移除佇列。

2.連續執行緒限制

對於可變物件,生產者-消費者模式和阻塞佇列一起為生產者消費者之間移交物件所有權提供了連續的執行緒限制,一個執行緒約束的物件完全由唯一一個能訪問到這個物件的許可權,並且移交後原執行緒不能訪問到他。

3.雙端佇列和竊取工作

java6引入了Deque和BolckingDeque,它們是雙端佇列,允許高效的在頭和尾分別進行插入和刪除。
它們與竊取工作模式相關聯:每一個消費者有一個自己的雙端佇列,如果一個消費者完成了自己佇列中的全部工作,它可以偷取其他消費者雙端佇列中末尾的任務。從而進一步降低對雙端佇列的爭奪。

四.阻塞和可中斷的方法

執行緒可能因為:等待I/O操作結束,等待獲得一個鎖,等待從Thread.sleep中喚醒,或者等待另外一個執行緒的計算結果。被阻塞的執行緒必須等待外部事件傳送才能回到RUNNABLE狀態從新獲得排程機會。
BlockingQueue的put和take會丟擲一個受檢查的InterruptedException,當一個方法能丟擲這個異常說明這是一個可以阻塞的方法,如果它被中斷可以提前結束阻塞狀態。
Thread的interrupt方法用來中斷一個執行緒或者查詢一個執行緒是否已經被中斷每一個執行緒有一個布林型別的屬性,代表了中斷狀態。
中斷是一種協作機制,一個執行緒不能迫使其他執行緒停止正在做的事情或去做其他事情,當A中斷B時,A僅僅是要求B在達成某一個方便停止的關鍵點時停止正在做的事。
當你呼叫了一個會丟擲InterruptedException時你自己的方法也稱為了一個阻塞方法,要為響應中斷做好準備,有兩種基本選擇:
傳遞InterruptedException,恢復中斷。還可以有更加複雜的處理方案,但你不應該捕獲它,但不做任何響應,這樣會丟失執行緒中斷的證據,從而剝奪了上層程式碼處理中斷的機會。
只有一種情況允許掩蓋中斷:你擴充套件了Thread並因此控制了所有處於呼叫棧上層的程式碼。

五.Synchronizer

Synchronizer是一個物件,他根據本身的狀態和調節執行緒的控制流。阻塞佇列,訊號量(semaphore)。關卡(barrier)以及閉鎖(latch)可以扮演Synchronizer的角色。
所有Synchronizer都有類似的結構特性:它們封裝狀態,而這些狀態決定著執行緒執行到在某一點是通過還是被迫等待,他們還提供操控狀態的方法,以及高效的等待Synchronizer進入到期望狀態的方法。

1.閉鎖

閉鎖是一種Synchronzier,他可以延遲執行緒的進度直到執行緒到達終止狀態。
CountDownLatch是一個靈活的閉鎖實現:允許一個或多個執行緒等待一個事件集的發生。
閉鎖狀態包括一個計數器,初始化為一個正數,用來表現需要等待的事件數。countDown方法對計數器映象減操作,表示一個事件已經發生,而await方法等待計數器達到零。此時所有需要等待的實際已經發生、如果計數器入口值非零,await會一直阻塞到計數器為零,或者等待執行緒中斷以及超時。
在時序測試中,使用CountDownLatch來啟動和停止執行緒

public class TestHarness{
    public longtimeTasks(int nThreads,final Runnable task) throws InterruptedException{
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        for(int i = 0;i < nThreads; i++){
            Thread t = new Thread(){
                public void run(){
                    try{
                        startGate.await();
                        try{
                            task.run();
                        }finally{
                            endGate.cuntDown();
                        }
                    }catch(){
                    }
                }
            };
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}

2.FurureTask

FurtureTask同樣可以作為閉鎖。它的計算是通過Callable實現,等價於一個可帶結果的Runnable,有三個狀態:等待,執行,和完成。
Fureure.get的行為依賴於任務的狀態。如果他已經完成,get可以立刻得到返回的結果,否則會阻塞直到任務轉入完成狀態。
使用FutureTask於嘉在稍後需要的資料

public class Preloader{
    private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>(){
        public ProductInfo call() throws DataLoadException(){
            return loadProductInfo();
        }
    });
    private final Thread thread = new Thread(future);
    public void start(){
        thread.start();
    }
    public ProductInfo get()throws DataLoadException ,InterruptedException{

    }
}

3.訊號量

計數訊號量(Counting semaphore)用來控制能夠同時訪問某特定資源的活動的數量或者同時執行某一給定操作的數量。技術訊號量可以用來實現資源池或者給一個容器限定邊界。
一個Semaphroe管理一個有效的許可集;許可集的初始量通過建構函式傳遞給Semaphroe。活動能夠獲得許可並在使用後釋放許可,如果已經沒有可用的許可了,那麼acquier會被阻塞直到有可用的為止或者到被中斷或者操作超時)。release方法向訊號量返回一個許可。
如果一個初始計數唯一的Semaphroe可以用作互斥鎖。你也可以用Semaphroe把任何一個容器轉換為有界的阻塞容器。
使用訊號量來約束容器

public calss BoundedHashSet<T>(){
    private final Set<T> set;
    private fianl Seaphore sem;
    public BoundedHashSet(int bound){
        this.set = Collections.SynchornizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public  boolean add(T o) throw InterruptedException{
        sem.acquire();
        boolean wasAdded = false;
        try{
            wasAdded = set.add(o);
            return waAdded;
        }finally{
            if(!wasAdded){
                sem.release();
            }
        }
    }

    public boolean remove(Object o){
        boolean wasRemoved = set.Removed(o);
        if(wasRemoved)
            sem.release();
        return wasRemoved;
    }
}

4.關卡

關卡類似於閉鎖,它們能阻塞一組執行緒,直到某些事件發生,不同的是所有執行緒必須都到達關卡點才能繼續處理。閉鎖等待的是事件,關卡等待的是執行緒。
關卡實現的協議就行家庭成員在商場中的集合地點 “我們每個人6點在麥當勞見”然後自己幹自己的事情。
CyclicBarrier允許一個給定數量的成員多次集中在一個關卡點,這在並行迭代短髮中非常有用,這個演算法會把一個問題拆成一系列互相獨立的子問題。當執行緒到達關卡點時呼叫await,await會被阻塞,直到所有執行緒都到達關卡點。如果所有執行緒都到達關卡點,關卡就被釋放,關卡重置以備下一次使用。如果對await呼叫超時,或者阻塞中的執行緒被中斷,關卡認為是失敗的,所有對await未完成的呼叫都通過BrokenbarrierException終止。如果成功的通過關卡,await為每一個執行緒返回一個唯一的到達索引號,用它可以用來選舉殘生一個領導,在下一次迭代中承擔一些特殊工作。CyclicBarrier也允許你向建構函式傳遞一個關卡行為;這是一個Run拿不了,當成功通過關卡的時候會執行,但是在阻塞執行緒被釋放掉之前是不能執行的。關卡常被用來模擬一個步驟的並行執行,但是要求必須完成所有域一個步驟相關的工作才能進入下一步。
Exchanger是關卡的另一種形式,它是一種倆步關卡,在關卡點會交換資料。當倆方的活動不對稱時,Exchange是非常有用的。比如當一個執行緒向緩衝寫入一個數據,這時另一個執行緒充當消費者使用這個資料,這時可以使用Exchanger進行會面,並用完整的緩衝和空緩衝進行會面。交換為雙方的物件建立了一個安全的釋出。
交換的時機取決於程式的響應需求。最簡單的方案就是緩衝滿時交換,並且清除任務的緩衝清空後也交換;這樣做交換的次數少,如果交換的達到率不可預測的話,處理資料延遲。另一個方案是,緩衝滿了交換,但是沒滿但已經存在了特定的時間也會交換。