1. 程式人生 > >【小家java】BlockingQueue阻塞佇列詳解以及5大實現(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue...)

【小家java】BlockingQueue阻塞佇列詳解以及5大實現(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue...)

相關閱讀

【小家java】java5新特性(簡述十大新特性) 重要一躍
【小家java】java6新特性(簡述十大新特性) 雞肋升級
【小家java】java7新特性(簡述八大新特性) 不溫不火
【小家java】java8新特性(簡述十大新特性) 飽受讚譽
【小家java】java9新特性(簡述十大新特性) 褒貶不一
【小家java】java10新特性(簡述十大新特性) 小步迭代
【小家java】java11新特性(簡述八大新特性) 首個重磅LTS版本


【小家java】Java中的執行緒池,你真的用對了嗎?(教你用正確的姿勢使用執行緒池)
小家Java】一次Java執行緒池誤用(newFixedThreadPool)引發的線上血案和總結


【小家java】BlockingQueue阻塞佇列詳解以及5大實現(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue…)
【小家java】用 ThreadPoolExecutor/ThreadPoolTaskExecutor 執行緒池技術提高系統吞吐量(附帶執行緒池引數詳解和使用注意事項)


前言

在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。

認識BlockingQueue

為什麼說是阻塞(Blocking)的呢?是因為 BlockingQueue 支援當獲取佇列元素但是佇列為空時,會阻塞等待佇列中有元素再返回;也支援新增元素時,如果佇列已滿,那麼等到佇列可以放入新元素時再放入。

BlockingQueue 是一個介面,繼承自 Queue,所以其實現類也可以作為 Queue 的實現來使用,而 Queue 又繼承自 Collection 介面。

阻塞佇列:顧名思義,首先它是一個佇列,而一個佇列在資料結構中所起的作用大致如下圖所示:
在這裡插入圖片描述
從上圖我們可以很清楚看到,通過一個共享的佇列,可以使得資料由佇列的一端輸入,從另外一端輸出;

常用的佇列主要有以下兩種策略:(當然通過不同的實現方式,還可以延伸出很多不同型別的佇列,DelayQueue就是其中的一種)

  1. 先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。
  2. 後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。

多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的**“生產者”和“消費者”模型中**,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。

但是生產者和消費者的處理速度,肯定是不完全匹配的。因此我們需要引入阻塞的概念:如果生產過剩,那就暫停一下等到消費者消費。反之亦然。然而在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節(比如我們使用Lock機制等),尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。

(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒)
當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒。

作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:

BlockingQueue的核心方法:

BlockingQueue 對插入操作、移除操作、獲取元素操作提供了四種不同的方法用於不同的場景中使用:
1、丟擲異常;如果試圖的操作無法立即執行,拋一個異常。
2、返回特殊值(null 或 true/false,取決於具體的操作);
3、阻塞等待此操作,直到這個操作成功;
4、阻塞等待此操作,直到成功或者超時指定時間。
詳情如下:
在這裡插入圖片描述

BlockingQueue 的各個實現都遵循了這些規則,當然我們也不用死記這個表格,知道有這麼回事,然後寫程式碼的時候根據自己的需要去看方法的註釋來選取合適的方法即可。

備註:peek 方法的語義是隻讀取不移除

放入資料:

  • offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)
  • offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。
  • put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被**阻斷直到BlockingQueue裡面有空間再繼續.**

獲取資料:

  • poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null;
  • poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則直到時間超時還沒有資料可取,返回失敗。
  • take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入**等待狀態直到BlockingQueue有新的資料**被加入;
  • drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數), 通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖(一般用於批量操作)。

對於 BlockingQueue,我們的關注點應該在 put(e) 和 take() 這兩個方法,因為這兩個方法是帶阻塞的。

BlockingQueue 是設計用來實現生產者-消費者佇列的,當然,你也可以將它當做普通的 Collection 來用,前面說了,它實現了 java.util.Collection 介面。例如,我們可以用 remove(x) 來刪除任意一個元素,但是,這類操作通常並不高效,所以儘量只在少數的場合使用,比如一條訊息已經入隊,但是需要做取消操作的時候。

BlockingQueue 的實現都是執行緒安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作。如 addAll© 有可能在添加了一些元素後中途丟擲異常,此時 BlockingQueue 中已經添加了部分元素,這個是允許的,取決於具體的實現

BlockingQueue常見5種實現

五大常用實現如下:
在這裡插入圖片描述

ArrayBlockingQueue

基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。

ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,因此併發效率上相對較低,這點尤其不同於LinkedBlockingQueue;

通過看原始碼分析,其實ArrayBlockingQueue完全是可以使用分離鎖的。但是作者Doug Lea並沒有這麼去幹,理由如下:

ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。

ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。

而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。

對於 ArrayBlockingQueue,我們可以在構造的時候指定以下三個引數:

public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
  1. 佇列容量,其限制了佇列中最多允許的元素個數;
  2. 指定獨佔鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的執行緒獲取到鎖;
  3. 可以指定用一個集合來初始化,將此集合中的元素在構造方法期間就先新增到佇列中。

給個完整的生產者、消費者的例子:

public class ProduceConsumeDemo {

    public static void main(String[] args) {
        //生產者和消費者共用這一個佇列,佇列容量為10
        ArrayBlockingQueue<Cookie> arrayBlockingQueue = new ArrayBlockingQueue<>(10);

        //開啟一個生產者
        for (int i = 0; i < 1; i++) {
            new Produce(arrayBlockingQueue).start();
        }

        //一個生產者,5個消費者
        for (int i = 0; i < 5; i++) {
            new Thread(new Consume(arrayBlockingQueue)).start();
        }

    }
}

class Produce extends Thread {
    private static int i = 0;
    private ArrayBlockingQueue<Cookie> arrayBlockingQueue;

    public Produce(ArrayBlockingQueue<Cookie> arrayBlockingQueue) {
        this.arrayBlockingQueue = arrayBlockingQueue;
    }

    public void run() {
        try {
            while (i < 1000) {
                arrayBlockingQueue.put(new Cookie("cookie" + i));
                if (++i % 100 == 0) {//每生產100個,休息10s
                    Thread.sleep(10000);
                }
            }
        } catch (InterruptedException e) {
            System.out.println("produce queue InterruptedException");
        }
    }
}

class Consume implements Runnable {
    private ArrayBlockingQueue<Cookie> arrayBlockingQueue;

    public Consume(ArrayBlockingQueue<Cookie> arrayBlockingQueue) {
        this.arrayBlockingQueue = arrayBlockingQueue;
    }

    public void run() {
        try {
            while (true) {
                Cookie poll = arrayBlockingQueue.poll(5, TimeUnit.SECONDS);//如果queue為null,那麼5秒之後再去佇列中取資料
                if (poll != null)
                    System.out.println(Thread.currentThread().getName() + "--consume --" + poll);

            }
        } catch (InterruptedException e) {
            System.out.println("consume queue InterruptedException");
        }
    }
}

class Cookie {
    private String number;

    public Cookie(String number) {
        this.number = number;
    }

    @Override
    public String toString() {
        return number + "";
    }
}
LinkedBlockingQueue

基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成)。當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回

只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。

作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞佇列,一般情況下,在處理多執行緒間的生產者消費者問題,使用這兩個類足以。

DelayQueue

DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。

DelayQueue阻塞佇列在我們系統開發中也常常會用到,例如:快取系統的設計,快取中的物件,超過了空閒時間,需要從快取中移出;任務排程系統,能夠準確的把握任務的執行時間。我們可能需要通過執行緒處理很多時間上要求很嚴格的資料,如果使用普通的執行緒,我們就需要遍歷所有的物件,一個一個的檢 檢視資料是否過期等,首先這樣在執行上的效率不會太高,其次就是這種設計的風格也大大的影響了資料的精度。一個需要12:00點執行的任務可能12:01 才執行,這樣對資料要求很高的系統有更大的弊端。由此我們可以使用DelayQueue。

使用場景:
  DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連線佇列。

使用:
為了具有呼叫行為,存放到DelayDeque的元素必須繼承Delayed介面。Delayed介面使物件成為延遲物件,它使存放在DelayQueue類中的物件具有了啟用日期。該介面強制執行下列兩個方法。

  1. CompareTo(Delayed o):Delayed介面繼承了Comparable介面,因此有了這個方法。
  2. getDelay(TimeUnit unit):這個方法返回到啟用日期的剩餘時間,時間單位由單位引數指定。

業務場景一:多考生考試
模擬一個考試的日子,考試時間為120分鐘,30分鐘後才可交卷,當時間到了,或者學生都交完捲了宣佈考試結束。

實現思想:用DelayQueue儲存考生(Student類),每一個考生都有自己的名字和完成試卷的時間,Teacher執行緒對DelayQueue進行監控,**收取完成試卷小於120分鐘的學生的試卷。**當考試時間120分鐘到時,先關閉Teacher執行緒,然後強制DelayQueue中還存在的考生交卷。每一個考生交卷都會進行一次

public class Exam {

    public static void main(String[] args) throws InterruptedException {
        int studentNumber = 20;
        CountDownLatch countDownLatch = new CountDownLatch(studentNumber + 1);
        DelayQueue<Student> students = new DelayQueue<>();
        Random random = new Random();
        for (int i = 0; i < studentNumber; i++) {
            students.put(new Student("student" + (i + 1), random.nextInt(120) + 30, countDownLatch));
        }

        //老師監考  準備一個老師即可  把學生們需要交給老師 以便監控
        Thread teacherThread = new Thread(new Teacher(students));

        //強制交卷 時間為120分鐘  並且把老師執行緒穿進去收捲紙
        students.put(new EndExam(students, 120, countDownLatch, teacherThread));
        teacherThread.start();

        countDownLatch.await();
        System.out.println(" 考試時間到,全部交卷!");
    }

}

class Student implements Runnable, Delayed {

    private String name;
    private long workTime; //希望用時  有的學生小於120分鐘  有的會大於120分鐘 會被強制交卷
    private long submitTime; //交卷的時間
    private boolean isForce = false;
    private CountDownLatch countDownLatch;

    public Student() {
    }

    public Student(String name, long workTime, CountDownLatch countDownLatch) {
        this.name = name;
        this.workTime = workTime;
        this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS) + System.nanoTime();
        this.countDownLatch = countDownLatch;
    }

    @Override
    public int compareTo(Delayed o) {
        if (o == null || !(o instanceof Student))
            return 1;
        if (o == this)
            return 0;
        Student s = (Student) o;
        if (this.workTime > s.workTime) {
            return 1;
        } else if (this.workTime == s.workTime) {
            return 0;
        } else {
            return -1;
        }
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(submitTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public void run() {
        if (isForce) {
            System.out.println(name + " 交卷, 希望用時" + workTime + "分鐘" + " ,實際用時 120分鐘");
        } else {
            System.out.println(name + " 交卷, 希望用時" + workTime + "分鐘" + " ,實際用時 " + workTime + " 分鐘");
        }
        countDownLatch.countDown(); //交卷一個 減一個人
    }

    public boolean isForce() {
        return isForce;
    }

    public void setForce(boolean isForce) {
        this.isForce = isForce;
    }

}

class EndExam extends Student {

    private DelayQueue<Student> students;
    private CountDownLatch countDownLatch;
    private Thread teacherThread;

    public EndExam(DelayQueue<Student> students, long workTime, CountDownLatch countDownLatch, Thread teacherThread) {
        super("強制收卷", workTime, countDownLatch);
        this.students = students;
        this.countDownLatch = countDownLatch;
        this.teacherThread = teacherThread;
    }


    @Override
    public void run() {
        teacherThread.interrupt(); //打斷執行緒 強制交卷  不要讓學生自己take了 也可採用一個全域性變數標記
        Student tmpStudent;

        //遍歷所有還未執行的學生們 把他們拿出來 手動呼叫他們的run方法交卷
        for (Iterator<Student> iterator2 = students.iterator(); iterator2.hasNext(); ) {
            tmpStudent = iterator2.next();
            tmpStudent.setForce(true);
            tmpStudent.run();
        }
        countDownLatch.countDown(); //最後注意 把自己強制交卷的執行緒 也要一下
    }

}

class Teacher implements Runnable {

    // 老師需要知道  自己監控哪些學生 宣佈考試開始
    private DelayQueue<Student> students;

    public Teacher(DelayQueue<Student> students) {
        this.students = students;
    }

    @Override
    public void run() {
        try {
            System.out.println(" test start");
            //宣佈考試後 才能讓學生開始run
            // 此處需要注意 take是阻塞的 只要時間到了  才會take出來 才會執行run方法
            // 中途有可能執行緒會被interrupted(比如強制交卷的情況下 就不能再讓take了 需要執行強制交卷的執行緒任務)
            while (!Thread.interrupted()) {
                students.take().run();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
輸出:
 test start
student9 交卷, 希望用時35分鐘 ,實際用時 35 分鐘
student5 交卷, 希望用時45分鐘 ,實際用時 45 分鐘
student6 交卷, 希望用時45分鐘 ,實際用時 45 分鐘
student20 交卷, 希望用時52分鐘 ,實際用時 52 分鐘
student11 交卷, 希望用時62分鐘 ,實際用時 62 分鐘
student2 交卷, 希望用時79分鐘 ,實際用時 79 分鐘
student15 交卷, 希望用時81分鐘 ,實際用時 81 分鐘
student8 交卷, 希望用時83分鐘 ,實際用時 83 分鐘
student10 交卷, 希望用時83分鐘 ,實際用時 83 分鐘
student1 交卷, 希望用時83分鐘 ,實際用時 83 分鐘
student18 交卷, 希望用時94分鐘 ,實際用時 94 分鐘
student3 交卷, 希望用時94分鐘 ,實際用時 94 分鐘
student16 交卷, 希望用時98分鐘 ,實際用時 98 分鐘
student7 交卷, 希望用時114分鐘 ,實際用時 114 分鐘
student12 交卷, 希望用時118分鐘 ,實際用時 118 分鐘
student19 交卷, 希望用時122分鐘 ,實際用時 120分鐘
student17 交卷, 希望用時125分鐘 ,實際用時 120分鐘
student14 交卷, 希望用時134分鐘 ,實際用時 120分鐘
student4 交卷, 希望用時148分鐘 ,實際用時 120分鐘
student13 交卷, 希望用時143分鐘 ,實際用時 120分鐘
 考試時間到,全部交卷!

業務場景二:具有過期時間的快取
向快取新增內容時,給每一個key設定過期時間,系統自動將超過過期時間的key清除。
這個場景中幾個點需要注意:

  1. 當向快取中新增key-value對時,如果這個key在快取中存在並且還沒有過期,需要用這個key對應的新過期時間
  2. 為了能夠讓DelayQueue將其已儲存的key刪除,需要重寫實現Delayed介面新增到DelayQueue的DelayedItem的hashCode函式和equals函式
  3. 當快取關閉,監控程式也應關閉,因而監控執行緒應當用守護執行緒
/**
 * 利用延遲佇列,來書寫一個具有過期key效果的簡單快取  快取使用ConcurrentHashMap實現
 *
 * @author [email protected]
 * @description //
 * @date 2018/11/3 21:25
 */
public class Cache<K, V> {

    //模擬裝載快取資料
    public ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
    //快取即將要過期的key們
    public DelayQueue<DelayedItem<K>> queue = new DelayQueue<>();

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        int cacheNumber = 10;
        int liveTime = 0;
        Cache<String, Integer> cache = new Cache<>();

        for (int i = 0; i < cacheNumber; i++) {
            liveTime = random.nextInt(3000);
            System.out.println(i + "  " + liveTime);
            cache.put(i + "", i, random.nextInt(liveTime));

            if (random.nextInt(cacheNumber) > 7) {
                liveTime = random.nextInt(3000);
                System.out.println(i + "  " + liveTime);
                cache.put(i + "", i, random.n