1. 程式人生 > >執行緒池中常用的阻塞佇列簡述

執行緒池中常用的阻塞佇列簡述

一、ArrayBlockingQueue

基於陣列的阻塞佇列,有界佇列,按照先進先出(FIFO)的形式,初始化是必須指定capacity.看一下原始碼:

/**第一種構造方法,指定初始容量*/
public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
/**第二種構造方法,指定初始容量和一個標誌,true:公平鎖,false:非公平鎖*/
 public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
/**第三種構造方法,指定初始容量和一個標誌,true:公平鎖,false:非公平鎖;指定一個初始化的集合*/
 public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

二、LinkedBlockingQueue

基於連結串列的阻塞佇列,元素按照先進先出(FIFO)的策略。原始碼如下:

/**無界佇列*/ 
public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }    
/**指定初始化容量*/
public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
/**無界佇列,並指定初始化的集合*/
public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

三、SynchronousBlockingQueue

容量為0,不儲存任務,通俗地講就是有一個處理一個。原始碼如下:

/**第一種無參,this指定的是第二種構造方法*/
public SynchronousQueue() {
        this(false);
    }
/**第二種,指定fair true:雙端佇列  false:雙端棧*/
public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

四、PriorityBlockingQueue

這是一個無界有序的阻塞佇列,排序規則和之前介紹的PriorityQueue一致,只是增加了阻塞操作。同樣的該佇列不支援插入null元素,同時不支援插入非comparable的物件。它的迭代器並不保證佇列保持任何特定的順序,如果想要順序遍歷,考慮使用Arrays.sort(pq.toArray())。該類不保證同等優先順序的元素順序,如果你想要強制順序,就需要考慮自定義順序或者是Comparator使用第二個比較屬性。原始碼如下:

/**第一種,建立容量的11的佇列*/ 
public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
/**第二種,指定初始化容量*/
public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
/**第三種,指定初始化容量和比較規則*/
public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
/**第四種,指定初始化集合,使用鎖*/
public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }

它使用如下方法進行擴容:

/***/ 
private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

 其先放開了鎖,然後通過CAS設定allocationSpinLock來判斷哪個執行緒獲得了擴容許可權,如果沒搶到許可權就會讓出CPU使用權。最後還是要鎖住開始真正的擴容。擴容許可權爭取到了就是計算大小,分配陣列。擴容的大小時原有的一倍。