1. 程式人生 > >【多執行緒】BlockingQueue詳解

【多執行緒】BlockingQueue詳解

  • 前言:

     在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。

  • 認識BlockingQueue
    阻塞佇列,顧名思義,首先它是一個佇列,而一個佇列在資料結構中所起的作用大致如下圖所示:

    從上圖我們可以很清楚看到,通過一個共享的佇列,可以使得資料由佇列的一端輸入,從另外一端輸出;
    常用的佇列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同型別的佇列,DelayQueue就是其中的一種)
      先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。
      後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。

          多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒)
    下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:
           如上圖所示:當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列。


       如上圖所示:當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒。
         這也是我們在多執行緒環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:
    BlockingQueue的核心方法:
    放入資料:

      offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,
        則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)
      offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中
        加入BlockingQueue,則返回失敗。
      put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷
        直到BlockingQueue裡面有空間再繼續.
    獲取資料:

      poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,
        取不到時返回null;
      poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,
        佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
      take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到
        BlockingQueue有新的資料被加入; 
      drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數), 
        通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。
  • 常見BlockingQueue
    在瞭解了BlockingQueue的基本功能後,讓我們來看看BlockingQueue家庭大致有哪些成員? 
  • BlockingQueue成員詳細介紹
    1. ArrayBlockingQueue
          基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。
      ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。

    2. LinkedBlockingQueue
          基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
    作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

    ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞佇列,一般情況下,在處理多執行緒間的生產者消費者問題,使用這兩個類足以。

    下面的程式碼演示瞭如何使用BlockingQueue:

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    import java.util.concurrent.BlockingQueue;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    import java.util.concurrent.LinkedBlockingQueue;

    /**

    * @author jackyuj

    */

    public class BlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {

    // 宣告一個容量為10的快取佇列

    BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

    Producer producer1 = new Producer(queue);

    Producer producer2 = new Producer(queue);

    Producer producer3 = new Producer(queue);

    Consumer consumer = new Consumer(queue);

    // 藉助Executors

    ExecutorService service = Executors.newCachedThreadPool();

    // 啟動執行緒

    service.execute(producer1);

    service.execute(producer2);

    service.execute(producer3);

    service.execute(consumer);

    // 執行10s

    Thread.sleep(10 *1000);

    producer1.stop();

    producer2.stop();

    producer3.stop();

    Thread.sleep(2000);

    // 退出Executor

    service.shutdown();

    }

    }

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    84

    85

    86

    87

    88

    89

    90

    91

    92

    93

    94

    import java.util.Random;

    import java.util.concurrent.BlockingQueue;

    import java.util.concurrent.TimeUnit;

    /**

    * 消費者執行緒

    *

    * @author jackyuj

    */

    public class Consumerimplements Runnable {

    public Consumer(BlockingQueue<String> queue) {

    this.queue = queue;

    }

    public void run() {

    System.out.println("啟動消費者執行緒!");

    Random r = new Random();

    boolean isRunning = true;

    try {

    while (isRunning) {

    System.out.println("正從佇列獲取資料...");

    String data = queue.poll(2, TimeUnit.SECONDS);

    if (null != data) {

    System.out.println("拿到資料:" + data);

    System.out.println("正在消費資料:" + data);

    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

    }else {

    // 超過2s還沒資料,認為所有生產執行緒都已經退出,自動退出消費執行緒。

    isRunning = false;

    }

    }

    }catch (InterruptedException e) {

    e.printStackTrace();

    Thread.currentThread().interrupt();

    }finally {

    System.out.println("退出消費者執行緒!");

    }

    }

    private BlockingQueue<String> queue;

    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;

    }

    import java.util.Random;

    import java.util.concurrent.BlockingQueue;

    import java.util.concurrent.TimeUnit;

    import java.util.concurrent.atomic.AtomicInteger;

    /**

    * 生產者執行緒

    *

    * @author jackyuj

    */

    public class Producerimplements Runnable {

    public Producer(BlockingQueue queue) {

    this.queue = queue;

    }

    public void run() {

    String data = null;

    Random r = new Random();

    System.out.println("啟動生產者執行緒!");

    try {

    while (isRunning) {

    System.out.println("正在生產資料...");

    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

    data = "data:" + count.incrementAndGet();

    System.out.println("將資料:" + data + "放入佇列...");

    if (!queue.offer(data,2, TimeUnit.SECONDS)) {

    System.out.println("放入資料失敗:" + data);

    }

    }

    }catch (InterruptedException e) {

    e.printStackTrace();

    Thread.currentThread().interrupt();

    }finally {

    System.out.println("退出生產者執行緒!");

    }

    }

    public void stop() {

    isRunning = false;

    }

    private volatile boolean      isRunning               = true;

    private BlockingQueue queue;

    private static AtomicInteger  count                   = new AtomicInteger();

    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;

    }

  • 3. DelayQueue
          DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。
    使用場景:
      DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連線佇列。

    4. PriorityBlockingQueue
          基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),但需要注意的是PriorityBlockingQueue並不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖。

    5. SynchronousQueue
          一種無緩衝的等待佇列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那麼對不起,大家都在集市等待。相對於有緩衝的BlockingQueue來說,少了一箇中間經銷商的環節(緩衝區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說採用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應效能可能會降低。
      宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:
      如果採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體系整體的公平策略;
      但如果是非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。
  • 小結
      BlockingQueue不光實現了一個完整佇列所具有的基本功能,同時在多執行緒環境下,他還自動管理了多線間的自動等待於喚醒功能,從而使得程式設計師可以忽略這些細節,關注更高階的功能。 
  • 歡迎關注公眾號:

相關推薦

執行BlockingQueue

前言:      在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員

執行BlockingQueue阻塞佇列

BlockingQueue繼承Queue介面BlockingQueue加入了阻塞等待的操作,可以理解成如果佇列滿了,插入任務就在門口等著,不丟擲錯誤資訊,直到有元素從佇列中取出,佇列有空位了,再進行插入操作,相應的你還可以加入等待超時機制,如果過時了,就不等了。public

執行執行互斥之synchronized

定義: 執行緒互斥是指某一資源同時只允許一個訪問者對其進行訪問,具有唯一性和排它性。但互斥無法限制訪問者對資源的訪問順序,即訪問是無序的。 我們都知道保證執行緒完整執行。則需要對其加鎖。使用synchronized關鍵字。在這裡鎖的物件理論上可以為任何物件。

執行執行

1、好處 第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。 第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。 第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源, 還會降低系統的穩定性,使用執行

執行ListenableFuture非同步執行查詢實現

  業務場景:為優化查詢效率,將原有查詢的條件做成單獨的索引表,每次產生記錄就會同步到索引表中,每次查詢索引表,根據索引便利的條件欄位再分別查詢每張子表的內容,最後封裝成前臺要的實體類。這裡面涉及到非同步查詢,如何保證一條記錄下的子表全部都查出來後才執行下面的操作。 下面Demo簡

Java執行——synchronized使用

一 前言 Java多執行緒是面試必考的知識點,哈哈,說的有點太絕對了,題外話不說那麼多了,今天我們就來了解一下Java多執行緒中的synchronized。 synchronized相信大家都看過或者用過,synchronized是Java中的關鍵字,synchronized可以

ThreadLocal 執行環境使用

ThreadLocal 介紹 ThreadLocal  就是一個Map。key - > Thread.getCurrentThread()。value - > 執行緒需要儲存的變數。在多執行緒環境中,相當於各自執行緒的一個內部map變數。 每個

執行在專案中用JAVA使用執行

一,初衷 因為在學習java基礎的時候,學習過兩種實現多執行緒的方法。今天在看一個文章的時候,看到了別人在專案中執行多執行緒。想到自己還沒用過,所以將別人的使用方法記錄下來,方便以後自己在專案中呼叫多執行緒可以嘗試一下。 二,多執行緒的實現 2.1 繼

執行屬性pthread_attr

Posix執行緒中的執行緒屬性pthread_attr_t主要包括scope屬性、detach屬性、堆疊地址、堆疊大小、優先順序。在pthread_create中,把第二個引數設定為NULL的話,將採用預設的屬性配置。 執行緒屬性結構如下: typedef s

執行Thread.interrupted()與thread.isInterrupted()的區別

在Java的執行緒基本操作方法中,有兩種方式獲取當前執行緒的isInterrupt屬性。一種是物件方法thread.isInterrupted(),另一種是Thread類的靜態方法Thread.interrupted()。這兩個方法看似相同,實際上是有區別的,我們來看看Jav

Java執行---阻塞佇列(舉例說明)

一. 前言   在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員

《瘋狂Java講義(第4版)》-----第16章執行執行通訊、執行池)

執行緒通訊 傳統的執行緒通訊 用synchonized同步的情況下,可以使用Object的三個方法: wait():釋放同步監視器,直到其他執行緒呼叫該同步監視器的notify()或notifyAll()方法 notify():喚醒此同步監視器上等待的單個執行緒

《瘋狂Java講義(第4版)》-----第16章執行(控制執行執行同步)

控制執行緒 join執行緒 等那個執行緒做完後,當前執行緒再做! import java.lang.Thread; public class MyThread extends Thread{ public MyThread(String name){ super(

《瘋狂Java講義(第4版)》-----第16章執行執行的建立及生命週期)

執行緒的獨立執行的,他並不知道程序是否還有其他執行緒存在 當作業系統建立一個程序時,必須為該程序分配獨立的記憶體空間,並分配大量的相關資源;但建立一個執行緒則簡單得多,因此使用多執行緒來實現併發比使用多程序實現併發的效能要高得多 多執行緒是非常有用的,一個瀏覽器必須能

java執行小示例

package com.chillax.controller; import java.io.IOException; public class MultiThread { public static void main(String[] args) { System.out.p

執行傳參

1.傳遞臨時物件做執行緒引數 1.1要避免的陷阱1 用detach()時,如果主執行緒先結束,變數就會被回收;所以用detach()的話,不推薦用引用,同時絕對不能用指標。 1.2要避免的陷阱2 只要臨時物件的用臨時構造A類物件作為引數傳遞給執行緒,那麼就一定能夠在主執行緒結束之

執行安全問題

在上一篇部落格中已經提到了什麼是多執行緒:https://blog.csdn.net/weixin_42647847/article/details/80969240那多執行緒在java中是如何實現的呢。一、實現多執行緒的四種方式1.繼承Thread類,重寫run方法2.實現

JAVA執行 join() 方法及應用場景

在某些情況下,主執行緒建立並啟動了子執行緒,如果子執行緒中需要進行大量的耗時運算,主執行緒往往將早於子執行緒結束之前結束,如果主執行緒想等待子執行緒執行完畢後,獲得子執行緒中的處理完的某個資料,就要用

執行——join、yield、wait、sleep的區別

join    通常由使用執行緒的程式呼叫,將大問題劃分為許多小問題,每個小問題分配一個執行緒,當所有小問題都得到處理後,再呼叫主執行緒進一步操作。 join(); Join(long mi

執行程式猿進階執行(四)—— 執行同步

一、前言       在上一篇部落格,小編向大家介紹了執行緒的狀態,算是進一步拉開了多執行緒的面試,在這篇部落格中,小編向大家介紹一下多執行緒中常見問題有執行緒同步和執行緒通訊,這篇部落格中小編向大家