1. 程式人生 > >Java併發包:阻塞佇列(BlockingQueue)

Java併發包:阻塞佇列(BlockingQueue)

BlockingQueue

在java.util.concurrent包中的 BlockingQueue介面類是一種執行緒安全的佇列。這篇文章我們將展示如何使用BlockingQueue。

這篇文章不討論BlockQueue的實現。如果你對此感興趣,有一片理論的文章 Blocking Queues

BlockingQueue的使用說明

BlockingQueue一般用於這樣的場景:一個執行緒生產物件,另一個執行緒來消耗物件,下面的插圖說明了這個規則:
這裡寫圖片描述
生產執行緒會持續生產新的物件並把他們插入到佇列中,直到佇列所能包含物件的最大上限。
如果阻塞佇列到達了上限,這時如果嘗試插入新的的物件,生產執行緒將會被阻塞。並且阻塞會一直保持直到消費執行緒從佇列中取出一個物件。

同樣,消費執行緒會持續從阻塞佇列中取出物件並處理他們。如果消費執行緒試圖從一個空的佇列中取出物件,消費執行緒將被阻塞住,直到生產執行緒向佇列中加入了一個物件。

BlockingQueue方法。

對於在佇列中插入、刪除和檢查元素操作BlockingQueue有4類不同行為的方法。

operation Throws Exception Special Value Blocks Times Out
Insert add(o) offer(o) put(o) offer(o, timeout, timeunit)
Remove remove(o) poll() take() poll(timeout, timeunit)
Examine element() peek()

四種不同行為的含義如下:

  • 1.拋異常
    如果嘗試操作是不可能的,一個異常將會丟擲。
  • 2.特殊值
    如果嘗試操作是不可能的,一個特殊值將返回(通常是true/false)
  • 3.阻塞
    如果嘗試操作是不可能的,方法將會阻塞住,直到可以執行。
  • 4.超時
    如果嘗試操作是不可能的,方法將會阻塞住,直到可以執行,但是阻塞不會超過給定的時間。並且返回一個特定的值來表示操作是否成功(一般是true/false)。

不能向BlockingQueue中插入null,否則會丟擲NullPointerException異常。

訪問BlockingQueue中的任意元素也是可能的,不僅僅是在佇列前端和末端的元素。例如,你已經排隊了一個待處理的物件(有了一個佇列),然而你的應用程式需要取消該物件。這時你可以呼叫remove(o)方法從佇列中移除該物件。然而,這種做法不是高效的,因此應該儘量避免使用,除非你真的需要這麼做。
####BlockingQueue的實現
由於BlockingQueue是一個介面,你需要使用它的具體實現類,java.util.concurrent包中了下面的對於BlockingQueue的具體實現:

  1. ArrayBlockingQueue
  2. DelayQueue
  3. LinkedBlockingQueue
  4. PriorityBlockingQueue

BlockingQueue示例

下面是BlockingQueue的使用示例。這個例子使用了BlockingQueued的一個具體實現類ArrayBlockingQueue。

首先,BlockingQueueExample類開啟了Producer和Consumer兩個不同的執行緒,Producer執行緒向共享BlockingQueue中插入字串,Consumer執行緒從中取出它們。

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

        BlockingQueue queue = new ArrayBlockingQueue(1024);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}

下面是Producer類,注意每次呼叫put()方法執行緒將睡1秒。這將引起Consumer阻塞,一直等待物件加入到佇列中。

public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

下面是Consumer類。它從佇列中取出物件,並打印出來結果。

public class Consumer implements Runnable{

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ArrayBlockingQueue

ArrayBlockingQueue是一個有邊界,阻塞佇列,元素儲存在內部的陣列當中。有邊界意味著它不能儲存無限的元素。這裡有一個可同時儲存元素的上邊界。你可以在例項化的時候設定這個上邊界,但之後這個值不能被改變。

ArrayBlockingQueue內部以FIFO(先進先出)的順序儲存元素。佇列頭部元素將在佇列中保持最長的時間。佇列尾部的元素將在佇列中保持最短的時間。

這裡展示如何例項化和使用一個ArrayBlockingQueue:

BlockingQueue queue = new ArrayBlockingQueue(1024);

queue.put("1");

Object object = queue.take();
```java
下面是一個BlockingQueue使用泛型的例子。

BlockingQueue queue = new ArrayBlockingQueue(1024);

queue.put(“1”);

String string = queue.take();

###DelayQueue
DelayQueue內部阻塞元素直到某個延遲到期。其中元素必須實現java.concurrent.Delayed介面。下面是Delayed介面:

```java
public interface Delayed extends Comparable<Delayed< {

 public long getDelay(TimeUnit timeUnit);

}

getDelay()方法返回的值代表元素被釋放前應該延遲的時間。如果返回的是0或者負數,延遲被認為是到期的或者說是期滿的,接下來在DelayQueue上呼叫take()等方法後,元素將會釋放。

傳給getDelay()方法的TimeUnit例項是Enum型別,它判斷那種延遲時間單位被返回。TimeUnit的可能的取值:

  • DAYS
  • HOURS
  • MINUTES
  • SECONDS
  • MILLISECONDS
  • MICROSECONDS

Delayed介面也繼承了java.lang.Comparable介面,正如你所看到的,這也意味著Delayed的物件可以相互比較。這可能被用在DelayQueue內部給佇列中元素排序,這樣元素將在期滿時將按照順序被釋放。

下面是一個如何使用DelayQueue的例子:

public class DelayQueueExample {

    public static void main(String[] args) {
        DelayQueue queue = new DelayQueue();

        Delayed element1 = new DelayedElement();

        queue.put(element1);

        Delayed element2 = queue.take();
    }
}

DelayedElement是我自己建立的對Delayed介面的一個具體實現,它不是java.util.concurrent包中的一部分。你在使用DelayQueue類時必須先建立一個你自己的Delayed介面的具體實現。

LinkedBlockingQueue

LinkedBlockingQueue內部使用一個連結串列結構儲存元素。這種連結串列結構能夠有個理想的上邊界,如果沒有指定上邊界,那麼使用Integer.MAX_VALUE作為上邊界。

LinkedBlockingQueue內部儲存元素遵循FIFO的規則。佇列頭部元素將在佇列中保持最長的時間。佇列尾部的元素將在佇列中保持最短的時間。

下面是如何例項化和使用LinkedBlockingQueue:

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);

bounded.put("Value");

String value = bounded.take();

PriorityBlockingQueue

PriorityBlockingQueue是一種無邊界的併發佇列。它遵循java.util.PriorityQueue類同樣的規則。這種佇列中不能插入null值。

優先佇列是不同於先進先出佇列的另一種佇列。每次從佇列中取出的是具有最高優先權的元素。

PriorityQueue是從JDK1.5開始提供的新的資料結構介面。
如果不提供Comparator的話,優先佇列中元素預設按自然順序排列,也就是數字預設是小的在佇列頭,字串則按字典序排列。

所有插入到PriorityBlockingQueue 中的元素必須實現java.lang.Comparable介面。因此元素會根據你的Comparable實現來排序。

注意:PriorityBlockingQueue 中不能強制指定元素具有相同的優先順序(compare() == 0).
同時也要注意,假使得到PriorityBlockingQueue 的迭代器(Iterator),迭代器不能保證按元素優先順序迭代。

下面是一個如何使用PriorityBlockingQueue的例子:

BlockingQueue queue   = new PriorityBlockingQueue();

    //String implements java.lang.Comparable
    queue.put("Value");

    String value = queue.take();

SynchronousQueue

SynchronousQueue 是內部僅包含單一元素的佇列。一個向佇列中插入元素的執行緒會被阻塞直到另一個執行緒從佇列中取走元素。同樣的,如果一個執行緒試圖取出元素而當前沒有元素,這個執行緒會被阻塞直到另一個執行緒向佇列中插入了元素。
稱這個類為一個佇列有點誇張。它更像是一個會合點。

In fact, SynchronousQueue has no size at all. There is a direct handoff between Producer and Consumer thread.