1. 程式人生 > >【Java併發程式設計】LinkedBlockingQueue的使用(六)

【Java併發程式設計】LinkedBlockingQueue的使用(六)

 一、LinkedBlockingQueue

   1.1 簡介

  LinkedBlockingQueue是一個由連結串列結構組成的有界阻塞佇列,此佇列是FIFO(先進先出)的順序來訪問的,它由隊尾插入後再從隊頭取出或移除,其中佇列的頭部是在佇列中時間最長的元素,佇列的尾部是在佇列中時間最短的元素。在LinkedBlockingQueue類中分別用2個不同的鎖takeLock、putLock來保護隊頭和隊尾操作。如下圖所示:

   1.2 類圖


1.3 原始碼分析

   1.3.1 屬性與連結串列節點類

//連結串列節點類,next指向下一個節點。如果下一個節點時null表示沒有節點了。
static class Node<E> {
    E item;

    Node<E> next;

    Node(E x) { item = x; }
}

// 最大容量上限,預設是 Integer.MAX_VALUE
private final int capacity;

// 當前元素數量,這是個原子類。
private final AtomicInteger count = new AtomicInteger(0);

// 頭結點
private transient Node<E> head;

// 尾結點
private transient Node<E> last;

// 隊頭訪問鎖
private final ReentrantLock takeLock = new ReentrantLock();

// 隊頭訪問等待條件、佇列
private final Condition notEmpty = takeLock.newCondition();

// 隊尾訪問鎖
private final ReentrantLock putLock = new ReentrantLock();

// 隊尾訪問等待條件、佇列
private final Condition notFull = putLock.newCondition();

  使用原子類AtomicInteger是因為讀寫分別使用了不同的鎖,但都會訪問這個屬性來計算佇列中元素的數量,所以它需要是執行緒安全的。關

   1.3.2 offer操作

public boolean offer(E e) {
	if (e == null) throw new NullPointerException();
	final AtomicInteger count = this.count;
	//當佇列滿時,直接返回了false,沒有被阻塞等待元素插入
	if (count.get() == capacity)
		return false;
	int c = -1;
	Node<E> node = new Node(e);
	//開啟隊尾保護鎖
	final ReentrantLock putLock = this.putLock;
	putLock.lock();
	try {
		if (count.get() < capacity) {
			enqueue(node);
			//原則計數類
			c = count.getAndIncrement();
			if (c + 1 < capacity)
				notFull.signal();
		}
	} finally {
		//釋放鎖
		putLock.unlock();
	}
	if (c == 0)
		signalNotEmpty();
	return c >= 0;
}

//在持有鎖下指向下一個節點
private void enqueue(Node<E> node) {
	// assert putLock.isHeldByCurrentThread();
	// assert last.next == null;
	last = last.next = node;
}

   1.3.3 put操作

//put 操作把指定元素新增到隊尾,如果沒有空間則一直等待。
public void put(E e) throws InterruptedException {
	if (e == null) throw new NullPointerException();
	// Note: convention in all put/take/etc is to preset local var
	// holding count negative to indicate failure unless set.
	//註釋:在所有的 put/take/etc等操作中變數c為負數表示失敗,>=0表示成功。
	int c = -1;
	Node<E> node = new Node(e);
	final ReentrantLock putLock = this.putLock;
	final AtomicInteger count = this.count;
	putLock.lockInterruptibly();
	try {
		/*
		 * Note that count is used in wait guard even though it is
		 * not protected by lock. This works because count can
		 * only decrease at this point (all other puts are shut
		 * out by lock), and we (or some other waiting put) are
		 * signalled if it ever changes from capacity. Similarly
		 * for all other uses of count in other wait guards.
		 */
		/*
		 * 注意,count用於等待監視,即使它沒有用鎖保護。這個可行是因為
		 * count 只能在此刻(持有putLock)減小(其他put執行緒都被鎖拒之門外),
		 * 當count對capacity發生變化時,當前執行緒(或其他put等待執行緒)將被通知。
		 * 在其他等待監視的使用中也類似。
		 */
		while (count.get() == capacity) {
			notFull.await();
		}
		enqueue(node);
		c = count.getAndIncrement();
		// 還有可新增空間則喚醒put等待執行緒。
		if (c + 1 < capacity)
			notFull.signal();
	} finally {
		putLock.unlock();
	}
	if (c == 0)
		signalNotEmpty();
}

   1.3.4 take操作

//彈出隊頭元素,如果沒有會被阻塞直到元素返回
public E take() throws InterruptedException {
	E x;
	int c = -1;
	final AtomicInteger count = this.count;

	final ReentrantLock takeLock = this.takeLock;
	takeLock.lockInterruptibly();
	try {
		while (count.get() == 0) {
			notEmpty.await();//沒有元素一直阻塞
		}
		x = dequeue();
		c = count.getAndDecrement();
		if (c > 1)//如果還有可獲取元素,喚醒等待獲取的執行緒。
		  notEmpty.signal();
	} finally {
		//拿到元素後釋放鎖
		takeLock.unlock();
	}
	if (c == capacity)
		signalNotFull();
	return x;
}

//在持有鎖下返回佇列隊頭第一個節點
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    //出隊後的節點作為頭節點並將元素置空
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

   1.3.5 remove操作


//移除指定元素。
public boolean remove(Object o) {
	if (o == null) return false;
	//對兩把鎖加鎖
	fullyLock();
	try {
		for (Node<E> trail = head, p = trail.next;
				p != null;
				trail = p, p = p.next) {
			if (o.equals(p.item)) {
				unlink(p, trail);
				return true;
			}
		}
		return false;
	} finally {
		fullyUnlock();
	}
}

//p是移除元素所在節點,trail是移除元素的上一個節點
void unlink(Node<E> p, Node<E> trail) {
	// assert isFullyLocked();
	// p.next is not changed, to allow iterators that are
	// traversing p to maintain their weak-consistency guarantee.
	p.item = null;
	//將trail下一個節點指向p的下一個節點
	trail.next = p.next;
	if (last == p)
		last = trail;
	if (count.getAndDecrement() == capacity)
		notFull.signal();
}

void fullyLock() {
	putLock.lock();
	takeLock.lock();
}

//釋放鎖時確保和加鎖順序一致
void fullyUnlock() {
	takeLock.unlock();
	putLock.unlock();
}

注意,鎖的釋放順序與加鎖順序是相反的。

參考資料
http://ifeve.com/juc-linkedblockingqueue/

     本部落格中未標明轉載的文章歸作者小毛驢所有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連線,否則保留追究法律責任的權利。