1. 程式人生 > >Java中阻塞佇列的幾種實現方式

Java中阻塞佇列的幾種實現方式

1.wait()和notify()方式(摘自:https://segmentfault.com/a/1190000000373535)

阻塞佇列與普通佇列的區別在於,當佇列是空的時,從佇列中獲取元素的操作將會被阻塞,或者當佇列是滿時,往佇列裡新增元素的操作會被阻塞。試圖從空的阻塞佇列中獲取元素的執行緒將會被阻塞,直到其他的執行緒往空的佇列插入新的元素。同樣,試圖往已滿的阻塞佇列中新增新元素的執行緒同樣也會被阻塞,直到其他的執行緒使佇列重新變得空閒起來,如從佇列中移除一個或者多個元素,或者完全清空佇列。

執行緒1往阻塞佇列中新增元素,而執行緒2從阻塞佇列中移除元素。

具體實現:

public class BlockingQueue {

  private List queue = new LinkedList();
  private int  limit = 10;

  public BlockingQueue(int limit){
    this.limit = limit;
  }


  public synchronized void enqueue(Object item)
  throws InterruptedException  {
    while(this.queue.size() == this.limit) {
      wait();
    }
    if(this.queue.size() == 0) {
      notifyAll();
    }
    this.queue.add(item);
  }


  public synchronized Object dequeue()
  throws InterruptedException{
    while(this.queue.size() == 0){
      wait();
    }
    if(this.queue.size() == this.limit){
      notifyAll();
    }

    return this.queue.remove(0);
  }

}

必須注意到,在enqueue和dequeue方法內部,只有佇列的大小等於上限(limit)或者下限(0)時,才呼叫notifyAll方法。如果佇列的大小既不等於上限,也不等於下限,任何執行緒呼叫enqueue或者dequeue方法時,都不會阻塞,都能夠正常的往佇列中新增或者移除元素。

2.併發類實現

eg:使用ArrayBlockingQueue實現(摘自:http://blog.csdn.net/eson_15/article/details/51586127)

public class BlockingQueueTest {

    public static void main(String[] args) {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3); //緩衝區允許放3個數據

        for(int i = 0; i < 2; i ++) {
            new Thread() { //開啟兩個執行緒不停的往緩衝區存資料

                @Override
                public void run() {
                    while(true) {
                        try {
                            Thread.sleep((long) (Math.random()*1000));
                            System.out.println(Thread.currentThread().getName() + "準備放資料"
                                    + (queue.size() == 3?"..佇列已滿,正在等待":"..."));
                            queue.put(1);
                            System.out.println(Thread.currentThread().getName() + "存入資料," 
                                    + "佇列目前有" + queue.size() + "個數據");
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        } 
                    }
                }

            }.start();
        }

        new Thread() { //開啟一個執行緒不停的從緩衝區取資料

            @Override
            public void run() {
                while(true) {
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + "準備取資料"
                                + (queue.size() == 0?"..佇列已空,正在等待":"..."));
                        queue.take();
                        System.out.println(Thread.currentThread().getName() + "取出資料," 
                                + "佇列目前有" + queue.size() + "個數據");
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } 
                }
            }
        }.start();
    }

}

幾種常用的阻塞佇列
  • ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。
  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。
摘自:

http://ifeve.com/java-blocking-queue/

阻塞佇列的適用場景:

生產者和消費者速度不一致造成的佇列為空或者佇列過大的情況。