1. 程式人生 > >java併發程式設計——阻塞佇列與非阻塞佇列

java併發程式設計——阻塞佇列與非阻塞佇列

ArrayBlockingQueue

ArrayBlockingQueue是一個有界阻塞佇列資料結構基於陣列、使用ReentrantLock、Condition保證併發同步

所謂阻塞佇列
當佇列滿了,則會對生產執行緒產生阻塞直到有空位可插入;
當佇列空了,則會對消費佇列產生阻塞直到有新的元素被加入佇列。

這裡寫圖片描述

方法中含有字母t的都會產生阻塞waiting;
方法中含有o的都會返回 true/false;
剩下add、remove的會丟擲異常;
peek()會從佇列頭部觀察頭結點,但並不會對佇列造成影響。

我們通過一個簡單的應用,來逐步分析ArrayBlockingQueue佇列的程式碼:

public class ArrayBlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService ex = Executors.newFixedThreadPool(50);

        ArrayBlockingQueue<CustomizedTask> tasksQueue = new ArrayBlockingQueue<CustomizedTask>(100);//有界佇列 100個元素
        // 生產者執行緒
new Thread(new Runnable() { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { tasksQueue.put(new CustomizedTask()); TimeUnit.SECONDS.sleep(1); } catch
(InterruptedException e) { e.printStackTrace(); } } } }).start(); // 消費者執行緒 new Thread(new Runnable() { @Override public void run() { CustomizedTask task; try { while ((task = tasksQueue.take()) != null && !Thread.currentThread().isInterrupted()) { ex.submit(task); } } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("Main Thread is terminated"); } static class CustomizedTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis()); } } }

1.構造:



    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;



    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];//全域性變數,一個Object[]陣列用來維護入隊元素
            lock = new ReentrantLock(fair);//ReentrantLock.Condition實現等待\通知
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }

2.入佇列。生產者生產訊息並放入佇列


    public void put(E e) throws InterruptedException {
        checkNotNull(e);//入隊元素正確性判斷
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//獲取鎖
        try {
            while (count == items.length)//如果佇列中資料已經達到佇列上限
                notFull.await();//阻塞並釋放鎖(此時當前執行緒進入Condition佇列併產生park阻塞)
            enqueue(e);//當佇列中有空位存在的時,執行入隊
        } finally {
            lock.unlock();
        }
    }


    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;//putIndex初始化為0,每次插入元素後遞增
        if (++putIndex == items.length)//達到上限
            putIndex = 0;
        count++;//Number of elements in the queue
    //通知阻塞在佇列上的消費者(AQS:在獲取到鎖的情況下,將阻塞在Condition佇列的結點放入sync佇列中,等待被喚醒再次嘗試鎖獲取)
        notEmpty.signal();
    }

3.出佇列。消費者如果阻塞會被喚醒,並且進行鎖獲取和取佇列元素


      public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)//如果是個空佇列
                    notEmpty.await();//阻塞直到佇列進入元素同時釋放鎖
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];//陣列中取數
        items[takeIndex] = null;//取數後釋放佔用
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;//佇列中總元素數目減1
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//喚醒阻塞的等待消費的執行緒
        return x;
    }

LinkedBlockingQueue

LinkedBlockingQueue是一個有界阻塞佇列,基於連結串列結構實現,預設capacity為Integer.MAX_VALUE。
我們通過一個簡單的應用,來逐步分析LinkedBlockingQueue佇列的程式碼:


    public class LinkedBlockingQueueTest {

        public static void main(String[] args) throws InterruptedException {
            ExecutorService ex = Executors.newFixedThreadPool(50);

            LinkedBlockingQueue<CustomizedTask> tasksQueue = new LinkedBlockingQueue<CustomizedTask>(100);
            // 生產者執行緒
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            tasksQueue.put(new CustomizedTask());
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();

            // 消費者執行緒
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CustomizedTask task;
                    try {
                        while ((task = tasksQueue.take()) != null && !Thread.currentThread().isInterrupted()) {
                            ex.submit(task);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();

            System.out.println("Main Thread is terminated");
        }

        static class CustomizedTask implements Runnable {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis());
            }
        }
    }

1.初始化構造:


        /** Current number of elements */
        private final AtomicInteger count = new AtomicInteger();

        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();

        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();

         /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();

        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();

       /**
         * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
         *
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if {@code capacity} is not greater than
         *             zero
         */
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);//構造連結串列的頭尾結點,連結串列的初始化
        }

1.1 連結串列資料結構


        /**
         * Linked list node class
         * 一個簡單的單向連結串列
         */
        static class Node<E> {
            E item;

            /**
             * One of: - the real successor Node - this Node, meaning the successor
             * is head.next - null, meaning there is no successor (this is the last
             * node)
             */
            Node<E> next;

            Node(E x) {
                item = x;
            }
        }

2.入佇列。生產者生產訊息並放入佇列


          public void put(E e) throws InterruptedException {
        if (e == null)
            throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);//插入的物件包裝為一個結點
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//獲取putLcok
        try {
            /*
             * Note that count is used in wait guard even though it is not
             * protected by lock. This works because count can only decrease at
             * this point (all other puts are shut out by lock), and we (or some
             * other waiting put) are signalled if it ever changes from
             * capacity. Similarly for all other uses of count in other wait
             * guards.
             */
            while (count.get() == capacity) {//佇列內元素達到上限
                notFull.await();//condition等待
            }
            enqueue(node);//在佇列不滿的情況下 插入元素
            c = count.getAndIncrement();//容量計數
            if (c + 1 < capacity)//佇列是否可以再插入一個元素
                notFull.signal();//喚醒在 putLock.condition等待的執行緒,執行緒執行插入操作。
        } finally {
            putLock.unlock();
        }
        if (c == 0)//如果佇列再進入這個操作之前是空的,那麼現在不空了(剛插入一個元素),喚醒因為佇列空而阻塞的取數執行緒
            signalNotEmpty();
    }

     private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;//尾部插入一個元素,並且把last引用指向這個元素
        }
    private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }

3.出佇列。消費者如果阻塞會被喚醒,並且進行鎖獲取和取佇列元素


        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {//佇列為空,則阻塞取操作直到佇列不空
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)//如果進入這個操作之前佇列中元素超過1個(比如2個),則表示這個操作取數後依舊不為空(起碼還有1個),那麼可以喚醒其他因為佇列為空而阻塞的執行緒
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            //喚醒這個操作執行之前因為佇列慢而產生的阻塞,起碼這個操作之後會有一個空位
            if (c == capacity)
                signalNotFull();
            return x;
        }

         private E dequeue() {
                // assert takeLock.isHeldByCurrentThread();
                // assert head.item == null;
                Node<E> h = head;
                Node<E> first = h.next;//head的下個元素。可以看到是按照 FIFO佇列排序獲取的
                //將這個元素從佇列中清除(出隊)
                h.next = h; // help GC
                head = first;
                E x = first.item;
                first.item = null;
                return x;
            }

        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }

DelayedQueue

一個無界的阻塞佇列,其中的元素需要是先Delayed介面,對元素的提取加入了延期限制

當元素的過期時間到了才允許從佇列中取出。佇列頭部的元素是等待時間最久的元素。
如果插入資料增加會自動擴容,建立新的更大的陣列並將原陣列資料放入(PriorityQueue)。
如果沒有元素到了過期時間,那麼佇列頭head不存在,並且poll操作返回null。
當一個元素到了過期時間,那麼它的getDelay(TimeUnit.NANOSECONDS)方法將會返回一個小於0的數字。佇列中不允許放入null元素。

這裡寫圖片描述

還是用一個Demo來入手原始碼的分析:

public class DelayQueueTest {

    public static void main(String[] args) {
        DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();

        producer(delayQueue);
        consumer(delayQueue);// Consumer 1
        consumer(delayQueue);// Consumer 2

    }

    /**
     * 每100毫秒建立一個物件,放入延遲佇列,延遲時間1毫秒
     * @param delayQueue
     */
    private static void producer(final DelayQueue<DelayedElement> delayQueue) {
        // offer
        new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (true) {
                    i++;
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    DelayedElement element = new DelayedElement(1000 * 60 * 2, "test" + i);// 2min
                    System.out.println("offer success " + delayQueue.offer(element));
                }
            }
        },"Producer").start();

        /**
         * 每秒列印延遲佇列中的物件個數
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("delayQueue size:" + delayQueue.size());
                }
            }
        },"Watcher").start();
    }

    /**
     * take
     * 
     * 消費者,從延遲佇列中獲得資料,進行處理
     * @param delayQueue
     */
    private static void consumer(final DelayQueue<DelayedElement> delayQueue) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    DelayedElement element = null;
                    try {
                        element = delayQueue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(System.currentTimeMillis() + "---" + element);
                }
            }
        },"Consumer").start();
    }
}

class DelayedElement implements Delayed {

    private final long delay; // 延遲時間

    private final long expire; // 到期時間

    private final String msg; // 資料

    private final long now; // 建立時間

    public DelayedElement(long delay, String msg) {
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay; // 到期時間 = 當前時間+延遲時間
        now = System.currentTimeMillis();
    }
    /**
     * 需要實現的介面,獲得延遲時間 用過期時間-當前時間
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 用於延遲佇列內部比較排序 當前時間的延遲時間 - 比較物件的延遲時間
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delay);
        sb.append(", expire=").append(expire);
        sb.append(", msg='").append(msg).append('\'');
        sb.append(", now=").append(now);
        sb.append('}');
        return sb.toString();
    }
}

1.構造初始化DelayedQ


        private final transient ReentrantLock lock = new ReentrantLock();

        private final PriorityQueue<E> q = new PriorityQueue<E>();//內部通過一個PriorityQueue儲存元素,而PriorityQueue內部通過陣列實現。這個priority會自動通過移動陣列元素進行擴容,類似ArrayList

        private final Condition available = lock.newCondition();//同樣是通過condition實現

            public DelayQueue() {
        }


        /**
         * 執行緒被設計來用來等待佇列頭部的元素
         * 
         * 這是 leader-follower模式的變體,為了最大限度減小不必要的時間等待
         * 當一個執行緒成為 leader,它會等待直到頭結點過期,而其他執行緒會無限期的等待下去,直到這個leader被釋放並喚醒其他執行緒。
         * leader 執行緒必須在從take()或者poll()等其他方法中返回前,通知啟用其他執行緒,並釋放leader引用
         * 
         * 無論什麼時候頭結點被替換了一個更早過期的時間。
         * 這個leader field 通過設定為null,被置為無效。
         * 其他執行緒被喚醒然後準備獲取到接著釋放leadship。
         * 
         */
        private Thread leader = null;

2.offer插入元素


    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);//隊尾插入
            if (q.peek() == e) {//佇列中僅有一個元素
                leader = null;
                available.signal();//可能存在其他執行緒因為佇列控而阻塞
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

3.take提取陣列元素


    /**
     * Retrieves and removes the head of this queue, waiting if necessary until
     * an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();//檢視佇列中的頭元素
                if (first == null)//為null表示沒有可獲取的元素
                    available.await();//condition await
                else {
                    long delay = first.getDelay(NANOSECONDS);//檢視這個元資料的過期時間
                    if (delay <= 0)//已過期 可獲取
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();//如果不是leader則進入等待狀態,直到之前的leader被釋放後被喚醒
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;//當前獲取佇列元素的執行緒
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;//執行緒獲取到元素後釋放leader引用
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)//leader已被釋放 && 下個結點存在
                available.signal();//leader執行緒獲取了元素 並且釋放了leader引用,退出方法前喚醒其他執行緒。
            lock.unlock();
        }
    }

小結

SynchronousQueue:
這個佇列不儲存元素,當一個執行緒向這個佇列插入一個元素,另一個佇列需要立刻從這個佇列裡取出,否則無法繼續插入元素。適合傳遞型場景。

LinkedTransferQueue:
一個由連結串列構成的無界阻塞佇列

LinkedBlockingDeque
一個連結串列結構的 雙向阻塞佇列。可以滿足兩個執行緒分別從頭尾進行插入或移除操作,應用於“工作竊取”演算法:允許一個執行緒從頭部插入\移除元素,另一個竊取執行緒從尾部竊取元素。