1. 程式人生 > >Java 併發工具箱之concurrent包

Java 併發工具箱之concurrent包

概述

java.util.concurrent 包是專為 Java併發程式設計而設計的包。包下的所有類可以分為如下幾大類:

  • locks部分:顯式鎖(互斥鎖和速寫鎖)相關;
  • atomic部分:原子變數類相關,是構建非阻塞演算法的基礎;
  • executor部分:執行緒池相關;
  • collections部分:併發容器相關;
  • tools部分:同步工具相關,如訊號量、閉鎖、柵欄等功能;

類圖結構:

J.U.C

BlockingQueue

此介面是一個執行緒安全存取例項的佇列。

使用場景

BlockingQueue通常用於一個執行緒生產物件,而另外一個執行緒消費這些物件的場景。

BlockingQueue

注意事項:

  • 此佇列是有限的,如果佇列到達臨界點,Thread1就會阻塞,直到Thread2從佇列中拿走一個物件。
  • 若果佇列是空的,Thread2會阻塞,直到Thread1把一個物件丟進佇列。

相關方法

BlockingQueue中包含了如下操作方法:

Throws Exception Special Value Blocks Times Out
Insert add(o) offer(o) put(o) offer(o, timeout, timeunit)
Remove remove(o) poll() take() poll(timeout, timeunit)
Examine element() peek()

名詞解釋:

  • Throws Exception: 如果試圖的操作無法立即執行,拋一個異常。
  • Special Value: 如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
  • Blocks: 如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行。
  • Times Out: 如果試圖的操作無法立即執行,該方法呼叫將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。

注意事項:

  • 無法插入 null,否則會丟擲一個 NullPointerException。
  • 佇列這種資料結構,導致除了獲取開始和結尾位置的其他物件的效率都不高,雖然可通過remove(o)來移除任一物件。

實現類

因為是一個介面,所以我們必須使用一個實現類來使用它,有如下實現類:

  • ArrayBlockingQueue: 陣列阻塞佇列
  • DelayQueue: 延遲佇列
  • LinkedBlockingQueue: 鏈阻塞佇列
  • PriorityBlockingQueue: 具有優先順序的阻塞佇列
  • SynchronousQueue: 同步佇列

使用示例:

ArrayBlockingQueue

ArrayBlockingQueue 是一個有界的阻塞佇列

  • 內部實現是將物件放到一個數組裡。陣列有個特性:一旦初始化,大小就無法修改。因此無法修改ArrayBlockingQueue初始化時的上限。
  • ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行儲存。佇列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。

DelayQueue

DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 介面:

public interface Delayed extends Comparable<Delayed< {  
 public long getDelay(TimeUnit timeUnit);  // 返回將要延遲的時間段
}
  • 在每個元素的 getDelay() 方法返回的值的時間段之後才釋放掉該元素。如果返回的是 0 或者負值,延遲將被認為過期,該元素將會在 DelayQueue 的下一次 take 被呼叫的時候被釋放掉。
  • Delayed 介面也繼承了 java.lang.Comparable 介面,Delayed物件之間可以進行對比。這對DelayQueue 佇列中的元素進行排序時有用,因此它們可以根據過期時間進行有序釋放。

LinkedBlockingQueue

內部以一個鏈式結構(連結節點)對其元素進行儲存 。

  • 可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。
  • 內部以 FIFO(先進先出)的順序對元素進行儲存。

PriorityBlockingQueue

一個無界的併發佇列,它使用了和類 java.util.PriorityQueue 一樣的排序規則。

  • 無法向這個佇列中插入 null 值。
  • 插入到 其中的元素必須實現 java.lang.Comparable 介面。
  • 對於具有相等優先順序(compare() == 0)的元素並不強制任何特定行為。
  • 從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先順序為序的。

SynchronousQueue

一個特殊的佇列,它的內部同時只能夠容納單個元素。

  • 如果該佇列已有一元素的話,試圖向佇列中插入一個新元素的執行緒將會阻塞,直到另一個執行緒將該元素從佇列中抽走。
  • 如果該佇列為空,試圖向佇列中抽取一個元素的執行緒將會阻塞,直到另一個執行緒向佇列中插入了一條新的元素。

BlockingDeque

此介面表示一個執行緒安全放入和提取例項的雙端佇列

使用場景

通常用在一個執行緒既是生產者又是消費者的時候。
BlockingDeque

注意事項

  • 如果雙端佇列已滿,插入執行緒將被阻塞,直到一個移除執行緒從該佇列中移出了一個元素。
  • 如果雙端佇列為空,移除執行緒將被阻塞,直到一個插入執行緒向該佇列插入了一個新元素。

相關方法

Throws Exception Special Value Blocks Times Out
Insert addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
Remove removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
Examine getFirst(o) peekFirst(o)
Throws Exception Special Value Blocks Times Out
Insert addLast(o) offerLast(o) putLast(o) offerLast(o, timeout, timeunit)
Remove removeLast(o) pollLast(o) takeLast(o) pollLast(timeout, timeunit)
Examine getLast(o) peekLast(o)

注意事項

  • 關於方法的處理方式和上節一樣。
  • BlockingDeque 介面繼承自 BlockingQueue 介面,可以用其中定義的方法。

實現類

  • LinkedBlockingDeque : 鏈阻塞雙端佇列

LinkedBlockingDeque

LinkedBlockingDeque 是一個雙端佇列,可以從任意一端插入或者抽取元素的佇列。

  • 在它為空的時候,一個試圖從中抽取資料的執行緒將會阻塞,無論該執行緒是試圖從哪一端抽取資料。

ConcurrentMap

一個能夠對別人的訪問(插入和提取)進行併發處理的 java.util.Map介面。
ConcurrentMap 除了從其父介面 java.util.Map 繼承來的方法之外還有一些額外的原子性方法。

實現類

因為是介面,必須用實現類來使用它,其實現類為

  • ConcurrentHashMap

ConcurrentHashMap與HashTable比較

  • 更好的併發效能,在你從中讀取物件的時候 ConcurrentHashMap 並不會把整個 Map 鎖住,只是把 Map 中正在被寫入的部分進行鎖定。
  • 在被遍歷的時候,即使是 ConcurrentHashMap 被改動,它也不會拋 ConcurrentModificationException。

ConcurrentNavigableMap

一個支援併發訪問的 java.util.NavigableMap,它還能讓它的子 map 具備併發訪問的能力。

headMap

headMap(T toKey) 方法返回一個包含了小於給定 toKey 的 key 的子 map。

tailMap

tailMap(T fromKey) 方法返回一個包含了不小於給定 fromKey 的 key 的子 map。

subMap

subMap() 方法返回原始 map 中,鍵介於 from(包含) 和 to (不包含) 之間的子 map。

更多方法

  • descendingKeySet()
  • descendingMap()
  • navigableKeySet()

CountDownLatch

CountDownLatch 是一個併發構造,它允許一個或多個執行緒等待一系列指定操作的完成。

  • CountDownLatch 以一個給定的數量初始化。countDown() 每被呼叫一次,這一數量就減一。
  • 通過呼叫 await() 方法之一,執行緒可以阻塞等待這一數量到達零。

CyclicBarrier

CyclicBarrier 類是一種同步機制,它能夠對處理一些演算法的執行緒實現同步。

Exchanger

Exchanger 類表示一種兩個執行緒可以進行互相交換物件的會和點。

Semaphore

Semaphore 類是一個計數訊號量。具備兩個主要方法:

  • acquire()
  • release()
  • 每呼叫一次 acquire(),一個許可會被呼叫執行緒取走。
  • 每呼叫一次 release(),一個許可會被返還給訊號量。

Semaphore 用法

  • 保護一個重要(程式碼)部分防止一次超過 N 個執行緒進入。
  • 在兩個執行緒之間傳送訊號。

保護重要部分

如果你將訊號量用於保護一個重要部分,試圖進入這一部分的程式碼通常會首先嚐試獲得一個許可,然後才能進入重要部分(程式碼塊),執行完之後,再把許可釋放掉。

Semaphore semaphore = new Semaphore(1);  
//critical section  
semaphore.acquire();  
...  
semaphore.release();

線上程之間傳送訊號

如果你將一個訊號量用於在兩個執行緒之間傳送訊號,通常你應該用一個執行緒呼叫 acquire() 方法,而另一個執行緒呼叫 release() 方法。

  • 如果沒有可用的許可,acquire() 呼叫將會阻塞,直到一個許可被另一個執行緒釋放出來。
  • 如果無法往訊號量釋放更多許可時,一個 release() 呼叫也會阻塞。

公平性

無法擔保掉第一個呼叫 acquire() 的執行緒會是第一個獲得一個許可的執行緒。

可以通過如下來強制公平:

Semaphore semaphore = new Semaphore(1, true); 
  • 需要注意,強制公平會影響到併發效能,建議不使用。

ExecutorService

存在於 java.util.concurrent 包裡的 ExecutorService 實現就是一個執行緒池實現。

實現類

此介面實現類包括:

  • ScheduledThreadPoolExecutor : 通過 Executors.newScheduledThreadPool(10)建立的
  • ThreadPoolExecutor: 除了第一種的其他三種方式建立的

相關方法

  • execute(Runnable):
    無法得知被執行的 Runnable 的執行結果
  • submit(Runnable):
    返回一個 Future 物件,可以知道Runnable 是否執行完畢。
  • submit(Callable):
    Callable 例項除了它的 call() 方法能夠返回一個結果,通過Future可以獲取。
  • invokeAny(…):
    傳入一系列的 Callable 或者其子介面的例項物件,無法保證返回的是哪個 Callable 的結果 ,只能表明其中一個已執行結束。
    如果其中一個任務執行結束(或者拋了一個異常),其他 Callable 將被取消。
  • invokeAll(…):
    返回一系列的 Future 物件,通過它們你可以獲取每個 Callable 的執行結果。

關閉ExecutorService

  • shutdown() : 不會立即關閉,但它將不再接受新的任務
  • shutdownNow(): 立即關閉

ThreadPoolExecutor

  • ThreadPoolExecutor 使用其內部池中的執行緒執行給定任務(Callable 或者 Runnable)。

ScheduledExecutorService(介面,其實現類為ScheduledThreadPoolExecutor)

  • ScheduledExecutorService能夠將任務延後執行,或者間隔固定時間多次執行。
  • ScheduledExecutorService中的 任務由一個工作者執行緒非同步執行,而不是由提交任務給 ScheduledExecutorService 的那個執行緒執行。

相關方法

  • schedule (Callable task, long delay, TimeUnit timeunit):
    Callable 在給定的延遲之後執行,並返回結果。
  • schedule (Runnable task, long delay, TimeUnit timeunit)
    除了 Runnable 無法返回一個結果之外,和第一個方法類似。
  • scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
    這一方法規劃一個任務將被定期執行。該任務將會在首個 initialDelay 之後得到執行,然後每個 period 時間之後重複執行。
    period 被解釋為前一個執行的開始和下一個執行的開始之間的間隔時間。
  • scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)
    和上一個方法類似,只是period 則被解釋為前一個執行的結束和下一個執行的結束之間的間隔。

ForkJoinPool

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點不同。ForkJoinPool 讓我們可以很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。

Lock

Lock 是一個類似於 synchronized 塊的執行緒同步機制。但是 Lock 比 synchronized 塊更加靈活、精細。

實現類

Lock是一個介面,其實現類包括:

  • ReentrantLock

示例

Lock lock = new ReentrantLock();  
lock.lock();  
//critical section  
lock.unlock();
  • 呼叫lock() 方法之後,這個 lock 例項就被鎖住啦。
  • 當lock示例被鎖後,任何其他再過來呼叫 lock() 方法的執行緒將會被阻塞住,直到呼叫了unlock() 方法。
  • unlock() 被呼叫了,lock 物件解鎖了,其他執行緒可以對它進行鎖定了。

Lock 和 synchronized區別

  • synchronized 程式碼塊不能夠保證進入訪問等待的執行緒的先後順序。
  • 你不能夠傳遞任何引數給一個 synchronized 程式碼塊的入口。因此,對於 synchronized 程式碼塊的訪問等待設定超時時間是不可能的事情。
  • synchronized 塊必須被完整地包含在單個方法裡。而一個 Lock 物件可以把它的 lock() 和 unlock() 方法的呼叫放在不同的方法裡。

ReadWriteLock

讀寫鎖一種先進的執行緒鎖機制。

  • 允許多個執行緒在同一時間對某特定資源進行讀取,
  • 但同一時間內只能有一個執行緒對其進行寫入。

實現類

  • ReentrantReadWriteLock

規則

  • 如果沒有任何寫操作鎖定,那麼可以有多個讀操作鎖定該鎖
  • 如果沒有任何讀操作或者寫操作,只能有一個寫執行緒對該鎖進行鎖定。

示例:

ReadWriteLock readWriteLock = new ReentrantReadWriteLock();  
readWriteLock.readLock().lock();  
    // multiple readers can enter this section  
    // if not locked for writing, and not writers waiting  
    // to lock for writing.  
readWriteLock.readLock().unlock();  
readWriteLock.writeLock().lock();  
    // only one writer can enter this section,  
    // and only if no threads are currently reading.  
readWriteLock.writeLock().unlock();

更多原子性包裝類

位於 atomic包下,包含一系列原子性變數。

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong
  • AtomicReference