1. 程式人生 > >併發程式設計 – Concurrent 使用者指南

併發程式設計 – Concurrent 使用者指南

譯序

本指南根據 Jakob Jenkov 最新部落格翻譯,請隨時關注部落格更新:http://tutorials.jenkov.com/java-util-concurrent/index.html。

1. java.util.concurrent – Java 併發工具包

Java 5 添加了一個新的包到 Java 平臺,java.util.concurrent 包。這個包包含有一系列能夠讓 Java 的併發程式設計變得更加簡單輕鬆的類。在這個包被新增以前,你需要自己去動手實現自己的相關工具類。

本文我將帶你一一認識 java.util.concurrent 包裡的這些類,然後你可以嘗試著如何在專案中使用它們。本文中我將使用 Java 6 版本,我不確定這和 Java 5 版本里的是否有一些差異。我不會去解釋關於 Java 併發的核心問題 – 其背後的原理,也就是說,如果你對那些東西感興趣,參考《

Java 併發指南》。

半成品

本文很大程度上還是個 “半成品”,所以當你發現一些被漏掉的類或介面時,請耐心等待。在我空閒的時候會把它們加進來的。

2. 阻塞佇列 BlockingQueue

java.util.concurrent 包裡的 BlockingQueue 介面表示一個執行緒安放入和提取例項的佇列。本小節我將給你演示如何使用這個 BlockingQueue。本節不會討論如何在 Java 中實現一個你自己的 BlockingQueue。如果你對那個感興趣,參考《Java 併發指南

BlockingQueue 用法

BlockingQueue 通常用於一個執行緒生產物件,而另外一個執行緒消費這些物件的場景。下圖是對這個原理的闡述:

一個執行緒往裡邊放,另外一個執行緒從裡邊取的一個 BlockingQueue。

一個執行緒將會持續生產新物件並將其插入到佇列之中,直到佇列達到它所能容納的臨界點。也就是說,它是有限的。如果該阻塞佇列到達了其臨界點,負責生產的執行緒將會在往裡邊插入新物件時發生阻塞。它會一直處於阻塞之中,直到負責消費的執行緒從佇列中拿走一個物件。負責消費的執行緒將會一直從該阻塞佇列中拿出物件。如果消費執行緒嘗試去從一個空的佇列中提取物件的話,這個消費執行緒將會處於阻塞之中,直到一個生產執行緒把一個物件丟進佇列。

BlockingQueue 的方法

BlockingQueue 具有 4 組不同的方法用於插入、移除以及對佇列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:

四組不同的行為方式解釋:

拋異常:如果試圖的操作無法立即執行,拋一個異常。 特定值:如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。 阻塞:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行。 超時:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。

無法向一個 BlockingQueue 中插入 null。如果你試圖插入 null,BlockingQueue 將會丟擲一個 NullPointerException。

可以訪問到 BlockingQueue 中的所有元素,而不僅僅是開始和結束的元素。比如說,你將一個物件放入佇列之中以等待處理,但你的應用想要將其取消掉。那麼你可以呼叫諸如 remove(o) 方法來將佇列之中的特定物件進行移除。但是這麼幹效率並不高(譯者注:基於佇列的資料結構,獲取除開始或結束位置的其他物件的效率不會太高),因此你儘量不要用這一類的方法,除非你確實不得不那麼做。

BlockingQueue 的實現

BlockingQueue 是個介面,你需要使用它的實現之一來使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 介面的實現(Java 6):

Java 中使用 BlockingQueue 的例子

這裡是一個 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 介面的 ArrayBlockingQueue 實現。

首先,BlockingQueueExample 類分別在兩個獨立的執行緒中啟動了一個 Producer 和 一個 Consumer。

Producer 向一個共享的 BlockingQueue 中注入字串,而 Consumer 則會從中把它們拿出來。

public class BlockingQueueExample {  

    public static void main(String[] args) throws Exception {  

        BlockingQueue queue = new ArrayBlockingQueue(1024);  

        Producer producer = new Producer(queue);  
        Consumer consumer = new Consumer(queue);  

        new Thread(producer).start();  
        new Thread(consumer).start();  

        Thread.sleep(4000);  
    }  
}

以下是 Producer 類。注意它在每次 put() 呼叫時是如何休眠一秒鐘的。這將導致 Consumer 在等待佇列中物件的時候發生阻塞。

public class Producer implements Runnable{  

    protected BlockingQueue queue = null;  

    public Producer(BlockingQueue queue) {  
        this.queue = queue;  
    }  

    public void run() {  
        try {  
            queue.put("1");  
            Thread.sleep(1000);  
            queue.put("2");  
            Thread.sleep(1000);  
            queue.put("3");  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

以下是 Consumer 類。它只是把物件從佇列中抽取出來,然後將它們列印到 System.out。

public class Consumer implements Runnable{  

    protected BlockingQueue queue = null;  

    public Consumer(BlockingQueue queue) {  
        this.queue = queue;  
    }  

    public void run() {  
        try {  
            System.out.println(queue.take());  
            System.out.println(queue.take());  
            System.out.println(queue.take());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

3. 陣列阻塞佇列 ArrayBlockingQueue

ArrayBlockingQueue 類實現了 BlockingQueue 介面。

ArrayBlockingQueue 是一個有界的阻塞佇列,其內部實現是將物件放到一個數組裡。有界也就意味著,它不能夠儲存無限多數量的元素。它有一個同一時間能夠儲存元素數量的上限。你可以在對其初始化的時候設定這個上限,但之後就無法對這個上限進行修改了(譯者注:因為它是基於陣列實現的,也就具有陣列的特性:一旦初始化,大小就無法修改)。

‘ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行儲存。佇列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。

以下是在使用 ArrayBlockingQueue 的時候對其初始化的一個示例:

BlockingQueue queue = new ArrayBlockingQueue(1024);  

queue.put("1");  

Object object = queue.take();

以下是使用了 Java 泛型的一個 BlockingQueue 示例。注意其中是如何對 String 元素放入和提取的:

BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);  

queue.put("1");  

String string = queue.take();

4. 延遲佇列 DelayQueue

DelayQueue 實現了 BlockingQueue 介面。DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 介面,該介面定義:

public interface Delayed extends Comparable<Delayed< {  

 public long getDelay(TimeUnit timeUnit);  

}

DelayQueue 將會在每個元素的 getDelay() 方法返回的值的時間段之後才釋放掉該元素。如果返回的是 0 或者負值,延遲將被認為過期,該元素將會在 DelayQueue 的下一次 take 被呼叫的時候被釋放掉。傳遞給 getDelay 方法的 getDelay 例項是一個列舉型別,它表明了將要延遲的時間段。

TimeUnit 列舉將會取以下值:

DAYS  
HOURS  
MINUTES  
SECONDS  
MILLISECONDS  
MICROSECONDS  
NANOSECONDS  
`

正如你所看到的,Delayed 介面也繼承了 java.lang.Comparable 介面,這也就意味著 Delayed 物件之間可以進行對比。這個可能在對 DelayQueue 佇列中的元素進行排序時有用,因此它們可以根據過期時間進行有序釋放。以下是使用 DelayQueue 的例子:

public class DelayQueueExample {  

    public static void main(String[] args) {  
        DelayQueue queue = new DelayQueue();  

        Delayed element1 = new DelayedElement();  

        queue.put(element1);  

        Delayed element2 = queue.take();  
    }  
}

DelayedElement 是我所建立的一個 DelayedElement 介面的實現類,它不在 Java.util.concurrent 包裡。你需要自行建立你自己的 Delayed 介面的實現以使用 DelayQueue 類。

5. 鏈阻塞佇列 LinkedBlockingQueue

LinkedBlockingQueue 類實現了 BlockingQueue 介面。

LinkedBlockingQueue 內部以一個鏈式結構(連結節點)對其元素進行儲存。如果需要的話,這一鏈式結構可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。

LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行儲存。佇列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。

以下是 LinkedBlockingQueue 的初始化和使用示例程式碼:

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();  
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);  

bounded.put("Value");  

String value = bounded.take();

6. 具有優先順序的阻塞佇列 PriorityBlockingQueue

PriorityBlockingQueue 類實現了 BlockingQueue 介面。

PriorityBlockingQueue 是一個無界的併發佇列。它使用了和類 java.util.PriorityQueue 一樣的排序規則。你無法向這個佇列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 介面。因此該佇列中元素的排序就取決於你自己的 Comparable 實現。注意 PriorityBlockingQueue 對於具有相等優先順序(compare() == 0)的元素並不強制任何特定行為。

同時注意,如果你從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先順序為序的。

以下是使用 PriorityBlockingQueue 的示例:

BlockingQueue queue   = new PriorityBlockingQueue();  

    //String implements java.lang.Comparable  
    queue.put("Value");  

    String value = queue.take();

7. 同步佇列 SynchronousQueue

SynchronousQueue 類實現了 BlockingQueue 介面。

SynchronousQueue 是一個特殊的佇列,它的內部同時只能夠容納單個元素。如果該佇列已有一元素的話,試圖向佇列中插入一個新元素的執行緒將會阻塞,直到另一個執行緒將該元素從佇列中抽走。同樣,如果該佇列為空,試圖向佇列中抽取一個元素的執行緒將會阻塞,直到另一個執行緒向佇列中插入了一條新的元素。

據此,把這個類稱作一個佇列顯然是誇大其詞了。它更多像是一個匯合點。

8. 阻塞雙端佇列 BlockingDeque

java.util.concurrent 包裡的 BlockingDeque 介面表示一個執行緒安放入和提取例項的雙端佇列。本小節我將給你演示如何使用 BlockingDeque。BlockingDeque 類是一個雙端佇列,在不能夠插入元素時,它將阻塞住試圖插入元素的執行緒;在不能夠抽取元素時,它將阻塞住試圖抽取的執行緒。deque(雙端佇列) 是 “Double Ended Queue” 的縮寫。因此,雙端佇列是一個你可以從任意一端插入或者抽取元素的佇列。

BlockingDeque 的使用

線上程既是一個佇列的生產者又是這個佇列的消費者的時候可以使用到 BlockingDeque。如果生產者執行緒需要在佇列的兩端都可以插入資料,消費者執行緒需要在佇列的兩端都可以移除資料,這個時候也可以使用 BlockingDeque。

一個 BlockingDeque – 執行緒在雙端佇列的兩端都可以插入和提取元素。

一個執行緒生產元素,並把它們插入到佇列的任意一端。如果雙端佇列已滿,插入執行緒將被阻塞,直到一個移除執行緒從該佇列中移出了一個元素。如果雙端佇列為空,移除執行緒將被阻塞,直到一個插入執行緒向該佇列插入了一個新元素。

BlockingDeque 的方法

BlockingDeque 具有 4 組不同的方法用於插入、移除以及對雙端佇列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:

四組不同的行為方式解釋:

拋異常:如果試圖的操作無法立即執行,拋一個異常。 特定值:如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。 阻塞:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行。 超時:如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。

BlockingDeque 繼承自 BlockingQueue

BlockingDeque 介面繼承自 BlockingQueue 介面。

這就意味著你可以像使用一個 BlockingQueue 那樣使用 BlockingDeque。如果你這麼幹的話,各種插入方法將會把新元素新增到雙端佇列的尾端,而移除方法將會把雙端佇列的首端的元素移除。正如 BlockingQueue 介面的插入和移除方法一樣。

以下是 BlockingDeque 對 BlockingQueue 介面的方法的具體內部實現:

BlockingDeque 的實現

既然 BlockingDeque 是一個介面,那麼你想要使用它的話就得使用它的眾多的實現類的其中一個。java.util.concurrent 包提供了以下 BlockingDeque 介面的實現類:

BlockingDeque 程式碼示例

以下是如何使用 BlockingDeque 方法的一個簡短程式碼示例:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();  

deque.addFirst("1");  
deque.addLast("2");  

String two = deque.takeLast();  
String one = deque.takeFirst();

9. 鏈阻塞雙端佇列 LinkedBlockingDeque

LinkedBlockingDeque 類實現了 BlockingDeque 介面。

deque(雙端佇列) 是 “Double Ended Queue” 的縮寫。因此,雙端佇列是一個你可以從任意一端插入或者抽取元素的佇列。(譯者注:唐僧啊,受不了。)LinkedBlockingDeque 是一個雙端佇列,在它為空的時候,一個試圖從中抽取資料的執行緒將會阻塞,無論該執行緒是試圖從哪一端抽取資料。以下是 LinkedBlockingDeque 例項化以及使用的示例:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();  

deque.addFirst("1");  
deque.addLast("2");  

String two = deque.takeLast();  
String one = deque.takeFirst();

10. 併發 Map(對映) ConcurrentMap

java.util.concurrent.ConcurrentMap

java.util.concurrent.ConcurrentMap 介面表示了一個能夠對別人的訪問(插入和提取)進行併發處理的 java.util.Map。ConcurrentMap 除了從其父介面 java.util.Map 繼承來的方法之外還有一些額外的原子性方法。

ConcurrentMap 的實現

既然 ConcurrentMap 是個介面,你想要使用它的話就得使用它的實現類之一。java.util.concurrent 包具備 ConcurrentMap 介面的以下實現類:

  • ConcurrentHashMap

ConcurrentHashMap

ConcurrentHashMap 和 java.util.HashTable 類很相似,但 ConcurrentHashMap 能夠提供比 HashTable 更好的併發效能。在你從中讀取物件的時候 ConcurrentHashMap 並不會把整個 Map 鎖住。

此外,在你向其中寫入物件的時候,ConcurrentHashMap 也不會鎖住整個 Map。它的內部只是把 Map 中正在被寫入的部分進行鎖定。

另外一個不同點是,在被遍歷的時候,即使是 ConcurrentHashMap 被改動,它也不會拋 ConcurrentModificationException。儘管 Iterator 的設計不是為多個執行緒的同時使用。更多關於 ConcurrentMap 和 ConcurrentHashMap 的細節請參考官方文件。

ConcurrentMap 例子

以下是如何使用 ConcurrentMap 介面的一個例子。

本示例使用了 ConcurrentHashMap 實現類:

ConcurrentMap concurrentMap = new ConcurrentHashMap();  

concurrentMap.put("key", "value");  

Object value = concurrentMap.get("key");

11. 併發導航對映 ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap 是一個支援併發訪問的 java.util.NavigableMap,它還能讓它的子 map 具備併發訪問的能力。所謂的 “子 map” 指的是諸如 headMap(),subMap(),tailMap() 之類的方法返回的 map。

NavigableMap 中的方法不再贅述,本小節我們來看一下 ConcurrentNavigableMap 新增的方法。

headMap()

headMap(T toKey) 方法返回一個包含了小於給定 toKey 的 key 的子 map。如果你對原始 map 裡的元素做了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是物件的引用)。以下示例演示了對 headMap() 方法的使用:

ConcurrentNavigableMap map = new ConcurrentSkipListMap();  

map.put("1", "one");  
map.put("2", "two");  
map.put("3", "three");  

ConcurrentNavigableMap headMap = map.headMap("2");

headMap 將指向一個只含有鍵 “1″ 的 ConcurrentNavigableMap,因為只有這一個鍵小於 “2″。關於這個方法及其過載版本具體是怎麼工作的細節請參考 Java 文件。

tailMap()

tailMap(T fromKey) 方法返回一個包含了不小於給定 fromKey 的 key 的子 map。

如果你對原始 map 裡的元素做了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是物件的引用)。

以下示例演示了對 tailMap() 方法的使用:

ConcurrentNavigableMap map = new ConcurrentSkipListMap();  

map.put("1", "one");  
map.put("2", "two");  
map.put("3", "three");  

ConcurrentNavigableMap tailMap = map.tailMap("2");

tailMap 將擁有鍵 “2″ 和 “3″,因為它們不小於給定鍵 “2″。關於這個方法及其過載版本具體是怎麼工作的細節請參考 Java 文件。

subMap()

subMap() 方法返回原始 map 中,鍵介於 from(包含) 和 to (不包含) 之間的子 map。

示例如下:

ConcurrentNavigableMap map = new ConcurrentSkipListMap();  

map.put("1", "one");  
map.put("2", "two");  
map.put("3", "three");  

ConcurrentNavigableMap subMap = map.subMap("2", "3");

返回的 submap 只包含鍵 “2″,因為只有它滿足不小於 “2″,比 “3″ 小。

更多方法

ConcurrentNavigableMap 介面還有其他一些方法可供使用, 比如:

  • descendingKeySet()
  • descendingMap()
  • navigableKeySet()

關於這些方法更多資訊參考官方 Java 文件。

12. 閉鎖 CountDownLatch

java.util.concurrent.CountDownLatch 是一個併發構造,它允許一個或多個執行緒等待一系列指定操作的完成。

CountDownLatch 以一個給定的數量初始化。countDown() 每被呼叫一次,這一數量就減一。通過呼叫 await() 方法之一,執行緒可以阻塞等待這一數量到達零。以下是一個簡單示例。

Decrementer 三次呼叫 countDown() 之後,等待中的 Waiter 才會從 await() 呼叫中釋放出來。

CountDownLatch latch = new CountDownLatch(3);  

Waiter      waiter      = new Waiter(latch);  
Decrementer decrementer = new Decrementer(latch);  

new Thread(waiter)     .start();  
new Thread(decrementer).start();  

Thread.sleep(4000);  

public class Waiter implements Runnable{  

    CountDownLatch latch = null;  

    public Waiter(CountDownLatch latch) {  
        this.latch = latch;  
    }  

    public void run() {  
        try {  
            latch.await();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  

        System.out.println("Waiter Released");  
    }  
}  

public class Decrementer implements Runnable {  

    CountDownLatch latch = null;  

    public Decrementer(CountDownLatch latch) {  
        this.latch = latch;  
    }  

    public void run() {  

        try {  
            Thread.sleep(1000);  
            this.latch.countDown();  

            Thread.sleep(1000);  
            this.latch.countDown();  

            Thread.sleep(1000);  
            this.latch.countDown();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

13. 柵欄 CyclicBarrier

java.util.concurrent.CyclicBarrier 類是一種同步機制,它能夠對處理一些演算法的執行緒實現同步。換句話講,它就是一個所有執行緒必須等待的一個柵欄,直到所有執行緒都到達這裡,然後所有執行緒才可以繼續做其他事情。

圖示如下:

兩個執行緒在柵欄旁等待對方。

通過呼叫 CyclicBarrier 物件的 await() 方法,兩個執行緒可以實現互相等待。一旦 N 個執行緒在等待 CyclicBarrier 達成,所有執行緒將被釋放掉去繼續執行。

建立一個 CyclicBarrier

在建立一個 CyclicBarrier 的時候你需要定義有多少執行緒在被釋放之前等待柵欄。

建立 CyclicBarrier 示例:

CyclicBarrier barrier = new CyclicBarrier(2);

等待一個 CyclicBarrier

以下演示瞭如何讓一個執行緒等待一個 CyclicBarrier:

barrier.await();

當然,你也可以為等待執行緒設定一個超時時間。等待超過了超時時間之後,即便還沒有達成 N 個執行緒等待 CyclicBarrier 的條件,該執行緒也會被釋放出來。以下是定義超時時間示例:

barrier.await(10, TimeUnit.SECONDS);

滿足以下任何條件都可以讓等待 CyclicBarrier 的執行緒釋放:

  • 最後一個執行緒也到達 CyclicBarrier(呼叫 await())
  • 當前執行緒被其他執行緒打斷(其他執行緒呼叫了這個執行緒的 interrupt() 方法)
  • 其他等待柵欄的執行緒被打斷
  • 其他等待柵欄的執行緒因超時而被釋放
  • 外部執行緒呼叫了柵欄的 CyclicBarrier.reset() 方法

CyclicBarrier 行動

CyclicBarrier 支援一個柵欄行動,柵欄行動是一個 Runnable 例項,一旦最後等待柵欄的執行緒抵達,該例項將被執行。你可以在 CyclicBarrier 的構造方法中將 Runnable 柵欄行動傳給它:

Runnable barrierAction = ... ; 
CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);

CyclicBarrier 示例

以下程式碼演示瞭如何使用 CyclicBarrier:

Runnable barrier1Action = new Runnable() {  
    public void run() {  
        System.out.println("BarrierAction 1 executed ");  
    }  
};  
Runnable barrier2Action = new Runnable() {  
    public void run() {  
        System.out.println("BarrierAction 2 executed ");  
    }  
};  

CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);  
CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);  

CyclicBarrierRunnable barrierRunnable1 =  
        new CyclicBarrierRunnable(barrier1, barrier2);  

CyclicBarrierRunnable barrierRunnable2 =  
        new CyclicBarrierRunnable(barrier1, barrier2);  

new Thread(barrierRunnable1).start();  
new Thread(barrierRunnable2).start();

CyclicBarrierRunnable 類:

public class CyclicBarrierRunnable implements Runnable{  

    CyclicBarrier barrier1 = null;  
    CyclicBarrier barrier2 = null;  

    public CyclicBarrierRunnable(  
            CyclicBarrier barrier1,  
            CyclicBarrier barrier2) {  

        this.barrier1 = barrier1;  
        this.barrier2 = barrier2;  
    }  

    public void run() {  
        try {  
            Thread.sleep(1000);  
            System.out.println(Thread.currentThread().getName() +  
                                " waiting at barrier 1");  
            this.barrier1.await();  

            Thread.sleep(1000);  
            System.out.println(Thread.currentThread().getName() +  
                                " waiting at barrier 2");  
            this.barrier2.await();  

            System.out.println(Thread.currentThread().getName() +  
                                " done!");  

        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (BrokenBarrierException e) {  
            e.printStackTrace();  
        }  
    }  
}

以上程式碼控制檯輸出如下。注意每個執行緒寫入控制檯的時序可能會跟你實際執行不一樣。比如有時 Thread-0 先列印,有時 Thread-1 先列印。

Thread-0 waiting at barrier 1
Thread-1 waiting at barrier 1
BarrierAction 1 executed
Thread-1 waiting at barrier 2
Thread-0 waiting at barrier 2
BarrierAction 2 executed
Thread-0 done!
Thread-1 done!

14. 交換機 Exchanger

java.util.concurrent.Exchanger 類表示一種兩個執行緒可以進行互相交換物件的會和點。這種機制圖示如下:

兩個執行緒通過一個 Exchanger 交換物件。 交換物件的動作由 Exchanger 的兩個 exchange() 方法的其中一個完成。

以下是一個示例:

Exchanger exchanger = new Exchanger();  

ExchangerRunnable exchangerRunnable1 =  
        new ExchangerRunnable(exchanger, "A");  

ExchangerRunnable exchangerRunnable2 =  
        new ExchangerRunnable(exchanger, "B");  

new Thread(exchangerRunnable1).start();  
new Thread(exchangerRunnable2).start();

ExchangerRunnable 程式碼:

public class ExchangerRunnable implements Runnable{  

    Exchanger exchanger = null;  
    Object    object    = null;  

    public ExchangerRunnable(Exchanger exchanger, Object object) {  
        this.exchanger = exchanger;  
        this.object = object;  
    }  

    public void run() {  
        try {  
            Object previous = this.object;  

            this.object = this.exchanger.exchange(this.object);  

            System.out.println(  
                    Thread.currentThread().getName() +  
                    " exchanged " + previous + " for " + this.object  
            );  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

以上程式輸出:Thread-0 exchanged A for BThread-1 exchanged B for A

15. 訊號量 Semaphore

java.util.concurrent.Semaphore 類是一個計數訊號量。這就意味著它具備兩個主要方法:

  • acquire()
  • release()

計數訊號量由一個指定數量的 “許可” 初始化。每呼叫一次 acquire(),一個許可會被呼叫執行緒取走。每呼叫一次 release(),一個許可會被返還給訊號量。因此,在沒有任何 release() 呼叫時,最多有 N 個執行緒能夠通過 acquire() 方法,N 是該訊號量初始化時的許可的指定數量。這些許可只是一個簡單的計數器。這裡沒啥奇特的地方。

Semaphore 用法

訊號量主要有兩種用途:

  1. 保護一個重要(程式碼)部分防止一次超過 N 個執行緒進入。
  2. 在兩個執行緒之間傳送訊號。

保護重要部分

如果你將訊號量用於保護一個重要部分,試圖進入這一部分的程式碼通常會首先嚐試獲得一個許可,然後才能進入重要部分(程式碼塊),執行完之後,再把許可釋放掉。 比如這樣:

Semaphore semaphore = new Semaphore(1);  

//critical section  
semaphore.acquire();  

...  

semaphore.release();

線上程之間傳送訊號

如果你將一個訊號量用於在兩個執行緒之間傳送訊號,通常你應該用一個執行緒呼叫 acquire() 方法,而另一個執行緒呼叫 release() 方法。如果沒有可用的許可,acquire() 呼叫將會阻塞,直到一個許可被另一個執行緒釋放出來。同理,如果無法往訊號量釋放更多許可時,一個 release() 呼叫也會阻塞。

通過這個可以對多個執行緒進行協調。比如,如果執行緒 1 將一個物件插入到了一個共享列表(list)之後之後呼叫了 acquire(),而執行緒 2 則在從該列表中獲取一個物件之前呼叫了 release(),這時你其實已經建立了一個阻塞佇列。訊號量中可用的許可的數量也就等同於該阻塞佇列能夠持有的元素個數。

公平

沒有辦法保證執行緒能夠公平地可從訊號量中獲得許可。也就是說,無法擔保掉第一個呼叫 acquire() 的執行緒會是第一個獲得一個許可的執行緒。如果第一個執行緒在等待一個許可時發生阻塞,而第二個執行緒前來索要一個許可的時候剛好有一個許可被釋放出來,那麼它就可能會在第一個執行緒之前獲得許可。如果你想要強制公平,Semaphore 類有一個具有一個布林型別的引數的構造子,通過這個引數以告知 Semaphore 是否要強制公平。強制公平會影響到併發效能,所以除非你確實需要它否則不要啟用它。

以下是如何在公平模式建立一個 Semaphore 的示例:

Semaphore semaphore = new Semaphore(1, true);

更多方法

java.util.concurrent.Semaphore 類還有很多方法,比如:

  • availablePermits()
  • acquireUninterruptibly()
  • drainPermits()
  • hasQueuedThreads()
  • getQueuedThreads()
  • tryAcquire()
  • 等等

這些方法的細節請參考 Java 文件。

16. 執行器服務 ExecutorService

java.util.concurrent.ExecutorService 介面表示一個非同步執行機制,使我們能夠在後臺執行任務。因此一個 ExecutorService 很類似於一個執行緒池。實際上,存在於 java.util.concurrent 包裡的 ExecutorService 實現就是一個執行緒池實現。

ExecutorService 例子

以下是一個簡單的 ExecutorService 例子:

ExecutorService executorService = Executors.newFixedThreadPool(10);  

executorService.execute(new Runnable() {  
    public void run() {  
        System.out.println("Asynchronous task");  
    }  
});  

executorService.shutdown();

首先使用 newFixedThreadPool() 工廠方法建立一個 ExecutorService。這裡建立了一個十個執行緒執行任務的執行緒池。然後,將一個 Runnable 介面的匿名實現類傳遞給 execute() 方法。這將導致 ExecutorService 中的某個執行緒執行該 Runnable。

任務委派

下圖說明了一個執行緒是如何將一個任務委託給一個 ExecutorService 去非同步執行的:

一個執行緒將一個任務委派給一個 ExecutorService 去非同步執行。

一旦該執行緒將任務委派給 ExecutorService,該執行緒將繼續它自己的執行,獨立於該任務的執行。

ExecutorService 實現

既然 ExecutorService 是個介面,如果你想用它的話就得去使用它的實現類之一。

java.util.concurrent 包提供了 ExecutorService 介面的以下實現類:

建立一個 ExecutorService

ExecutorService 的建立依賴於你使用的具體實現。但是你也可以使用 Executors 工廠類來建立 ExecutorService 例項。

以下是幾個建立 ExecutorService 例項的例子:

ExecutorService executorService1 = Executors.newSingleThreadExecutor();  

ExecutorService executorService2 = Executors.newFixedThreadPool(10);  

ExecutorService executorService3 = Executors.newScheduledThreadPool(10);

ExecutorService 使用

有幾種不同的方式來將任務委託給 ExecutorService 去執行:

  • execute(Runnable)
  • submit(Runnable)
  • submit(Callable)
  • invokeAny(…)
  • invokeAll(…)

接下來我們挨個看一下這些方法。

execute(Runnable)

execute(Runnable) 方法要求一個 java.lang.Runnable 物件,然後對它進行非同步執行。以下是使用 ExecutorService 執行一個 Runnable 的示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();  

executorService.execute(new Runnable() {  
    public void run() {  
        System.out.println("Asynchronous task");  
    }  
});  

executorService.shutdown();

沒有辦法得知被執行的 Runnable 的執行結果。如果有需要的話你得使用一個 Callable(以下將做介紹)。

submit(Runnable)

submit(Runnable) 方法也要求一個 Runnable 實現類,但它返回一個 Future 物件。這個 Future 物件可以用來檢查 Runnable 是否已經執行完畢。以下是 ExecutorService submit() 示例:

Future future = executorService.submit(new Runnable() {  
    public void run() {  
        System.out.println("Asynchronous task");  
    }  
});  

future.get();  //returns null if the task has finished correctly.

submit(Callable)

submit(Callable) 方法類似於 submit(Runnable) 方法,除了它所要求的引數型別之外。Callable 例項除了它的 call() 方法能夠返回一個結果之外和一個 Runnable 很相像。Runnable.run() 不能夠返回一個結果。Callable 的結果可以通過 submit(Callable) 方法返回的 Future 物件進行獲取。

以下是一個 ExecutorService Callable 示例:

Future future = executorService.submit(new Callable(){  
    public Object call() throws Exception {  
        System.out.println("Asynchronous Callable");  
        return "Callable Result";  
    }  
});  

System.out.println("future.get() = " + future.get());

以上程式碼輸出:

Asynchronous Callable
future.get() = Callable Result

invokeAny()

invokeAny() 方法要求一系列的 Callable 或者其子介面的例項物件。呼叫這個方法並不會返回一個 Future,但它返回其中一個 Callable 物件的結果。無法保證返回的是哪個 Callable 的結果 – 只能表明其中一個已執行結束。

如果其中一個任務執行結束(或者拋了一個異常),其他 Callable 將被取消。以下是示例程式碼:

ExecutorService executorService = Executors.newSingleThreadExecutor();  

Set<Callable<String>> callables = new HashSet<Callable<String>>();  

callables.add(new Callable<String>() {  
    public String call() throws Exception {  
        return "Task 1";  
    }  
});  
callables.add(new Callable<String>() {  
    public String call() throws Exception {  
        return "Task 2";  
    }  
});  
callables.add(new Callable<String>() {  
    public String call() throws Exception {  
        return "Task 3";  
    }  
});  

String result = executorService.invokeAny(callables);  

System.out.println("result = " + result);  

executorService.shutdown();

上述程式碼將會打印出給定 Callable 集合中的一個的執行結果。我自己試著執行了它幾次,結果始終在變。有時是 “Task 1″,有時是 “Task 2″ 等等。

invokeAll()

invokeAll() 方法將呼叫你在集合中傳給 ExecutorService 的所有 Callable 物件。invokeAll() 返回一系列的 Future 物件,通過它們你可以獲取每個 Callable 的執行結果。記住,一個任務可能會由於一個異常而結束,因此它可能沒有 “成功”。

無法通過一個 Future 物件來告知我們是兩種結束中的哪一種。以下是一個程式碼示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();  

Set<Callable<String>> callables = new HashSet<Callable<String>>();  

callables.add(new Callable<String>() {  
    public String call() throws Exception {  
        return "Task 1";  
    }  
});  
callables.add(new Callable<String>() {  
    public String call() throws Exception {  
        return "Task 2";  
    }  
});  
callables.add(new Callable<String>() {  
    public String call() throws Exception {  
        return "Task 3";  
    }  
});  

List<Future<String>> futures = executorService.invokeAll(callables);  

for(Future<String> future : futures){  
    System.out.println("future.get = " + future.get());  
}  

executorService.shutdown();

ExecutorService 關閉

使用完 ExecutorService 之後你應該將其關閉,以使其中的執行緒不再執行。

比如,如果你的應用是通過一個 main() 方法啟動的,之後 main 方法退出了你的應用,如果你的應用有一個活動的 ExexutorService 它將還會保持執行。ExecutorService 裡的活動執行緒阻止了 JVM 的關閉。

要終止 ExecutorService 裡的執行緒你需要呼叫 ExecutorService 的 shutdown() 方法。ExecutorService 並不會立即關閉,但它將不再接受新的任務,而且一旦所有執行緒都完成了當前任務的時候,ExecutorService 將會關閉。在 shutdown() 被呼叫之前所有提交給 ExecutorService 的任務都被執行。如果你想要立即關閉 ExecutorService,你可以呼叫 shutdownNow() 方法。這樣會立即嘗試停止所有執行中的任務,並忽略掉那些已提交但尚未開始處理的任務。無法擔保執行任務的正確執行。可能它們被停止了,也可能已經執行結束。

17. 執行緒池執行者 ThreadPoolExecutor

java.util.concurrent.ThreadPoolExecutor 是 ExecutorService 介面的一個實現。ThreadPoolExecutor 使用其內部池中的執行緒執行給定任務(Callable 或者 Runnable)。

ThreadPoolExecutor 包含的執行緒池能夠包含不同數量的執行緒。池中執行緒的數量由以下變數決定:

  • corePoolSize
  • maximumPoolSize

當一個任務委託給執行緒池時,如果池中執行緒數量低於 corePoolSize,一個新的執行緒將被建立,即使池中可能尚有空閒執行緒。如果內部任務佇列已滿,而且有至少 corePoolSize 正在執行,但是執行執行緒的數量低於 maximumPoolSize,一個新的執行緒將被建立去執行該任務。

ThreadPoolExecutor 圖解:

一個 ThreadPoolExecutor

建立一個 ThreadPoolExecutor

ThreadPoolExecutor 有若干個可用構造子。比如:

int  corePoolSize  =    5;  
int  maxPoolSize   =   10;  
long keepAliveTime = 5000;  

ExecutorService threadPoolExecutor =  
        new ThreadPoolExecutor(  
                corePoolSize,  
                maxPoolSize,  
                keepAliveTime,  
                TimeUnit.MILLISECONDS,  
                new LinkedBlockingQueue<Runnable>()  
                );

但是,除非你確實需要顯式為 ThreadPoolExecutor 定義所有引數,使用 java.util.concurrent.Executors 類中的工廠方法之一會更加方便,正如 ExecutorService 小節所述。

18. 定時執行者服務 ScheduledExecutorService

java.util.concurrent.ScheduledExecutorService 是一個 ExecutorService, 它能夠將任務延後執行,或者間隔固定時間多次執行。 任務由一個工作者執行緒非同步執行,而不是由提交任務給 ScheduledExecutorService 的那個執行緒執行。

ScheduledExecutorService 例子

以下是一個簡單的 ScheduledExecutorService 示例:

ScheduledExecutorService scheduledExecutorService =  
        Executors.newScheduledThreadPool(5);  

ScheduledFuture scheduledFuture =  
    scheduledExecutorService.schedule(new Callable() {  
        public Object call() throws Exception {  
            System.out.println("Executed!");  
            return "Called!";  
        }  
    },  
    5,  
    TimeUnit.SECONDS);

首先一個內建 5 個執行緒的 ScheduledExecutorService 被建立。之後一個 Callable 介面的匿名類示例被建立然後傳遞給 schedule() 方法。後邊的倆引數定義了 Callable 將在 5 秒鐘之後被執行。

ScheduledExecutorService 實現

既然 ScheduledExecutorService 是一個介面,你要用它的話就得使用 java.util.concurrent 包裡對它的某個實現類。ScheduledExecutorService 具有以下實現類:ScheduledThreadPoolExecutor

建立一個 ScheduledExecutorService 如何建立一個 ScheduledExecutorService 取決於你採用的它的實現類。但是你也可以使用 Executors 工廠類來建立一個 ScheduledExecutorService 例項。比如:

ScheduledExecutorService scheduledExecutorService =  

        Executors.newScheduledThreadPool(5);

ScheduledExecutorService 使用

一旦你建立了一個 ScheduledExecutorService,你可以通過呼叫它的以下方法:

  • schedule (Callable task, long delay, TimeUnit timeunit)
  • schedule (Runnable task, long delay, TimeUnit timeunit)
  • scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
  • scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)

下面我們就簡單看一下這些方法。

schedule (Callable task, long delay, TimeUnit timeunit)

這個方法計劃指定的 Callable 在給定的延遲之後執行。這個方法返回一個 ScheduledFuture,通過它你可以在它被執行之前對它進行取消,或者在它執行之後獲取結果。以下是一個示例:

ScheduledExecutorService scheduledExecutorService =  
        Executors.newScheduledThreadPool(5);  

ScheduledFuture scheduledFuture =  
    scheduledExecutorService.schedule(new Callable() {  
        public Object call() throws Exception {  
            System.out.println("Executed!");  
            return "Called!";  
        }  
    },  
    5,  
    TimeUnit.SECONDS);  

System.out.println("result = " + scheduledFuture.get());  

scheduledExecutorService.shutdown();

示例輸出結果:

Executed!
result = Called!

schedule (Runnable task, long delay, TimeUnit timeunit)

除了 Runnable 無法返回一個結果之外,這一方法工作起來就像以一個 Callable 作為一個引數的那個版本的方法一樣,因此 ScheduledFuture.get() 在任務執行結束之後返回 null。

scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)

這一方法規劃一個任務將被定期執行。該任務將會在首個 initialDelay 之後得到執行,然後每個 period 時間之後重複執行。如果給定任務的執行丟擲了異常,該任務將不再執行。如果沒有任何異常的話,這個任務將會持續迴圈執行到 ScheduledExecutorService 被關閉。如果一個任務佔用了比計劃的時間間隔更長的時候,下一次執行將在當前執行結束執行才開始。計劃任務在同一時間不會有多個執行緒同時執行。

scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)

除了 period 有不同的解釋之外這個方法和 scheduleAtFixedRate() 非常像。

scheduleAtFixedRate() 方法中,period 被解釋為前一個執行的開始和下一個執行的開始之間的間隔時間。而在本方法中,period 則被解釋為前一個執行的結束和下一個執行的結束之間的間隔。因此這個延遲是執行結束之間的間隔,而不是執行開始之間的間隔。

ScheduledExecutorService 關閉

正如 ExecutorService,在你使用結束之後你需要把 ScheduledExecutorService 關閉掉。否則他將導致 JVM 繼續執行,即使所有其他執行緒已經全被關閉。

你可以使用從 ExecutorService 介面繼承來的 shutdown() 或 shutdownNow() 方法將 ScheduledExecutorService 關閉。參見 ExecutorService 關閉部分以獲取更多資訊。

19. 使用 ForkJoinPool 進行分叉和合並

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點不同。ForkJoinPool 讓我們可以很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。任務可以繼續分割成更小的子任務,只要它還能分割。可能聽起來有些抽象,因此本節中我們將會解釋 ForkJoinPool 是如何工作的,還有任務分割是如何進行的。

分叉和合並解釋

在我們開始看 ForkJoinPool 之前我們先來簡要解釋一下分叉和合並的原理。 分叉和合並原理包含兩個遞迴進行的步驟。兩個步驟分別是分叉步驟和合並步驟。

分叉

一個使用了分叉和合並原理的任務可以將自己分叉(分割)為更小的子任務,這些子任務可以被併發執行。如下圖所示:

通過把自己分割成多個子任務,每個子任務可以由不同的 CPU 並行執行,或者被同一個 CPU 上的不同執行緒執行。只有當給的任務過大,把它分割成幾個子任務才有意義。把任務分割成子任務有一定開銷,因此對於小型任務,這個分割的消耗可能比每個子任務併發執行的消耗還要大。

什麼時候把一個任務分割成子任務是有意義的,這個界限也稱作一個閥值。這要看每個任務對有意義閥值的決定。很大程度上取決於它要做的工作的種類。

合併

當一個任務將自己分割成若干子任務之後,該任務將進入等待所有子任務的結束之中。一旦子任務執行結束,該任務可以把所有結果合併到同一個結果。圖示如下:

當然,並非所有型別的任務都會返回一個結果。如果這個任務並不返回一個結果,它只需等待所有子任務執行完畢。也就不需要結果的合併啦。

ForkJoinPool

ForkJoinPool 是一個特殊的執行緒池,它的設計是為了更好的配合 分叉-和-合併 任務分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整類名為 java.util.concurrent.ForkJoinPool。

建立一個 ForkJoinPool

你可以通過其構造子建立一個 ForkJoinPool。作為傳遞給 ForkJoinPool 構造子的一個引數,你可以定義你期望的並行級別。並行級別表示你想要傳遞給 ForkJoinPool 的任務所需的執行緒或 CPU 數量。以下是一個 ForkJoinPool 示例:

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

這個示例建立了一個並行級別為 4 的 ForkJoinPool。

提交任務到 ForkJoinPool

就像提交任務到 ExecutorService 那樣,把任務提交到 ForkJoinPool。你可以提交兩種型別的任務。一種是沒有任何返回值的(一個 “行動”),另一種是有返回值的(一個”任務”)。這兩種型別分別由 RecursiveAction 和 RecursiveTask 表示。接下來介紹如何使用這兩種型別的任務,以及如何對它們進行提交。

RecursiveAction

RecursiveAction 是一種沒有任何返回值的任務。它只是做一些工作,比如寫資料到磁碟,然後就退出了。一個 RecursiveAction 可以把自己的工作分割成更小的幾塊,這樣它們可以由獨立的執行緒或者 CPU 執行。你可以通過繼承來實現一個 RecursiveAction。示例如下:

import java.util.ArrayList;  
import java.util.List;  
import java.util.concurrent.RecursiveAction;  

public class MyRecursiveAction extends RecursiveAction {  

    private long workLoad = 0;  

    public MyRecursiveAction(long workLoad) {  
        this.workLoad = workLoad;  
    }  

    @Override  
    protected void compute() {  

        //if work is above threshold, break tasks up into smaller tasks  
        if(this.workLoad > 16) {  
            System.out.println("Splitting workLoad : " + this.workLoad);  

            List<MyRecursiveAction> subtasks =  
                new ArrayList<MyRecursiveAction>();  

            subtasks.addAll(createSubtasks());  

            for(RecursiveAction subtask : subtasks){  
                subtask.fork();  
            }  

        } else {  
            System.out.println("Doing workLoad myself: " + this.workLoad);  
        }  
    }  

    private List<MyRecursiveAction> createSubtasks() {  
        List<MyRecursiveAction> subtasks =  
            new ArrayList<MyRecursiveAction>();  

        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);  
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);  

        subtasks.add(subtask1);  
        subtasks.add(subtask2);  

        return subtasks;  
    }  

}

例子很簡單。MyRecursiveAction 將一個虛構的 workLoad 作為引數傳給自己的構造子。如果 workLoad 高於一個特定閥值,該工作將被分割為幾個子工作,子工作繼續分割。如果 workLoad 低於特定閥值,該工作將由 MyRecursiveAction 自己執行。你可以這樣規劃一個 MyRecursiveAction 的執行:

MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);  

forkJoinPool.invoke(myRecursiveAction);

RecursiveTask

RecursiveTask 是一種會返回結果的任務。它可以將自己的工作分割為若干更小任務,並將這些子任務的執行結果合併到一個集體結果。可以有幾個水平的分割和合並。以下是一個 RecursiveTask 示例:

import java.util.ArrayList;  
import java.util.List;  
import java.util.concurrent.RecursiveTask;  


public class MyRecursiveTask extends RecursiveTask<Long> {  

    private long workLoad = 0;  

    public MyRecursiveTask(long workLoad) {  
        this.workLoad = workLoad;  
    }  

    protected Long compute() {  

        //if work is above threshold, break tasks up into smaller tasks  
        if(this.workLoad > 16) {  
            System.out.println("Splitting workLoad : " + this.workLoad);  

            List<MyRecursiveTask> subtasks =  
                new ArrayList<MyRecursiveTask>();  
            subtasks.addAll(createSubtasks());  

            for(MyRecursiveTask subtask : subtasks){  
                subtask.fork();  
            }  

            long result = 0;  
            for(MyRecursiveTask subtask : subtasks) {  
                result += subtask.join();  
            }  
            return result;  

        } else {  
            System.out.println("Doing workLoad myself: " + this.workLoad);  
            return workLoad * 3;  
        }  
    }  

    private List<MyRecursiveTask> createSubtasks() {  
        List<MyRecursiveTask> subtasks =  
        new ArrayList<MyRecursiveTask>();  

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);  
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);  

        subtasks.add(subtask1);  
        subtasks.add(subtask2);  

        return subtasks;  
    }  
}

除了有一個結果返回之外,這個示例和 RecursiveAction 的例子很像。MyRecursiveTask 類繼承自 RecursiveTask<Long>,這也就意味著它將返回一個 Long 型別的結果。

MyRecursiveTask 示例也會將工作分割為子任務,並通過 fork() 方法對這些子任務計劃執行。

此外,本示例還通過呼叫每個子任務的 join() 方法收集它們返回的結果。子任務的結果隨後被合併到一個更大的結果,並最終將其返回。對於不同級別的遞迴,這種子任務的結果合併可能會發生遞迴。

你可以這樣規劃一個 RecursiveTask:

MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);  

long mergedResult = forkJoinPool.invoke(myRecursiveTask);  

System.out.println("mergedResult = " + mergedResult);

注意是如何通過 ForkJoinPool.invoke() 方法的呼叫來獲取最終執行結果的。

ForkJoinPool 評論

貌似並非每個人都對 Java 7 裡的 ForkJoinPool 滿意:《一個 Java 分叉-合併 帶來的災禍》。

在你計劃在自己的專案裡使用 ForkJoinPool 之前最好讀一下該篇文章。

20. 鎖 Lock

java.util.concurrent.locks.Lock 是一個類似於 synchronized 塊的執行緒同步機制。但是 Lock 比 synchronized 塊更加靈活、精細。順便說一下,在我的《Java 併發指南》中我對如何實現你自己的鎖進行了描述。

Java Lock 例子

既然 Lock 是一個介面,在你的程式裡需要使用它的實現類之一來使用它。以下是一個簡單示例:

Lock lock = new ReentrantLock();  

lock.lock();  

//critical section  

lock.unlock();

首先建立了一個 Lock 物件。之後呼叫了它的 lock() 方法。這時候這個 lock 例項就被鎖住啦。任何其他再過來呼叫 lock() 方法的執行緒將會被阻塞住,直到鎖定 lock 例項的執行緒呼叫了 unlock() 方法。最後 unlock() 被呼叫了,lock 物件解鎖了,其他執行緒可以對它進行鎖定了。

Java Lock 實現

java.util.concurrent.locks 包提供了以下對 Lock 介面的實現類:

  • ReentrantLock

Lock 和 synchronized 程式碼塊的主要不同點

一個 Lock 物件和一個 synchronized 程式碼塊之間的主要不同點是:

  • synchronized 程式碼塊不能夠保證進入訪問等待的執行緒的先後順序。
  • 你不能夠傳遞任何引數給一個 synchronized 程式碼塊的入口。因此,對於 synchronized 程式碼塊的訪問等待設定超時時間是不可能的事情。
  • synchronized 塊必須被完整地包含在單個方法裡。而一個 Lock 物件可以把它的 lock() 和 unlock() 方法的呼叫放在不同的方法裡。

Lock 的方法

Lock 介面具有以下主要方法:

  • lock()
  • lockInterruptibly()
  • tryLock()
  • tryLock(long timeout, TimeUnit timeUnit)
  • unlock()

lock() 將 Lock 例項鎖定。如果該 Lock 例項已被鎖定,呼叫 lock() 方法的執行緒將會阻塞,直到 Lock 例項解鎖。

lockInterruptibly() 方法將會被呼叫執行緒鎖定,除非該執行緒被打斷。此外,如果一個執行緒在通過這個方法來鎖定 Lock 物件時進入阻塞等待,而它被打斷了的話,該執行緒將會退出這個方法呼叫。

tryLock() 方法試圖立即鎖定 Lock 例項。如果鎖定成功,它將返回 true,如果 Lock 例項已被鎖定該方法返回 false。這一方法永不阻塞。tryLock(long timeout, TimeUnit timeUnit) 的工作類似於 tryLock() 方法,除了它在放棄鎖定 Lock 之前等待一個給定的超時時間之外。

unlock() 方法對 Lock 例項解鎖。一個 Lock 實現將只允許鎖定了該物件的執行緒來呼叫此方法。其他(沒有鎖定該 Lock 物件的執行緒)執行緒對 unlock() 方法的呼叫將會拋一個未檢查異常(RuntimeException)。

21. 讀寫鎖 ReadWriteLock

java.util.concurrent.locks.ReadWriteLock 讀寫鎖是一種先進的執行緒鎖機制。它能夠允許多個執行緒在同一時間對某特定資源進行讀取,但同一時間內只能有一個執行緒對其進行寫入。

讀寫鎖的理念在於多個執行緒能夠對一個共享資源進行讀取,而不會導致併發問題。併發問題的發生場景在於對一個共享資源的讀和寫操作的同時進行,或者多個寫操作併發進行。

本節只討論 Java 內建 ReadWriteLock。如果你想了解 ReadWriteLock 背後的實現原理,請參考我的《Java 併發指南》主題中的《讀寫鎖》小節。

ReadWriteLock 鎖規則

一個執行緒在對受保護資源在讀或者寫之前對 ReadWriteLock 鎖定的規則如下:

  • 讀鎖:如果沒有任何寫操作執行緒鎖定 ReadWriteLock,並且沒有任何寫操作執行緒要求一個寫鎖(但還沒有獲得該鎖)。因此,可以有多個讀操作執行緒對該鎖進行鎖定。
  • 寫鎖:如果沒有任何讀操作或者寫操作。因此,在寫操作的時候,只能有一個執行緒對該鎖進行鎖定。

ReadWriteLock 實現

ReadWriteLock 是個介面,如果你想用它的話就得去使用它的實現類之一。java.util.concurrent.locks 包提供了 ReadWriteLock 介面的以下實現類:

  • ReentrantReadWriteLock

ReadWriteLock 程式碼示例

以下是 ReadWriteLock 的建立以及如何使用它進行讀、寫鎖定的簡單示例程式碼:

ReadWriteLock readWriteLock = new ReentrantReadWriteLock();  


readWriteLock.readLock().lock();  

    // multiple readers can enter this section  
    // if not locked for writing, and not writers waiting  
    // to lock for writing.  

readWriteLock.readLock().unlock();  


readWriteLock.writeLock().lock();  

    // only one writer can enter this section,  
    // and only if no threads are currently reading.  

readWriteLock.writeLock().unlock();

注意如何使用 ReadWriteLock 對兩種鎖例項的持有。一個對讀訪問進行保