1. 程式人生 > >[引用]java多執行緒學習-java.util.concurrent詳解(四) BlockingQueue

[引用]java多執行緒學習-java.util.concurrent詳解(四) BlockingQueue

自:http://janeky.iteye.com/blog/770671
7.BlockingQueue
    “支援兩個附加操作的 Queue,這兩個操作是:獲取元素時等待佇列變為非空,以及儲存元素時等待空間變得可用。“

    這裡我們主要討論BlockingQueue的最典型實現:LinkedBlockingQueue 和ArrayBlockingQueue。兩者的不同是底層的資料結構不夠,一個是連結串列,另外一個是陣列。
   
    後面將要單獨解釋其他型別的BlockingQueue和SynchronousQueue

    BlockingQueue的經典用途是 生產者-消費者模式

    程式碼如下:
Java程式碼  收藏程式碼
  1. import java.util.Random;  
  2. import java.util.concurrent.BlockingQueue;  
  3. import java.util.concurrent.LinkedBlockingQueue;  
  4. public class TestBlockingQueue {  
  5.     public static void main(String[] args) {  
  6.         final BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>(3);  
  7.         final Random random=new Random();  
  8.         class Producer implements Runnable{  
  9.             @Override  
  10.             public void run() {  
  11.                 while(true){  
  12.                     try {  
  13.                         int i=random.nextInt(100);  
  14.                         queue.put(i);//當佇列達到容量時候,會自動阻塞的  
  15.                         if(queue.size()==3)  
  16.                         {  
  17.                             System.out.println("full");  
  18.                         }  
  19.                     } catch (InterruptedException e) {  
  20.                         e.printStackTrace();  
  21.                     }  
  22.                 }  
  23.             }  
  24.         }  
  25.         class Consumer implements Runnable{  
  26.             @Override  
  27.             public void run() {  
  28.                 while(true){  
  29.                     try {  
  30.                         queue.take();//當佇列為空時,也會自動阻塞  
  31.                         Thread.sleep(1000);  
  32.                     } catch (InterruptedException e) {  
  33.                         e.printStackTrace();  
  34.                     }  
  35.                 }  
  36.             }  
  37.         }  
  38.         new Thread(new Producer()).start();  
  39.         new Thread(new Consumer()).start();  
  40.     }  
  41. }  

    總結:BlockingQueue使用時候特別注意take 和 put

8. DelayQueue

我們先來學習一下JDK1.5 API中關於這個類的詳細介紹:
    “它是包含Delayed 元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即使無法使用 take 或 poll 移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此佇列不允許使用 null 元素。”

    在現實生活中,很多DelayQueue的例子。就拿上海的SB會來說明,很多國家地區的開館時間不同。你很早就來到園區,然後急急忙忙地跑到一些心儀的館區,發現有些還沒開,你吃了閉門羹。

    仔細研究DelayQueue,你會發現它其實就是一個PriorityQueue的封裝(按照delay時間排序),裡面的元素都實現了Delayed介面,相關操作需要判斷延時時間是否到了。

    在實際應用中,有人拿它來管理跟實際相關的快取、session等

   下面我就通過 “上海SB會的例子來闡述DelayQueue的用法”

程式碼如下:
Java程式碼  收藏程式碼
  1. import java.util.Random;  
  2. import java.util.concurrent.DelayQueue;  
  3. import java.util.concurrent.Delayed;  
  4. import java.util.concurrent.TimeUnit;  
  5. public class TestDelayQueue {  
  6.     private class Stadium implements Delayed  
  7.     {  
  8.         long trigger;  
  9.         public Stadium(long i){  
  10.             trigger=System.currentTimeMillis()+i;  
  11.         }  
  12.         @Override  
  13.         public long getDelay(TimeUnit arg0) {  
  14.             long n=trigger-System.currentTimeMillis();  
  15.             return n;  
  16.         }  
  17.         @Override  
  18.         public int compareTo(Delayed arg0) {  
  19.             return (int)(this.getDelay(TimeUnit.MILLISECONDS)-arg0.getDelay(TimeUnit.MILLISECONDS));  
  20.         }  
  21.         public long getTriggerTime(){  
  22.             return trigger;  
  23.         }  
  24.     }  
  25.     public static void main(String[] args)throws Exception {  
  26.         Random random=new Random();  
  27.         DelayQueue<Stadium> queue=new DelayQueue<Stadium>();  
  28.         TestDelayQueue t=new TestDelayQueue();  
  29.         for(int i=0;i<5;i++){  
  30.             queue.add(t.new Stadium(random.nextInt(30000)));  
  31.         }  
  32.         Thread.sleep(2000);  
  33.         while(true){  
  34.             Stadium s=queue.take();//延時時間未到就一直等待  
  35.             if(s!=null){  
  36.                 System.out.println(System.currentTimeMillis()-s.getTriggerTime());//基本上是等於0  
  37.             }  
  38.             if(queue.size()==0)  
  39.                 break;  
  40.         }  
  41.     }  
  42. }  


    總結:適用於需要延時操作的佇列管理


9. SynchronousQueue
    我們先來學習一下JDK1.5 API中關於這個類的詳細介紹:

    “一種阻塞佇列,其中每個插入操作必須等待另一個執行緒的對應移除操作 ,反之亦然。同步佇列沒有任何內部容量,甚至連一個佇列的容量都沒有。不能在同步佇列上進行 peek,因為僅在試圖要移除元素時,該元素才存在;除非另一個執行緒試圖移除某個元素,否則也不能(使用任何方法)插入元素;也不能迭代佇列,因為其中沒 有元素可用於迭代。佇列的頭 是嘗試新增到佇列中的首個已排隊插入執行緒的元素;如果沒有這樣的已排隊執行緒,則沒有可用於移除的元素並且 poll() 將會返回 null。對於其他 Collection 方法(例如 contains),SynchronousQueue 作為一個空 collection。此佇列不允許 null 元素。
    同步佇列類似於 CSP 和 Ada 中使用的 rendezvous 通道。它非常適合於傳遞性設計,在這種設計中,在一個執行緒中執行的物件要將某些資訊、事件或任務傳遞給在另一個執行緒中執行的物件,它就必須與該物件同步。 “

    看起來很有意思吧。佇列竟然是沒有內部容量的。這個佇列其實是BlockingQueue的一種實現。每個插入操作必須等待另一個執行緒的對應移除操作,反之亦然。它給我們提供了線上程之間交換單一元素的極輕量級方法

   應用舉例:我們要在多個執行緒中傳遞一個變數。

   程式碼如下(其實就是生產者消費者模式)
Java程式碼  收藏程式碼
  1. import java.util.Arrays;  
  2. import java.util.List;  
  3. import java.util.concurrent.BlockingQueue;  
  4. import java.util.concurrent.SynchronousQueue;  
  5. public class TestSynchronousQueue {  
  6.     class Producer implements Runnable {  
  7.         private BlockingQueue<String> queue;  
  8.         List<String> objects = Arrays.asList("one", "two", "three");  
  9.         public Producer(BlockingQueue<String> q) {  
  10.             this.queue = q;  
  11.         }  
  12.         @Override  
  13.         public void run() {  
  14.             try {  
  15.                 for (String s : objects) {  
  16.                     queue.put(s);// 產生資料放入佇列中  
  17.                     System.out.printf("put:%s%n",s);  
  18.                 }  
  19.                 queue.put("Done");// 已完成的標誌  
  20.             } catch (InterruptedException e) {  
  21.                 e.printStackTrace();  
  22.             }  
  23.         }  
  24.     }  
  25.     class Consumer implements Runnable {  
  26.         private BlockingQueue<String> queue;  
  27.         public Consumer(BlockingQueue<String> q) {  
  28.             this.queue = q;  
  29.         }  
  30.         @Override  
  31.         public void run() {  
  32.             String obj = null;  
  33.             try {  
  34.                 while (!((obj = queue.take()).equals("Done"))) {  
  35.                     System.out.println(obj);//從佇列中讀取物件  
  36.                     Thread.sleep(3000);     //故意sleep,證明Producer是put不進去的  
  37.                 }  
  38.             } catch (InterruptedException e) {  
  39.                 e.printStackTrace();  
  40.             }  
  41.         }  
  42.     }  
  43.     public static void main(String[] args) {  
  44.         BlockingQueue<String> q=new SynchronousQueue<String>();  
  45.         TestSynchronousQueue t=new TestSynchronousQueue();  
  46.         new Thread(t.new Producer(q)).start();  
  47.         new Thread(t.new Consumer(q)).start();  
  48.     }  



以下引用自:http://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.html
其中有很形象的解釋和示例
  • 前言:

     在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列 類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場 景。

  • 認識BlockingQueue
    阻塞佇列,顧名思義,首先它是一個佇列,而一個佇列在資料結構中所起的作用大致如下圖所示:

    從上圖我們可以很清楚看到,通過一個共享的佇列,可以使得資料由佇列的一端輸入,從另外一端輸出;
    常用的佇列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同型別的佇列,DelayQueue就是其中的一種)
      先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。
      後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。

          多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若 幹生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的 資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度, 並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而, 在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小 的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些 情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒)
    下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:
           如上圖所示:當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列。


       如上圖所示:當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒。
         這也是我們在多執行緒環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞 執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下 它的常用方法:
    BlockingQueue的核心方法
    放入資料:
      offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,
        則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)
      offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中
        加入BlockingQueue,則返回失敗。
      put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷
        直到BlockingQueue裡面有空間再繼續.
    獲取資料:
      poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,
        取不到時返回null;
      poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,
        佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
      take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到
        BlockingQueue有新的資料被加入; 
      drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數), 
        通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。
  • 常見BlockingQueue
    在瞭解了BlockingQueue的基本功能後,讓我們來看看BlockingQueue家庭大致有哪些成員? 
     
  • BlockingQueue成員詳細介紹
    1. ArrayBlockingQueue

          基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了 一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。
       ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於 LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的 完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額 外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任 何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而 在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。

    2. LinkedBlockingQueue
          基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列 中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時 (LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反 之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別 採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
    作為開 發 者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設 一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統 記憶體就有可能已被消耗殆盡了。

    ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞佇列,一般情況下,在處理多執行緒間的生產者消費者問題,使用這兩個類足以。

    下面的程式碼演示瞭如何使用BlockingQueue:
    01import java.util.concurrent.BlockingQueue;
    02import java.util.concurrent.ExecutorService;
    03import java.util.concurrent.Executors;
    04import java.util.concurrent.LinkedBlockingQueue;
    05
    06/**
    07* @author jackyuj
    08*/
    09public class BlockingQueueTest {
    10
    11public static void main(String[] args) throws InterruptedException {
    12// 宣告一個容量為10的快取佇列
    13BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
    14
    15Producer producer1 = new Producer(queue);
    16Producer producer2 = new Producer(queue);
    17Producer producer3 = new Producer(queue);
    18Consumer consumer = new Consumer(queue);
    19
    20// 藉助Executors
    21ExecutorService service = Executors.newCachedThreadPool();
    22// 啟動執行緒
    23service.execute(producer1);
    24service.execute(producer2);
    25service.execute(producer3);
    26service.execute(consumer);
    27
    28// 執行10s
    29Thread.sleep(10 * 1000);
    30producer1.stop();
    31producer2.stop();
    32producer3.stop();
    33
    34Thread.sleep(2000);
    35// 退出Executor
    36service.shutdown();
    37}
    38}
    01import java.util.Random;
    02import java.util.concurrent.BlockingQueue;
    03import java.util.concurrent.TimeUnit;
    04
    05/**
    06* 消費者執行緒
    07*
    08* @author jackyuj
    09*/
    10public class Consumer implements Runnable {
    11
    12public Consumer(BlockingQueue<String> queue) {
    13this.queue = queue;
    14}
    15
    16public void run() {
    17System.out.println("啟動消費者執行緒!");
    18Random r = new Random();
    19boolean isRunning = true;
    20try {
    21while (isRunning) {
    22System.out.println("正從佇列獲取資料...");
    23String data = queue.poll(2, TimeUnit.SECONDS);
    24if (null != data) {
    25System.out.println("拿到資料:" + data);
    26System.out.println("正在消費資料:" + data);
    27Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
    28} else {
    29// 超過2s還沒資料,認為所有生產執行緒都已經退出,自動退出消費執行緒。
    30isRunning = false;
    31}
    32}
    33} catch (InterruptedException e) {
    34e.printStackTrace();
    35Thread.currentThread().interrupt();
    36} finally {
    37System.out.println("退出消費者執行緒!");
    38}
    39}
    40
    41private BlockingQueue<String> queue;
    42private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
    43}
    44
    45import java.util.Random;
    46import java.util.concurrent.BlockingQueue;
    47import java.util.concurrent.TimeUnit;
    48import java.util.concurrent.atomic.AtomicInteger;
    49
    50/**
    51* 生產者執行緒
    52*
    53* @author jackyuj
    54*/
    55public class Producer implements Runnable {
    56
    57public Producer(BlockingQueue queue) {
    58this.queue = queue;
    59}
    60
    61public void run() {
    62String data = null;
    63Random r = new Random();
    64
    65System.out.println("啟動生產者執行緒!");
    66try {
    67while (isRunning) {
    68System.out.println("正在生產資料...");
    69Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
    70
    71data = "data:" + count.incrementAndGet();
    72System.out.println("將資料:" + data + "放入佇列...");
    73if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
    74System.out.println("放入資料失敗:" + data);
    75}
    76}
    77} catch (InterruptedException e) {
    78e.printStackTrace();
    79Thread.currentThread().interrupt();
    80} finally {
    81System.out.println("退出生產者執行緒!");
    82}
    83}
    84
    85public void stop() {
    86isRunning = false;
    87}
    88
    89private volatile boolean      isRunning               = true;
    90private BlockingQueue queue;
    91private static AtomicInteger  count                   = new AtomicInteger();
    92private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
    93
    94}
  • 3. DelayQueue
          DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。
    使用場景:
      DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連線佇列。

    4. PriorityBlockingQueue
          基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),但需要注意的是PriorityBlockingQueue並不 會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度, 否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖。

    5. SynchronousQueue
          一種無緩衝的等待佇列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去 集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那麼對不起,大家都在集市等待。相對於有緩衝的BlockingQueue來說,少了一箇中 間經銷商的環節(緩衝區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商 品,因此相對於直接交易模式,總體來說採用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者 中間增加了額外的交易環節,單個產品的及時響應效能可能會降低。
      宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:
      如果採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體系整體的公平策略;
       但如果是非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的 生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。
  • 小結
      BlockingQueue不光實現了一個完整佇列所具有的基本功能,同時在多執行緒環境下,他還自動管理了多線間的自動等待於喚醒功能,從而使得程式設計師可以忽略這些細節,關注更高階的功能。