1. 程式人生 > >Java容器來一發(三)Queue

Java容器來一發(三)Queue

1、阻塞佇列和非阻塞佇列

阻塞佇列與非阻塞佇列的區別在於,當佇列是空的時,從佇列中獲取元素的操作將會被阻塞,或者當佇列是滿時,往佇列裡新增元素的操作會被阻塞。試圖從空的阻塞佇列中獲取元素的執行緒將會被阻塞,直到其他的執行緒往空的佇列插入新的元素。同樣,試圖往已滿的阻塞佇列中新增新元素的執行緒同樣也會被阻塞,直到其他的執行緒使佇列重新變得空閒起來,如從佇列中移除一個或者多個元素,或者完全清空佇列。

put和take屬於阻塞方法;offer和pull屬於非阻塞方法,可以不阻塞,也可以指定timeout操作時限。

2、常用佇列

常用佇列有LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和ConcurrentLinkedQueue等。

LinkedBlockingQueue能夠更高效地處理併發資料,因為其對於生產者端和消費者端分別採用了獨立的鎖(ReentrantLock)來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。LinkedBlockingQueue是基於連結串列實現,故在生產和消費時會頻繁建立和銷燬物件,對GC有一定影響。

ArrayBlockingQueue是基於陣列實現的,沒有實現鎖分離。

PriorityBlockingQueue是一個支援執行緒優先順序排序的無界佇列,預設自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先順序元素的順序。

ConcurrentLinkedQueue是非阻塞佇列,其入隊和出隊操作均利用CAS更新,這樣允許多個執行緒併發執行,並且不會因為加鎖而阻塞執行緒,使得併發效能更好。

3、阻塞佇列核心方法

public interface BlockingQueue<E> extends Queue<E> {

    //將給定元素設定到佇列中,如果設定成功返回true, 否則返回false。如果是往限定了長度的佇列中設定值,推薦使用offer()方法。
    boolean add(E e);

    //將給定的元素設定到佇列中,如果設定成功返回true, 否則返回false. e的值不能為空,否則丟擲空指標異常。
    boolean offer(E e);

    //將元素設定到佇列中,如果佇列中沒有多餘的空間,該方法會一直阻塞,直到佇列中有多餘的空間。
    void put(E e) throws InterruptedException;

    //將給定元素在給定的時間內設定到佇列中,如果設定成功返回true, 否則返回false.
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //從佇列中獲取值,如果佇列中沒有值,執行緒會一直阻塞,直到佇列中有值,並且該方法取得了該值。
    E take() throws InterruptedException;

    //在給定的時間裡,從佇列中獲取值,時間到了直接呼叫普通的poll方法,為null則直接返回null。
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //獲取佇列中剩餘的空間。
    int remainingCapacity();

    //從佇列中移除指定的值。
    boolean remove(Object o);

    //判斷佇列中是否擁有該值。
    public boolean contains(Object o);

    //將佇列中值,全部移除,併發設定到給定的集合中。
    int drainTo(Collection<? super E> c);

    //指定最多數量限制將佇列中值,全部移除,併發設定到給定的集合中。
    int drainTo(Collection<? super E> c, int maxElements);
}

LinkedBlockingQueue的核心方法(take、put、offer、poll、contains)實現:

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();


public E takeFirst() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		E x;
		while ( (x = unlinkFirst()) == null)
			notEmpty.await();
		return x;
	} finally {
		lock.unlock();
	}
}

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

public E pollFirst(long timeout, TimeUnit unit)
		throws InterruptedException {
	long nanos = unit.toNanos(timeout);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		E x;
		while ( (x = unlinkFirst()) == null) {
			if (nanos <= 0)
				return null;
			nanos = notEmpty.awaitNanos(nanos);
		}
		return x;
	} finally {
		lock.unlock();
	}
}

public boolean contains(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> p = head.next; p != null; p = p.next)
            if (o.equals(p.item))
                return true;
        return false;
    } finally {
        fullyUnlock();
    }
}

 

參考資料:

https://www.cnblogs.com/WangHaiMing/p/8798709.html