1. 程式人生 > >Java多執行緒/併發26、阻塞佇列BlockingQueue

Java多執行緒/併發26、阻塞佇列BlockingQueue

BlockingQueue介面定義了一種佇列,這種佇列通常容量是提前固定(確定了容量大小)的。容量滿時往BlockingQueue中新增資料時會造成阻塞,容量為空時取元素操作會阻塞。

我們可以認為BlockingQueue佇列是一個水庫。水庫滿了的時侯,上游的水就要被攔住,不能再往水庫裡灌了。平時農莊澆灌,生活飲用都用這裡面的水。如果水庫幹了,那麼要灌溉的工人只能等著上游放水後,才能繼續工作。
實際上我們已經用Condition實現過一次了,見《利用Condition來實現阻塞佇列》。

BlockingQueue多用於生產-消費模式。還記得Future兌換月餅的例子嗎?裡面用到的CompletionService的實現用的就是一個儲存Future物件的BlockingQueue

在JUC中,BlockingQueue介面有以下實現:
這裡寫圖片描述
ArrayBlockingQueue基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數putIndex和takeIndex,分別標識著佇列的頭部和尾部在陣列中的位置。
這裡寫圖片描述

我們用ArrayBlockingQueue改寫一下領月餅的程式:

public class ArrayBlockingQueueDemo {
    public static void
main(String[] args) { final BlockingQueue<Integer> abqueue=new ArrayBlockingQueue<Integer>(10); /* 定義生產者:用來做月餅的Callable */ final Runnable runnable = new Runnable() { public void run() { try { /*模擬耗時操作,需要5秒*/ Thread.sleep(5000
); /*在佇列中放上一盒做好的月餅編號*/ abqueue.put(new Random().nextInt(10000)); } catch (InterruptedException e) { e.printStackTrace(); } } }; /*開啟執行緒B--消費者:獲取月餅*/ Runnable constmor_runnable=new Runnable() { public void run() { try { ExecutorService tPool = Executors.newCachedThreadPool(); System.out.println("老闆,給我開始做月餅..."); /*啟動執行緒A--生產者:執行耗時操作,三條生產線開始生產月餅*/ for(int i=0;i<3;i++){ tPool.submit(runnable); } /*拿月餅*/ System.out.println("5秒鐘後......"); for(int i=0;i<3;i++){ /*生產月餅需要5秒,這時執行緒執行到take()這裡,還沒有月餅,因此就在這裡阻塞等待。*/ System.out.println("用月餅券兌換到月餅,該盒月餅編號:"+abqueue.take()); } System.out.println("拿餅回家..."); } catch (InterruptedException e) { e.printStackTrace(); } } }; new Thread(constmor_runnable).start(); } }

另外再說說LinkedBlockingQueue:
LinkedBlockingQueue是一個用連結串列實現的有界阻塞佇列。此佇列的預設和最大長度為Integer.MAX_VALUE。
它與ArrayBlockingQueue與使用一樣,上例中可以直接用LinkedBlockingDeque替換

附:
BlockingQueue介面常用方法:
放入資料:

  • offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)
  • offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。
  • put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.

獲取資料:

  • poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null;
  • poll(long timeout, TimeUnitunit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
  • take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入;
  • drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。