1. 程式人生 > >Java並發(一)——線程安全的容器(上)

Java並發(一)——線程安全的容器(上)

med ole 復雜 出隊 表示 就是 nsh 附加 構造方法

Java中線程安全的容器主要包括兩類:

  • VectorHashtable,以及封裝器類Collections.synchronizedListCollections.synchronizedMap
  • Java 5.0引入的java.util.concurrent包,其中包含並發隊列、並發HashMap以及寫入時復制容器。

依筆者看,早期使用的同步容器主要有兩方面的問題:1)通過對方法添加synchronized關鍵字實現同步,這種粗粒度的加鎖操作在synchronized關鍵字本身未充分優化之前,效率偏低;2)同步容器雖然是線程安全的,但在某些外部復合操作(例:若沒有則添加)時,依然需要客戶端加鎖保證數據安全。因此,從Java 5.0以後,並發編程偏向於使用java.util.concurrent

包(作者:Doug Lea)中的容器類,本文也將著重介紹該包中的容器類,主要包括:

  1. 阻塞隊列
  2. ConcurrentHashMap
  3. 寫入時復制容器

一、阻塞隊列

在並發環境下,阻塞隊列是常用的數據結構,它能確保數據高效安全的傳輸,為快速搭建高質量的多線程應用帶來極大的便利,比如MQ的原理就是基於阻塞隊列的。java.util.concurrent中包含豐富的隊列實現,它們之間的關系如下圖所示:

技術分享圖片

  • BlockingQueue、Deque(雙向隊列)繼承自Queue接口;
  • BlockingDeque同時繼承自BlockingQueue、Deque接口,提供阻塞的雙向隊列屬性;
  • LinkedBlockingQueue和LinkedBlockingDeque分別實現了BlockingQueue和BlockingDeque接口;
  • DelayQueue實現了BlockingQueue接口,提供任務延遲功能;
  • TransferQueue是Java 7引入的,用於替代BlockingQueue,LinkedTransferQueue是其實現類。

下面對這些隊列進行詳細的介紹:

1.1 BlockingQueue與BlockingDeque

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:

  • 在隊列為空時,獲取元素的線程會等待隊列變為非空。
  • 當隊列滿時,存儲元素的線程會等待隊列可用。

阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。

阻塞隊列提供了四種處理方法:

方法 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 拋出異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常。當隊列為空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,如果沒有則返回null
  • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。

BlockingDeque在BlockingQueue的基礎上,增加了支持雙向隊列的屬性。如下圖所示,相比於BlockingQueue的插入和移除方法,變為XxxFirstXxxLast方法,分別對應隊列的兩端,既可以在頭部添加或移除,也可以在尾部添加或移除。

技術分享圖片

1.2 LinkedBlockingQueue與LinkedBlockingDeque

LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為Integer.MAX_VALUE,按照先進先出的原則對元素進行排序。

首先看下LinkedBlockingQueue中核心的域:

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

private final int capacity;
private final AtomicInteger count = new AtomicInteger();

transient Node<E> head;
private transient Node<E> last;

private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
  • LinkedBlockingQueueLinkedList類似,通過靜態內部類Node<E>進行元素的存儲;
  • capacity表示阻塞隊列所能存儲的最大容量,在創建時可以手動指定最大容量,默認的最大容量為Integer.MAX_VALUE
  • count表示當前隊列中的元素數量,LinkedBlockingQueue的入隊列和出隊列使用了兩個不同的lock對象,因此無論是在入隊列還是出隊列,都會涉及對元素數量的並發修改,因此這裏使用了一個原子操作類來解決對同一個變量進行並發修改的線程安全問題。
  • headlast分別表示鏈表的頭部和尾部;
  • takeLock表示元素出隊列時線程所獲取的鎖,當執行takepoll等操作時線程獲取;notEmpty當隊列為空時,通過該Condition讓獲取元素的線程處於等待狀態;
  • putLock表示元素入隊列時線程所獲取的鎖,當執行putoffer等操作時獲取;notFull當隊列容量達到capacity時,通過該Condition讓加入元素的線程處於等待狀態。

其次,LinkedBlockingQueue有三個構造方法,分別如下:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

默認構造函數直接調用LinkedBlockingQueue(int capacity)LinkedBlockingQueue(int capacity)會初始化首尾節點,並置位null。LinkedBlockingQueue(Collection<? extends E> c)在初始化隊列的同時,將一個集合的全部元素加入隊列。

最後分析下puttake的過程,這裏重點關註:LinkedBlockingQueue如何實現添加/移除並行的?

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

之所以把puttake放在一起,是因為它們是一對互逆的過程:

  • put在插入元素前首先獲得putLock和當前隊列的元素數量,take在去除元素前首先獲得takeLock和當前隊列的元素數量;
  • put時需要判斷當前隊列是否已滿,已滿時當前線程進行等待,take時需要判斷隊列是否已空,隊列為空時當前線程進行等待;
  • put調用enqueue在隊尾插入元素,並修改尾指針,take調用dequeuehead指向原來first的位置,並將first的數據域置位null,實現刪除原first指針,並產生新的head,同時,切斷原head節點的引用,便於垃圾回收。

    private void enqueue(Node<E> node) {
    last = last.next = node;
    }
    private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
    }
  • 最後,put根據count決定是否觸發隊列未滿和隊列空;take根據count決定是否觸發隊列未空和隊列滿。

回到剛才的問題:LinkedBlockingQueue如何實現添加/移除並行的?
LinkedBlockingQueue在入隊列和出隊列時使用的是不同的Lock,這也意味著它們之間的操作不會存在互斥。在多個CPU的情況下,可以做到在同一時刻既消費、又生產,做到並行處理

同樣的,LinkedBlockingDequeLinkedBlockingQueue的基礎上,增加了雙向操作的屬性。繼續以puttake為例,LinkedBlockingDeque增加了putFirst/putLasttakeFirst/takeLast方法,分別用於在隊列頭、尾進行添加和刪除。與LinkedBlockingQueue不同的是,LinkedBlockingDeque的入隊列和出隊列不再使用不同的Lock。

final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

其中,lock表示讀寫的主鎖,notEmpty和notFull依然表示相應的控制線程狀態條件量。以putFirsttakeFirst為例:

public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkFirst(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

putFirst不支持插入null元素,首先新建一個Node對象,然後調用ReentrantLocklock方法獲取鎖,插入操作通過boolean linkFirst(Node<E> node)實現,如果當前隊列頭已滿,那麽該線程等待(linkFirst方法在寫入元素成功後會釋放該鎖信號),最後,在finally塊中釋放鎖(ReentrantLock的使用)。

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

putFirst類似,takeFirst首先獲取鎖,然後在try中解除尾元素對象的引用,如果unlinkFirst為空,表示隊列為空,沒有元素可刪,那麽該線程等待。同樣,最後在finally塊中釋放鎖。

那麽問題來了,LinkedBlockingDeque為什麽不使用LinkedBlockingQueue讀寫鎖分離的方式呢?LinkedBlockingDequeLinkedBlockingQueue的使用場景有什麽區別呢?

1.3 DelayQueue

DelayQueue主要用於實現延時任務,比如:等待一段時間之後關閉連接,緩存對象過期刪除,任務超時處理等等,這些任務的共同特點是等待一段時間之後執行(類似於TimerTask)。DelayQueue的實現包括三個核心特征:

  • 延時任務:DelayQueue的泛型類需要繼承自Delayed接口,而Delayed接口繼承自Comparable<Delayed>,用於隊列中優先排序的比較;
  • 優先隊列:DelayQueue的實現采用了優先隊列PriorityQueue,即延遲時間越短的任務越優先(回憶下優先隊列中二叉堆的實現)。
  • 阻塞隊列:支持並發讀寫,采用ReentrantLock來實現讀寫的鎖操作。

因此,DelayQueue = Delayed + PriorityQueue + BlockingQueue

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    private final Condition available = lock.newCondition();
}

接下來看下DelayQueue的讀寫操作如何實現延時任務的?

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

首先執行加鎖操作,然後往優先隊列中插入元素e,優先隊列會調用泛型E的compareTo方法進行比較(具體關於二叉堆的操作,這裏不再贅述,請參考數據結構部分相關分析),將延遲時間最短的任務添加到隊頭。最後檢查下元素是否為隊頭,如果是隊頭的話,設置leader為空,喚醒所有等待的隊列,釋放鎖。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don‘t retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}
  • 首先執行加鎖操作,然後取出優先隊列的隊頭,如果對頭為空,則該線程阻塞;
  • 獲得對頭元素的延遲時間,如果延遲時間小於等於0,說明該元素已經到了可以使用的時間,調用poll方法彈出該元素;
  • 在延遲時間大於0時,首先釋放元素first的引用(避免內存泄露),其次判斷如果leader線程不為空,則該線程阻塞(表示已有線程在等待)。否則,把當前線程賦值給leader元素,然後阻塞delay的時間,即等待隊頭到達延遲時間,在finally塊中釋放leader元素的引用。循環後,取出對頭元素,退出for循環。
  • 最後,如果leader為空並且優先級隊列不為空的情況下(判斷還有沒有其他後續節點),調用signal通知其他的線程,並執行解鎖操作。

1.4 TransferQueue與LinkedTransferQueue

TransferQueue是一個繼承了BlockingQueue的接口,並且增加若幹新的方法。LinkedTransferQueueTransferQueue接口的實現類,其定義為一個無界的隊列,具有先進先出(FIFO)的特性。

TransferQueue接口主要包含以下方法:

public interface TransferQueue<E> extends BlockingQueue<E> {
    boolean tryTransfer(E e);
    void transfer(E e) throws InterruptedException;
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    boolean hasWaitingConsumer();
    int getWaitingConsumerCount();
}
  • transfer(E e):若當前存在一個正在等待獲取的消費者線程,即立刻移交之;否則,會插入當前元素e到隊列尾部,並且等待進入阻塞狀態,到有消費者線程取走該元素。
  • tryTransfer(E e):若當前存在一個正在等待獲取的消費者線程(使用take()或者poll()函數),使用該方法會即刻轉移/傳輸對象元素e;若不存在,則返回false,並且不進入隊列。這是一個不阻塞的操作。
  • tryTransfer(E e, long timeout, TimeUnit unit):若當前存在一個正在等待獲取的消費者線程,會立即傳輸給它;否則將插入元素e到隊列尾部,並且等待被消費者線程獲取消費掉;若在指定的時間內元素e無法被消費者線程獲取,則返回false,同時該元素被移除。
  • hasWaitingConsumer():判斷是否存在消費者線程。
  • getWaitingConsumerCount():獲取所有等待獲取元素的消費線程數量。

LinkedTransferQueue實現了上述方法,較之於LinkedBlockingQueue在隊列滿時,入隊操作會被阻塞的特性,LinkedTransferQueue在隊列不滿時也可以阻塞,只要沒有消費者使用元素。下面來看下LinkedTransferQueue的入隊和和出隊操作:transfertake方法。

public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}
public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

LinkedTransferQueue入隊和和出隊都使用了一個關鍵方法:

private E xfer(E e, boolean haveData, int how, long nanos) {}

其中,E表示被操作的元素,haveDatatrue表示添加數據,false表示移除數據;how有四種取值:NOW, ASYNC, SYNC, 或者TIMED,分別表示執行的時機;nanos表示howTIMED時的時間限制。
xfer方法具體流程較為復雜,這裏不再展開。另外,LinkedTransferQueue采用了CAS非阻塞同步機制,後面會具體講到)

Java並發(一)——線程安全的容器(上)