1. 程式人生 > >執行緒基礎(二十二)-併發容器-ArrayBlockingQueue(下)

執行緒基礎(二十二)-併發容器-ArrayBlockingQueue(下)

本文作者:王一飛,叩丁狼高階講師。原創文章,轉載請註明出處。

概念

ArrayBlockingQueue 是一個有界阻塞的佇列。有界原因是它底層維護了一個數組,初始化時,可以直接指定。要注意,一旦建立成功後,陣列將無法進行再擴容。而阻塞是因為它對入列出列做了加鎖處理,如果佇列滿了,再入列則需要阻塞等待, 如果佇列是空的,出列時也需要阻塞等待。

ArrayBlockingQueue 底層是一個有界陣列,遵循FIFO原則,對進入的元素進行排序,先進先出。

ArrayBlockingQueue 使用ReentrantLock鎖,再配合兩種Condition實現佇列的執行緒安全操作。併發環境下ArrayBlockingQueue 使用頻率較高

ArrayBlockingQueue 支援公平與非公平2種操作策略,在建立物件時通過建構函式將fair引數設定為true/false即可,需要注意的是,如果fair設定為false,表示持有公平鎖,這種操作會降低系統吞吐量,慎用。

 

外部結構

 

內部結構

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    final Object[] items;  //存放元素陣列
    final ReentrantLock lock;  //互斥鎖物件
    private final Condition notEmpty;  //非空條件變數
    private final Condition notFull;  //非滿條件變數
    ....
}

從內部結構原始碼上看,ArrayBlockingQueue 內部維護一個final陣列,當佇列初始化後將無法再進行拓展,保證佇列的有界性。lock 互斥鎖,在出隊入隊中保證執行緒的安全。而notEmpty 跟 notFull 條件變數保證佇列在滿隊時入隊等待, 當佇列空列時,出隊等待。

初始化

//引數1:佇列初始長度  
//引數2:是否為公平佇列   fasle: 是, true 不是
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
}
public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
}
//引數3:佇列初始化元素
public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
      .....
}

ArrayBlockingQueue 有3個構造器,核心是2個引數的構造器, capacity表示佇列初始化長度, fair 指定ArrayBlockingQueue是公平佇列還是非公平佇列。

入列

ArrayBlockingQueue 入列方式有大體三種:

public class App {
    public static void main(String[] args) throws InterruptedException {
        //佇列長度為2
        ArrayBlockingQueue queue = new ArrayBlockingQueue(2);

        //方式1:滿列拋異常
        ///System.out.println(queue.add("add"));  //true
        ///System.out.println(queue.add("add"));  //true
        ///System.out.println(queue.add("add"));  //滿列異常

        //方式2:滿列返回false,不阻塞
        //System.out.println(queue.offer("offer"));  //true
        //System.out.println(queue.offer("offer"));  //true
        //System.out.println(queue.offer("offer"));  //false

        //方式3:滿列阻塞(推薦)
        queue.put("put");
        queue.put("put");
        queue.put("put");  //滿列阻塞等待
    }
}

這裡我們以put方法為例

public void put(E e) throws InterruptedException {
       checkNotNull(e);
       final ReentrantLock lock = this.lock;
       lock.lockInterruptibly();  //取鎖: 執行緒執行中斷
       try {
           while (count == items.length)
               notFull.await(); //佇列滿隊,需要暫停等待
           enqueue(e); //入列
       } finally {
           lock.unlock();  //釋放鎖
       }
   }

在put方法開始前, 先獲取可中斷lock.lockInterruptibly(), 對put核心邏輯進行加鎖,當判斷到佇列已滿,阻塞當前執行緒。反之, 執行enqueue()實現入列邏輯。

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;  //入列
        //putIndex  表示下一個入列所以, 如果為佇列長度, 下一個輪迴
        //原因: 佇列為陣列, 操作所以從0開始
        if (++putIndex == items.length)  
            putIndex = 0; 
        count++;   //總數+1
        notEmpty.signal();  //喚醒等待出列執行緒
    }

進入enqueue之後, 因為該方法已經持有鎖,所以無法再進行鎖重入,在enqueue方法之後, 執行notEmpty.signal(); 喚醒出列等待執行緒。

出列

ArrayBlockingQueue 出列也對應的有3中方式

public class App {
    public static void main(String[] args) throws InterruptedException {
        //佇列長度為2
        ArrayBlockingQueue queue = new ArrayBlockingQueue(2);

        queue.put("admin");
        queue.put("admin");

        //方式1:空列出隊時,拋異常
        //System.out.println(queue.remove());
        //System.out.println(queue.remove());
        //System.out.println(queue.remove()); //空列報異常

        //方式2:空列出隊時,返回null
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll()); //空列返回null

        //方式3:空列出隊時,阻塞(推薦)
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take()); //空列阻塞
    }
}

這裡我們以take方法為例

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();  // 隊長為0,需要暫停等待
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

跟put方法操作一樣, 進入方法之後, 先獲取鎖,再判斷佇列長度是否為0, 如果為0, 當前執行緒進入阻塞。反之,進入dequeue 方法執行出列操作。

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;  //出列之後,原先佇列設定為null 
        //takeIndex 下一個出列的資料索引, 一個輪迴後,設定為0
        if (++takeIndex == items.length)   
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();    //喚醒等待入列執行緒
        return x;
    }

公平/非公平佇列

ArrayBlockingQueue 可以實現公平與非公平2種佇列, 公平隊列表示在併發環境下,如果佇列已經滿列了,入列執行緒按照FIFO的順序阻塞,等待召喚。非公平佇列就沒有這種規矩,誰先搶到,誰先入列。

來看一下例子:
需求:開啟10個執行緒往邊界為3的佇列新增資料, 同時開始一個執行緒不斷出列。

public class App {
    public static void main(String[] args) throws InterruptedException {
        //佇列長度為3
        //公平佇列
        ArrayBlockingQueue queue = new ArrayBlockingQueue(3, true);
        for (int i= 0; i < 10; i++){

            new Thread(new Runnable() {
                @Override
                public void run() {

                    try {
                        Thread.sleep((long) (Math.random() * 10)); //將問題放大
                        //執行緒進入
                        System.out.println("進入-"+ Thread.currentThread().getName());
                        //阻塞等待入列
                        queue.put("出列-" + Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"t_" + i).start();
        }


        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        //按順序出列
                        System.out.println("------" + queue.take());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}
進入-t_5
進入-t_1
------出列-t_5
------出列-t_1
進入-t_8
------出列-t_8
進入-t_7
------出列-t_7
進入-t_2
------出列-t_2
進入-t_9
------出列-t_9
進入-t_0
------出列-t_0
進入-t_3
進入-t_6
------出列-t_3
------出列-t_6
進入-t_4
------出列-t_4

觀察結果,發現進入順序跟出列順序一樣。公平佇列講究公平, 進入0到9執行緒啟動後,執行run方法,都能執行 “進入” 程式碼,但是入列的操作是阻塞的,同一時間點只允許一個執行緒進入。其他執行緒必須等待,那麼誰先列印 “進入” 程式碼,就表示誰先阻塞,依照公平FIFO原則,就應該誰先出列。 所以當進入順序與出列一致就把表示公平原則生效。

將引數改為false,我們再看列印結果

ArrayBlockingQueue queue = new ArrayBlockingQueue(3, false);
進入-t_3
進入-t_7
進入-t_6
進入-t_0
進入-t_4
進入-t_8
進入-t_5
進入-t_1
------出列-t_3
------出列-t_7
------出列-t_6
------出列-t_8
------出列-t_4
------出列-t_5
------出列-t_1
------出列-t_0
進入-t_9
------出列-t_9
進入-t_2
------出列-t_2

觀察, 很明顯進入與出列順序不一致,這就是非公平佇列。

注意: 10個執行緒效果不是太明顯,可以適當加大。

到這,本篇結束。

想獲取更多技術視訊,請前往叩丁狼官網:http://www.wolfcode.cn/all_article.html