1. 程式人生 > >原始碼解析關於java阻塞容器:ArrayBlockingQueue,LinkedBlockingQueue等

原始碼解析關於java阻塞容器:ArrayBlockingQueue,LinkedBlockingQueue等


Java的阻塞容器介紹(JDK1.8)

先來看看阻塞容器和其他容器之間的層級關係

  1. Collection
    1. AbstractCollection
    2. Queue
      1. BlockingQueue
      2. AbstractQueue
        1. ArrayBlockingQueue
        2. LinkedBlockingQueue
        3. SynchronousQueue
        4. PriorityBlockingQueue

我們就挑這四個重要的實現類來講解。


一、ArrayBlockingQueue

1.類的定義

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable 

正如上面層級顯示的,java的容器都是支援泛型的,可序列化的

2.重要的成員變數

/** The queued items */
final Object[] items;// 核心陣列

/** items index for next take, poll, peek or remove */
int takeIndex;// 出隊索引,出隊操作都會使用

/** items index for next put, offer, or add */
int putIndex;// 入隊索引, 入隊操作都會使用

/** Number of elements in the queue */
int count;// 集合中元素的個數

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;// 併發的關鍵,是一個final的可重入鎖

/** Condition for waiting takes */
private final Condition notEmpty;// 非空條件(用於通知消費者消費)

/** Condition for waiting puts */
private final Condition notFull;// 非滿條件(用於通知生產者生產)

/**
 * Shared state for currently active iterators, or null if there
 * are known not to be any.  Allows queue operations to update
 * iterator state.
 */
transient Itrs itrs = null;// 這個Itrs是一個內部類,內部包裝了一個迭代器

關於Condition物件,它有兩個重要的方法signal(),await(),用於當滿足條件時,對相關執行緒的喚醒和等待,而且要建立Condition的物件需要呼叫Lock的newCondition(),方法會new一個ConditionObject作為預設的實現,ConditonObject是個AQS的內部類,一般這麼使用就行了。
可以直接理解為Object物件中的wait(), notify()方法,但是Condition物件可以將對執行緒的喚醒等待進行更細化的管理,但是條件是必須用在Lock物件的lock(),unlock()方法之間。

是現線上程之間通訊更推薦的方法。

3.初始化

ArrayBlockingQueue 提供了三個構造器供使用者使用

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);// 直接呼叫了過載的版本,預設非公平鎖
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)// 對傳入的容量引數進行合法的校驗
        throw new IllegalArgumentException();
    this.items = new Object[capacity];// 從這裡以及名字上可以看到這個ArrayBlockingQueue底層就是一個數組
    this.lock = new ReentrantLock(fair);// 這個容器的支援併發也基於這個重要的變數lock
    this.notEmpty = lock.newCondition();// 以及兩個條件變數Condition
    this.notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair,
      Collection<? extends E> c) {
    this(capacity, fair);// 先呼叫了過載版本初始化物件

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion,這句的意思是雖然不一定需要鎖,但還是顯式的寫了出來
    // 由於引數裡有集合,需要將引數集合中的元素新增至本集合,所以顯式的鎖住整個集合,再進行元素的新增
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);// 非空校驗
                items[i++] = e;// 由於是陣列所以直接可以通過索引賦值即可
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        this.count = i;// 記錄元素的總count數
        this.putIndex = (i == capacity) ? 0 : i;// 記錄“入隊索引”
    } finally {
        lock.unlock();
    }
}

從上面的三個構造器來看,可以知道ArrayBlockingQueue是一個基於陣列的阻塞併發佇列,並且在初始化的時候必須指定整個容器的大小(也就是成員變數陣列的大小),並且後面也會知道,整個容器是不會擴容的,並且預設使用的是非公平鎖。

4.一些重要的非公開的方法

這些方法封裝了一些操作,在相關公開方法中會直接去呼叫

/**
 * Returns item at index i.
 */
@SuppressWarnings("unchecked")
final E itemAt(int i) {
    return (E) items[i];// 直接返回索引i處的物件
}
/**
 * Throws NullPointerException if argument is null.
 *
 * @param v the element
 */
private static void checkNotNull(Object v) {
    if (v == null)// 非空的校驗,null的話就會拋異常
        throw new NullPointerException();
}
/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {// 其實就是入隊操作
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;// 把當前的“入隊索引”處賦值成傳入引數
    if (++putIndex == items.length)// 如果“索引”超過了長度就歸零
        putIndex = 0;
    count++;// 元素數目增加
    notEmpty.signal();// 並且通知消費執行緒消費
}
/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {// 出隊操作
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];// 先獲取“出隊索引”出的元素用於返回
    items[takeIndex] = null;// 將該索引處置空
    if (++takeIndex == items.length)// 同入隊
        takeIndex = 0;
    count--;// 元素數目減少
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();// 通知生產執行緒可以繼續生產
    return x;// 將刪除的元素返回
}
/**
 * Deletes item at array index removeIndex.
 * Utility for remove(Object) and iterator.remove.
 * Call only when holding lock.
 */
void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {// 這個if和出隊方法 dequeue 基本一致
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove

        // slide over all others up through putIndex.
        // 當需要刪除的索引和當前的出隊索引不一致時才會執行這裡的邏輯
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {// 這裡的迴圈就是把當前要刪除的索引之後的所有元素向前挪動一個索引的位置
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                items[i] = null;// 置空最後的索引處
                this.putIndex = i;// 並且在最後把“入隊索引”放在被置空索引的索引處
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);// 關於itrs這個成員在最後講解下
    }
    notFull.signal();// 由於刪除了元素,雖然不是出隊的操作,但是也通知生產執行緒可以繼續生產了
}

5.入隊和出隊操作(重要)

1.入隊操作(add, offer, put )

public boolean add(E e) {
    return super.add(e);
}

可以看到ArrayBlockingQueue的add方法是直接呼叫父類AbstractQueue的add方法的

public boolean add(E e) {
    if (offer(e))// 直接呼叫的offer方法
        return true;
    else // 從這裡也可以看到 add方法當沒有新增成功的時候是會拋異常的,並提示“佇列滿”
        throw new IllegalStateException("Queue full");
}

讓我們繼續看看offer方法

public boolean offer(E e) {
    checkNotNull(e);// 新增的元素不能為空
    final ReentrantLock lock = this.lock;
    lock.lock();// 由於ArrayBlockingQueue是隻有一把鎖,所以lock就是鎖住整個集合
    try {
        if (count == items.length)// 由於整個集合是個陣列所以當元素數目和陣列長度一致時,說明佇列已滿
            return false;
        else {
            enqueue(e);// 呼叫之前的入隊方法新增元素
            return true;
        }
    } finally {
        lock.unlock();// 最後不能忘記釋放鎖
    }
}
// offer方法的另一個過載版本, 多了一個等待的時間和單位
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);// 這裡會將入參的時間和單位轉換成納秒
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);// 在這裡會等待對應的納秒數,時間到了執行緒會醒來,而並非一直阻塞
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

還有put方法

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();// 這裡有區別是因為put方法是會一直阻塞到能夠新增位置,所以這裡並沒有用傳統的lock()方法,因為傳統的lock() 方法會無視執行緒的中斷訊號一直會嘗試獲取鎖
    // 所以這裡的lockInterruptibly()方法是一個可中斷鎖,避免了在收到了中斷訊號後,執行緒仍然阻塞
    try {
        while (count == items.length)
            notFull.await();// 當佇列已滿的時候,等待出隊操作給出的訊號
        enqueue(e);// 當被喚醒後,說明佇列已經空出了位置,可以繼續入隊
    } finally {
        lock.unlock();
    }
}
方法名 備註
add 新增成功返回true,否則丟擲異常
offer 新增成功返回true,否則返回false (常用,推薦
put 當佇列已滿的時候,執行緒阻塞,等待消費者消費通知,直到入隊成功

2.出隊操作(poll, peek, take, remove)

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();// 如果佇列中沒有元素直接返回null,否則就調用出隊方法dequeue
    } finally {
        lock.unlock();
    }
}

// 同offer,也有個過載版本的出隊操作
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try { // 和poll的區別是,peek方法只是返回處於佇列首的元素,並不將它移除出佇列
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException { 
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) // take方法和put方法類似,都是使用中斷鎖,並且會一直阻塞,直到之後的出隊(入隊)操作成功
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

讓我們再來看看remove方法

public boolean remove(Object o) {
    if (o == null) return false; // 這樣的阻塞佇列是不能有null元素的,所以null直接返回false
    final Object[] items = this.items; // 獲得陣列
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do { // 這裡的迴圈就是從“出隊索引”開始遍歷,通過equals比較,找到了就刪除
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length) // 這裡因為無論“入隊索引”還是“出隊索引”都是向上加的,只要加到了,陣列長度都會歸零,從索引0重新開始
                    i = 0;
            } while (i != putIndex);// 因為take和put 索引可以說相當於這個陣列的“頭”和“尾”,所以遍歷到“入隊索引”時,說明整個陣列都遍歷完了,就結束迴圈
        }
        return false;
    } finally {
        lock.unlock();
    }
}
方法名 備註
poll 移除成功返回true,否則返回false (常用,推薦
peek 只返回佇列最前面的元素,並不移除它
take 當佇列空的時候,執行緒阻塞,等待生產者生產通知,直到出隊成功
remove 有入參,可以移除指定的元素,但是會對集合進行遍歷,效率低(移除是否成功和重寫的equals相關)

6.其他方法

至此,這個阻塞容器的重要方法已經介紹完畢了,現在對其他方法進行依次介紹

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try { // 由於只有一把鎖,直接鎖住整個集合,返回count值就行了
        return count;
    } finally {
        lock.unlock();
    }
}

// 返回的是集合中剩餘的容量
public int remainingCapacity() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try { // 同理,鎖住整個集合
        return items.length - count; // 返回陣列長度 減去 元素個數,就是剩餘的容量了
    } finally {
        lock.unlock();
    }
}

// 和remove幾乎一樣
public boolean contains(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i]))
                    return true; // 只是找到對應元素不需要進行刪除操作,直接返回true就行了
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

public Object[] toArray() {
    Object[] a;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final int count = this.count;
        a = new Object[count];// new了一個和count相等長度的陣列
        int n = items.length - takeIndex;
        if (count <= n) // 通過陣列複製完成資料從集合到陣列的遷移
            System.arraycopy(items, takeIndex, a, 0, count);
        else {
            System.arraycopy(items, takeIndex, a, 0, n);
            System.arraycopy(items, 0, a, n, count - n);
        }
    } finally {
        lock.unlock();
    }
    return a;
}

public <T> T[] toArray(T[] a) {
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final int count = this.count;
        final int len = a.length;
        if (len < count)
            a = (T[])java.lang.reflect.Array.newInstance(// 這裡的區別就是通過反射,獲取入引數組的型別,來建立對應型別的新陣列
                a.getClass().getComponentType(), count);
        int n = items.length - takeIndex;
        if (count <= n)
            System.arraycopy(items, takeIndex, a, 0, count);
        else {
            System.arraycopy(items, takeIndex, a, 0, n);
            System.arraycopy(items, 0, a, n, count - n);
        }
        if (len > count)
            a[count] = null;// 並且把之後的count處的索引置空
    } finally {
        lock.unlock();
    }
    return a;
}

public String toString() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int k = count;
        if (k == 0)// 如果是空的話直接打個括號
            return "[]";

        final Object[] items = this.items;
        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (int i = takeIndex; ; ) {// 遍歷完每個元素,都在後面加個逗號和空格
            Object e = items[i];
            sb.append(e == this ? "(this Collection)" : e);
            if (--k == 0) // 最後一個元素就直接 反括號結束
                return sb.append(']').toString();
            sb.append(',').append(' ');
            if (++i == items.length)
                i = 0;
        }
    } finally {
        lock.unlock();
    }
}


public void clear() {
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int k = count;
        if (k > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                items[i] = null; // 通過遍歷,將陣列中的每一項都置空
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);// 當出隊和入隊索引一樣的時候就是說明所有元素都遍歷到了
            takeIndex = putIndex;
            count = 0;// 清零count
            if (itrs != null)
                itrs.queueIsEmpty(); // 對內部類(其中的迭代器)歸零
            for (; k > 0 && lock.hasWaiters(notFull); k--)
                notFull.signal();// 如果仍然有生產者執行緒在等待,喚醒它
        }
    } finally {
        lock.unlock();
    }
}

// 這兩個方法放在一起說了
public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

// 這個方法就是把本集合中的元素轉移到入參集合c中
public int drainTo(Collection<? super E> c, int maxElements) {
    checkNotNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = Math.min(maxElements, count);
        int take = takeIndex;
        int i = 0;
        try {
            while (i < n) {
                @SuppressWarnings("unchecked")
                E x = (E) items[take];
                c.add(x);// 新增到集合c中,但是如果add拋異常了,比如新增的目標集合也是一個有界的集合,那之前的新增成功的元素會轉移到新集合中,原有的集合中的這些元素都會被移除,但是新增失敗和它之後的元素,都仍然會保留在本集合中
                items[take] = null;
                if (++take == items.length)
                    take = 0;
                i++;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            // 和clear()方法類似,將集合的引數都歸零
            if (i > 0) {
                count -= i;
                takeIndex = take;
                if (itrs != null) {
                    if (count == 0)
                        itrs.queueIsEmpty();
                    else if (i > take)
                        itrs.takeIndexWrapped();
                }
                for (; i > 0 && lock.hasWaiters(notFull); i--)
                    notFull.signal();
            }
        }
    } finally {
        lock.unlock();
    }
}

// 返回迭代器
public Iterator<E> iterator() {
    return new Itr();
}

細心的讀者可能觀察到,之前有個成員變數Itrs 一直沒怎麼用到過,而且也沒看到它在哪裡初始化。那點開原始碼發現,這個Itrs究竟是什麼呢?並且它是在哪裡進行的初始化呢?其實它是在ArrayBlockingQueue的迭代器中初始化的,也就是說你如果用到了iterator()方法,才會對Itrs進行初始化。那既然這樣,就讓我們先來看看迭代器Itr吧。

內部類(迭代器)

類的定義
private class Itr implements Iterator<E> // 就是個內部類,實現了迭代器介面
重要的成員
/** Index to look for new nextItem; NONE at end */
private int cursor; // 迭代器內部的遊標 相當於takeIndex + 1

/** Element to be returned by next call to next(); null if none */
private E nextItem; // 下一個元素

/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex; // 下一個索引 相當於takeIndex

/** Last element returned; null if none or not detached. */
private E lastItem; // 最後一個元素

/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;

/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex; // 相當於takeIndex

/** Previous value of iters.cycles */
private int prevCycles;

// 三個常量
/** Special index value indicating "not available" or "undefined" */
private static final int NONE = -1;

/**
 * Special index value indicating "removed elsewhere", that is,
 * removed by some operation other than a call to this.remove().
 */
private static final int REMOVED = -2;

/** Special value for prevTakeIndex indicating "detached mode" */
private static final int DETACHED = -3;
初始化
Itr() {
    // assert lock.getHoldCount() == 0;
    this.lastRet = NONE;
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
        if (count == 0) { // 如果集合為空
            // assert itrs == null;
            this.cursor = NONE; // 初始化這些成員
            this.nextIndex = NONE;
            this.prevTakeIndex = DETACHED;
        } else {
            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
            this.prevTakeIndex = takeIndex; // 記錄prevTakeIndex 為takeIndex
            this.nextItem = itemAt(this.nextIndex = takeIndex); // 記錄nextIndex 為takeIndex,並且記錄nextItem 
            this.cursor = incCursor(takeIndex); // 將takeIndex+1並賦值給cursor 
            if (itrs == null) {
                itrs = new Itrs(this);// 如果集合的這個成員為空就進行初始化,這個等到講Itrs的時候再詳細說
            } else {
                itrs.register(this); // in this order
                itrs.doSomeSweeping(false);
            }
            this.prevCycles = itrs.cycles;
            // assert takeIndex >= 0;
            // assert prevTakeIndex == takeIndex;
            // assert nextIndex >= 0;
            // assert nextItem != null;
        }
    } finally {
        lock.unlock();
    }
}

private int incCursor(int index) { // 傳入的引數是takeIndex
    // assert lock.getHoldCount() == 1;
    if (++index == items.length)
        index = 0; // 如果加1之後超過了索引最大值,就歸零
    if (index == putIndex)
        index = NONE;// 如果等於了putIndex,說明集合為空
    return index;
}
重要的方法
public E next() {
    // assert lock.getHoldCount() == 0;
    final E x = this.nextItem; // 先獲得nextItem(初始化的時候已經獲取)
    if (x == null)
        throw new NoSuchElementException();
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
        if (!isDetached())
            incorporateDequeues();
        // assert nextIndex != NONE;
        // assert lastItem == null;
        this.lastRet = this.nextIndex; // lastRet就是記錄上一次獲取元素的索引位置
        final int cursor = this.cursor;
        if (cursor >= 0) { 
            this.nextItem = itemAt(this.nextIndex = this.cursor); 
            // assert nextItem != null;
            this.cursor = incCursor(cursor); // 對遊標進行+1的操作
        } else {
            nextIndex = NONE;
            nextItem = null;
        }
    } finally {
        lock.unlock();
    }
    return x;
}

boolean isDetached() {
    // assert lock.getHoldCount() == 1;
    return prevTakeIndex < 0; // 只要被初始化過,這個prev就不會小於0,應該等於上一次的takeIndex
}


public boolean hasNext() {
    // assert lock.getHoldCount() == 0;
    if (nextItem != null) // 因為在next()方法中,返回下一個元素後,會再對nextItem進行賦值,所以如果有的話,是不會為空的
        return true;
    noNext();
    return false;
}

還有一個Itrs的內部類

內部類 Itrs

類的定義
class Itrs 

但是他內部又封裝了一個Node類

/** Incremented whenever takeIndex wraps around to 0 */
int cycles = 0; // 這個變數記錄了,takeIndex轉了幾圈

/** Linked list of weak iterator references */
private Node head;

/** Used to expunge stale iterators */
private Node sweeper = null;

private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;

private class Node extends WeakReference<Itr> {
    Node next;

    Node(Itr iterator, Node next) { // 整個iterator就會是前面的內部類Itr 迭代器
        super(iterator);
        this.next = next; // 指向下一個節點
    }
}

void doSomeSweeping(boolean tryHarder) {// 感覺是做一些清理的工作,對記憶體的釋放,變數置空等等
    // assert lock.getHoldCount() == 1;
    // assert head != null;
    int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
    Node o, p;
    final Node sweeper = this.sweeper;
    boolean passedGo;   // to limit search to one full sweep

    if (sweeper == null) {
        o = null;
        p = head;
        passedGo = true;
    } else {
        o = sweeper;
        p = o.next;
        passedGo = false;
    }

    for (; probes > 0; probes--) {
        if (p == null) {
            if (passedGo)
                break;
            o = null;
            p = head;
            passedGo = true;
        }
        final Itr it = p.get();
        final Node next = p.next;
        if (it == null || it.isDetached()) {
            // found a discarded/exhausted iterator
            probes = LONG_SWEEP_PROBES; // "try harder"
            // unlink p
            p.clear();
            p.next = null;
            if (o == null) {
                head = next;
                if (next == null) {
                    // We've run out of iterators to track; retire
                    itrs = null;
                    return;
                }
            }
            else
                o.next = next;
        } else {
            o = p;
        }
        p = next;
    }

    this.sweeper = (p == null) ? null : o;
}

關於這兩個內部類,看了挺久並不能很好的理解,等到時候水平增長,再回顧此部落格的時候,可能會有所更新,讓我們繼續看下一個阻塞容器吧。Orz


二、LinkedBlockingQueue

1.類的定義

// 和ArrayBlockingQueue 定義是一樣的,所以它們是兄弟類,但是它的實現使用了連結串列結構,而非陣列
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

2.重要的成員變數

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity; // 集合的容量,LinkedBlockingQueue 預設的容量是 Integer.MAX_VALUE

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger(); // 這裡為什麼沒有直接用int,我想因為LinkedBlockingQueue用了兩把鎖的機制,來提高入隊和出隊併發時候的效率,所以這裡的count為了取到最準確的資料用了原子類

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head; // 連結串列結構的頭結點,這個Node是個傀儡節點,它的item 永遠都等於null,它的作用就是指向佇列中的第一個節點

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;// 連結串列結構的尾節點,初始化的時候,和head指向同一個Node,但是新增元素後,尾結點代表的就是整個集合最後一個被新增的元素

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();// 出隊鎖

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();// 同ArrayBlockingQueue

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();// 入隊鎖

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

看到這裡,可以知道,LBQ用的是兩把鎖,所以在處理入隊出隊都高併發的場景的時候,效率比ABQ要高許多,這也是為什麼java內建的執行緒池使用LBQ作為預設的阻塞佇列實現了。
這裡看到了一個新的物件Node,讓我們來看看吧。

static class Node<E> {// 就是LBQ的靜態內部類
    E item; // 節點中儲存的內容

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;// 只儲存了下一個節點

    Node(E x) { item = x; }// 構造器
}

3.初始化

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE); // 預設 Integer.MAX_VALUE作為容量
}


            
           

相關推薦

原始碼解析關於java阻塞容器ArrayBlockingQueueLinkedBlockingQueue

Java的阻塞容器介紹JDK18 一ArrayBlockingQueue 類的定義 重要的成員變數 初始化 一些重要的非公開的方法

Java集合框架之三HashMap原始碼解析 Java集合框架之三HashMap原始碼解析

Java集合框架之三:HashMap原始碼解析     版權宣告:本文為博主原創文章,轉載請註明出處,歡迎交流學習!       HashMap在我們的工作中應用的非常廣泛,在工作面試中也經常會被問到,對於這樣一個重要的集合

Spring原始碼解析(三)父子容器的概念

  相信大家現在在使用spring專案開發時可能不只是單單使用spring一個框架進行開發, 可能會用到現在主流的ssm,spring和springmvc一起使用。   而在一起使用的時候我就發現了一個問題,在web.xml配置spring容器初始化的時候存在一個問題。     一般我們在配置sprin

Java 集合原始碼解析(1)Iterator

Java 提供的 集合類都在 Java.utils 包下,其中包含了很多 List, Set, Map, Queue… 它們的關係如下面這張類圖所示: 可以看到,Java 集合主要分為兩類:Collection 和 Map. 而 Collection 又繼承了 Iter

【死磕Java併發】-----J.U.C之阻塞佇列ArrayBlockingQueue

ArrayBlockingQueue,一個由陣列實現的有界阻塞佇列。該佇列採用FIFO的原則對元素進行排序新增的。 ArrayBlockingQueue為有界且固定,其大小在構造時由建構函式來決定,確認之後就不能再改變了。ArrayBlockingQueu

JAVA獲取日期今天昨天本週本月本年

/** * 日期工具類 */ public class DateUtils { /** * 獲取今天 * @return String * */ public static String getToday(){ return new SimpleDateFormat("

Java併發容器ConcurrentLinkedQueue

轉自:https://blog.csdn.net/chenssy/article/details/74853120 http://cmsblogs.com/ 要實現一個執行緒安全的佇列有兩種方式:阻塞和非阻塞。阻塞佇列無非就是鎖的應用,而非阻塞則是CAS演算法的應用。下面我們就開始一個非

Java三大特性封裝繼承與多型

(尊重勞動成果,轉載請註明出處:https://blog.csdn.net/qq_25827845/article/details/84592274冷血之心的部落格)           面向物件的語言有三大特性,即封裝繼承與多型。三大特

React原始碼解析(2)元件的掛載

上一章jsx語法是如何解析的講到了 <div> <div>1</div> <div>2</div> <div>3</div> </div> 複製程式碼 jsx語法是如何解析為虛擬dom的,

Spring原始碼解析(二)obtainFreshBeanFactory

spring的ApplicationContext容器的初始化流程主要由AbstractApplicationContext類中的refresh方法實現。 而refresh()方法中獲取新工廠的主要是由obtainFreshBeanFactory()實現的,後續的操作均是beanFactoty的進一步處理。

React原始碼解析(3)元件的生命週期

元件的生命週期分為二個部分 元件的掛載 元件的更新 元件的掛載 在上一章對於元件的掛載已經做了詳細的說明,但是涉及到元件生命週期部分被略過.接下來我將詳細的對其說明. 元件的掛載涉及到二個比較重要的生命週期方法componentWillMount和componentDidMount. c

JAVA併發容器ConcurrentSkipListMap

生活 目標定下來以後就不要去變,只要確定是對的,總可以到達。 二分查詢 二分查詢要求有序性,為了保障可以隨機訪問,因此會把資料儲存在連續的記憶體中,在查詢的時候效率高,但是在增加和刪除時需要大量移動元素以保證有序,所以效率不高。 如果需要快速的二分查詢,又要兼顧刪除增加元素的效率

JAVA併發容器CopyOnWriteArrayList與CopyOnWriteArraySet

生活 所有的程式設計師都劇作家,而所有計算機都是糟糕的演員。 CopyOnWriteArrayList介紹 還記得學集合的時候,學的第一個集合就是ArrayList.它是一個由陣列實現的集合。因為他對陣列的增刪改和查詢都是不加鎖的,所以它並不是執行緒安全的。 因此,我們會引入到一

JAVA併發容器JDK1.7 與 1.8 ConcurrentHashMap 區別

生活 為什麼我們總是沒有時間把事情做對,卻有時間做完它? 瞭解ConcurrentHashMap 工作中常用到hashMap,但是HashMap在多執行緒高併發場景下並不是執行緒安全的。 所以引入了ConcurrentHashMap,它是HashMap的執行緒安全版本,採用了分段

Spring原始碼解析之一(容器的基本實現)

容器的基本實現 DefaultListableBeanFactory是整個bean載入的核心部分,是Spring註冊及載入bean的預設實現, XmlBeanFactory繼承自DefaultListableBeanFactory與DefaultListableBeanFactory,

OKHttp 3.10原始碼解析(三)快取機制

本篇我們來講解OKhttp的快取處理,在網路請求中合理地利用本地快取能有效減少網路開銷,提高響應速度。HTTP報頭也定義了很多控制快取策略的域,我們先來認識一下HTTP的快取策略。 一.HTTP快取策略 HTTP快取有多種規則,根據是否需要向伺服器發起請求來分類,我們將其分為兩大類:強制

OKHttp 3.10原始碼解析(二)攔截器鏈

本篇文章我們主要來講解OKhttp的攔截器鏈,攔截器是OKhttp的主要特色之一,通過攔截器鏈,我們可以對request或response資料進行相關處理,我們也可以自定義攔截器interceptor。 上一篇文章中我們講到,不管是OKhttp的同步請求還是非同步請求,都會呼叫RealCal

OKHttp 3.10原始碼解析(一)執行緒池和任務佇列

OKhttp是Android端最火熱的網路請求框架之一,它以高效的優點贏得了廣大開發者的喜愛,下面是OKhttp的主要特點: 1.支援HTTPS/HTTP2/WebSocket 2.內部維護執行緒池佇列,提高併發訪問的效率 3.內部維護連線池,支援多路複用,減少連線建立開銷 4.

J.U.C 之阻塞佇列ArrayBlockingQueue

1. 簡介 ArrayBlockingQueue,一個由陣列實現的有界阻塞佇列。該佇列採用 FIFO 的原則對元素進行排序新增的。 ArrayBlockingQueue 為有界且固定,其大小在構造時由建構函式來決定,確認之後就不能再改變了。 ArrayBlockin

JAVA併發容器為什麼說ConcurrentHashMap是弱一致性的?

ConcurrentHashMap的弱一致性體現在clear、迭代器和get方法,原因在於沒有加鎖。 舉例: 迭代器在遍歷資料的時候是一個Segment一個Segment去遍歷的,如果在遍歷完一個Segment時正好有一個執行緒在剛遍歷完的Segment上插入資料,就會體現出不一致性。 cl