1. 程式人生 > >【Java】生產者消費者模式的實現

【Java】生產者消費者模式的實現

前言

生產者消費者問題是執行緒模型中的經典問題:生產者和消費者在同一時間段內共用同一儲存空間,生產者向空間裡生產資料,而消費者取走資料。

阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個阻塞佇列就是用來給生產者和消費者解耦的。

wait/notify方法

首先,我們搞清楚Thread.sleep()方法和Object.wait()、Object.notify()方法的區別。根據這篇文章java sleep和wait的區別的疑惑?

  1. sleep()是Thread類的方法;而wait()notify()notifyAll()是Object類中定義的方法;儘管這兩個方法都會影響執行緒的執行行為,但是本質上是有區別的。

  2. Thread.sleep()不會導致鎖行為的改變,如果當前執行緒是擁有鎖的,那麼Thread.sleep()不會讓執行緒釋放鎖。如果能夠幫助你記憶的話,可以簡單認為和鎖相關的方法都定義在Object類中,因此呼叫Thread.sleep()是不會影響鎖的相關行為。

  3. Thread.sleepObject.wait都會暫停當前的執行緒,對於CPU資源來說,不管是哪種方式暫停的執行緒,都表示它暫時不再需要CPU的執行時間。OS會將執行時間分配給其它執行緒。區別是呼叫wait後,需要別的執行緒執行notify/notifyAll才能夠重新獲得CPU執行時間。

執行緒狀態圖:

  • Thread.sleep()
    讓執行緒從 【running】 -> 【阻塞態】 時間結束/interrupt -> 【runnable】
  • Object.wait()讓執行緒從 【running】 -> 【等待佇列】notify -> 【鎖池】 -> 【runnable】

實現生產者消費者模型

生產者消費者問題是研究多執行緒程式時繞不開的經典問題之一,它描述是有一塊緩衝區作為倉庫,生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品。在Java中一共有四種方法支援同步,其中前三個是同步方法,一個是管道方法。

(1)Object的wait() / notify()方法
(2)LockCondition

的await() / signal()方法
(3)BlockingQueue阻塞佇列方法
(4)PipedInputStream / PipedOutputStream

本文只介紹最常用的前三種,第四種暫不做討論。原始碼在這裡:Java實現生產者消費者模型

1. 使用Object的wait() / notify()方法

wait()/ nofity()方法是基類Object的兩個方法,也就意味著所有Java類都會擁有這兩個方法,這樣,我們就可以為任何物件實現同步機制。

  • wait():當緩衝區已滿/空時,生產者/消費者執行緒停止自己的執行,放棄鎖,使自己處於等待狀態,讓其他執行緒執行。
  • notify():當生產者/消費者向緩衝區放入/取出一個產品時,向其他等待的執行緒發出可執行的通知,同時放棄鎖,使自己處於等待狀態。
/**
 * 生產者消費者模式:使用Object.wait() / notify()方法實現
 */
public class ProducerConsumer {
    private static final int CAPACITY = 5;

    public static void main(String args[]){
        Queue<Integer> queue = new LinkedList<Integer>();

        Thread producer1 = new Producer("P-1", queue, CAPACITY);
        Thread producer2 = new Producer("P-2", queue, CAPACITY);
        Thread consumer1 = new Consumer("C1", queue, CAPACITY);
        Thread consumer2 = new Consumer("C2", queue, CAPACITY);
        Thread consumer3 = new Consumer("C3", queue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }

    /**
     * 生產者
     */
    public static class Producer extends Thread{
        private Queue<Integer> queue;
        String name;
        int maxSize;
        int i = 0;

        public Producer(String name, Queue<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                synchronized(queue){
                    while(queue.size() == maxSize){
                        try {
                            System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                            queue.wait();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                    System.out.println("[" + name + "] Producing value : +" + i);
                    queue.offer(i++);
                    queue.notifyAll();

                    try {
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }
    }

    /**
     * 消費者
     */
    public static class Consumer extends Thread{
        private Queue<Integer> queue;
        String name;
        int maxSize;

        public Consumer(String name, Queue<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                synchronized(queue){
                    while(queue.isEmpty()){
                        try {
                            System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                            queue.wait();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                    int x = queue.poll();
                    System.out.println("[" + name + "] Consuming value : " + x);
                    queue.notifyAll();

                    try {
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
注意要點

判斷Queue大小為0或者大於等於queueSize時須使用 while (condition) {},不能使用 if(condition) {}。其中 while(condition)迴圈,它又被叫做“自旋鎖”。自旋鎖以及wait()notify()方法在執行緒通訊這篇文章中有更加詳細的介紹。為防止該執行緒沒有收到notify()呼叫也從wait()中返回(也稱作虛假喚醒),這個執行緒會重新去檢查condition條件以決定當前是否可以安全地繼續執行還是需要重新保持等待,而不是認為執行緒被喚醒了就可以安全地繼續執行了。

輸出日誌如下:

[P-1] Producing value : +0
[P-1] Producing value : +1
[P-1] Producing value : +2
[P-1] Producing value : +3
[P-1] Producing value : +4
Queue is full, Producer[P-1] thread waiting for consumer to take something from queue.
[C3] Consuming value : 0
[C3] Consuming value : 1
[C3] Consuming value : 2
[C3] Consuming value : 3
[C3] Consuming value : 4
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +0
[C1] Consuming value : 0
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +5
[P-1] Producing value : +6
[P-1] Producing value : +7
[P-1] Producing value : +8
[P-1] Producing value : +9
Queue is full, Producer[P-1] thread waiting for consumer to take something from queue.
[C3] Consuming value : 5
[C3] Consuming value : 6
[C3] Consuming value : 7
[C3] Consuming value : 8
[C3] Consuming value : 9
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +1
[C1] Consuming value : 1
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +10
[P-1] Producing value : +11
[P-1] Producing value : +12
[P-1] Producing value : +13
[P-1] Producing value : +14
Queue is full, Producer[P-1] thread waiting for consumer to take something from queue.
[C3] Consuming value : 10
[C3] Consuming value : 11
[C3] Consuming value : 12
[C3] Consuming value : 13
[C3] Consuming value : 14
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +2
[P-2] Producing value : +3
[P-2] Producing value : +4
[P-2] Producing value : +5
[P-2] Producing value : +6
Queue is full, Producer[P-2] thread waiting for consumer to take something from queue.
[C1] Consuming value : 2
[C1] Consuming value : 3
[C1] Consuming value : 4
[C1] Consuming value : 5
[C1] Consuming value : 6
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +15
[C3] Consuming value : 15
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +7
[P-2] Producing value : +8
[P-2] Producing value : +9

2. 使用Lock和Condition的await() / signal()方法

在JDK5.0之後,Java提供了更加健壯的執行緒處理機制,包括同步、鎖定、執行緒池等,它們可以實現更細粒度的執行緒控制。Condition介面的await()signal()就是其中用來做同步的兩種方法,它們的功能基本上和Object的wait()/ nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock直接掛鉤,具有更大的靈活性。通過在Lock物件上呼叫newCondition()方法,將條件變數和一個鎖物件進行繫結,進而控制併發程式訪問競爭資源的安全。下面來看程式碼:

/**
 * 生產者消費者模式:使用Lock和Condition實現
 * {@link java.util.concurrent.locks.Lock}
 * {@link java.util.concurrent.locks.Condition}
 */
public class ProducerConsumerByLock {
    private static final int CAPACITY = 5;
    private static final Lock lock = new ReentrantLock();
    private static final Condition fullCondition = lock.newCondition();     //佇列滿的條件
    private static final Condition emptyCondition = lock.newCondition();        //佇列空的條件


    public static void main(String args[]){
        Queue<Integer> queue = new LinkedList<Integer>();

        Thread producer1 = new Producer("P-1", queue, CAPACITY);
        Thread producer2 = new Producer("P-2", queue, CAPACITY);
        Thread consumer1 = new Consumer("C1", queue, CAPACITY);
        Thread consumer2 = new Consumer("C2", queue, CAPACITY);
        Thread consumer3 = new Consumer("C3", queue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }

    /**
     * 生產者
     */
    public static class Producer extends Thread{
        private Queue<Integer> queue;
        String name;
        int maxSize;
        int i = 0;

        public Producer(String name, Queue<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){

                //獲得鎖
                lock.lock();
                while(queue.size() == maxSize){
                    try {
                        System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                        //條件不滿足,生產阻塞
                        fullCondition.await();
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println("[" + name + "] Producing value : +" + i);
                queue.offer(i++);

                //喚醒其他所有生產者、消費者
                fullCondition.signalAll();
                emptyCondition.signalAll();

                //釋放鎖
                lock.unlock();

                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    /**
     * 消費者
     */
    public static class Consumer extends Thread{
        private Queue<Integer> queue;
        String name;
        int maxSize;

        public Consumer(String name, Queue<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                //獲得鎖
                lock.lock();

                while(queue.isEmpty()){
                    try {
                        System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                        //條件不滿足,消費阻塞
                        emptyCondition.await();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                int x = queue.poll();
                System.out.println("[" + name + "] Consuming value : " + x);

                //喚醒其他所有生產者、消費者
                fullCondition.signalAll();
                emptyCondition.signalAll();

                //釋放鎖
                lock.unlock();

                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

輸入日誌如下:

[P-1] Producing value : +0
[C1] Consuming value : 0
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-2] Producing value : +0
[C3] Consuming value : 0
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +1
[C2] Consuming value : 1
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +1
[C1] Consuming value : 1
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +2
[C3] Consuming value : 2
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-2] Producing value : +2
[C2] Consuming value : 2
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-1] Producing value : +3
[C1] Consuming value : 3
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-2] Producing value : +3
[C2] Consuming value : 3
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-1] Producing value : +4
[C1] Consuming value : 4
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +4
[C3] Consuming value : 4
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +5
[C2] Consuming value : 5
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-1] Producing value : +5
[C1] Consuming value : 5
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-2] Producing value : +6
[C2] Consuming value : 6
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +6
[C3] Consuming value : 6
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +7
[C3] Consuming value : 7
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-1] Producing value : +7
[C1] Consuming value : 7
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-2] Producing value : +8
[C2] Consuming value : 8
[P-1] Producing value : +8
[C1] Consuming value : 8
[P-2] Producing value : +9
[C3] Consuming value : 9
[P-2] Producing value : +10
[C2] Consuming value : 10
[P-1] Producing value : +9
[P-1] Producing value : +10
[C1] Consuming value : 9
[P-2] Producing value : +11
[C3] Consuming value : 10
[C2] Consuming value : 11
[P-2] Producing value : +12
[C1] Consuming value : 12
[P-1] Producing value : +11
[C3] Consuming value : 11
[P-2] Producing value : +13
[C2] Consuming value : 13
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +12
[C2] Consuming value : 12
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +13
[C3] Consuming value : 13
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-2] Producing value : +14
[C1] Consuming value : 14
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-1] Producing value : +14
[C3] Consuming value : 14
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-1] Producing value : +15
[C1] Consuming value : 15
[P-2] Producing value : +15
[P-1] Producing value : +16
[C3] Consuming value : 15
[P-2] Producing value : +16

3. 使用BlockingQueue阻塞佇列方法

JDK 1.5 以後新增的 java.util.concurrent包新增了 BlockingQueue 介面。並提供瞭如下幾種阻塞佇列實現:

  • java.util.concurrent.ArrayBlockingQueue
  • java.util.concurrent.LinkedBlockingQueue
  • java.util.concurrent.SynchronousQueue
  • java.util.concurrent.PriorityBlockingQueue

實現生產者-消費者模型使用 ArrayBlockingQueue或者 LinkedBlockingQueue即可。

我們這裡使用LinkedBlockingQueue,它是一個已經在內部實現了同步的佇列,實現方式採用的是我們第2種await()/ signal()方法。它可以在生成物件時指定容量大小。它用於阻塞操作的是put()和take()方法。

  • put()方法:類似於我們上面的生產者執行緒,容量達到最大時,自動阻塞。
  • take()方法:類似於我們上面的消費者執行緒,容量為0時,自動阻塞。

我們可以跟進原始碼看一下LinkedBlockingQueue類的put()方法實現:

/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();

/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();

/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();



public void put(E e) throws InterruptedException {
    putLast(e);
}

public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

看到這裡證實了它的實現方式採用的是我們第2種await()/ signal()方法。下面我們就使用它實現吧。

/**
 * 生產者消費者模式:使用{@link java.util.concurrent.BlockingQueue}實現
 */
public class ProducerConsumerByBQ{
    private static final int CAPACITY = 5;

    public static void main(String args[]){
        LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<Integer>(CAPACITY);

        Thread producer1 = new Producer("P-1", blockingQueue, CAPACITY);
        Thread producer2 = new Producer("P-2", blockingQueue, CAPACITY);
        Thread consumer1 = new Consumer("C1", blockingQueue, CAPACITY);
        Thread consumer2 = new Consumer("C2", blockingQueue, CAPACITY);
        Thread consumer3 = new Consumer("C3", blockingQueue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }

    /**
     * 生產者
     */
    public static class Producer extends Thread{
        private LinkedBlockingDeque<Integer> blockingQueue;
        String name;
        int maxSize;
        int i = 0;

        public Producer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.blockingQueue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                try {
                    blockingQueue.put(i);
                    System.out.println("[" + name + "] Producing value : +" + i);
                    i++;

                    //暫停最多1秒
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    /**
     * 消費者
     */
    public static class Consumer extends Thread{
        private LinkedBlockingDeque<Integer> blockingQueue;
        String name;
        int maxSize;

        public Consumer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.blockingQueue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                try {
                    int x = blockingQueue.take();
                    System.out.println("[" + name + "] Consuming : " + x);

                    //暫停最多1秒
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

輸出日誌如下:

[P-2] Producing value : +0
[P-1] Producing value : +0
[C1] Consuming : 0
[C3] Consuming : 0
[P-2] Producing value : +1
[C2] Consuming : 1
[P-2] Producing value : +2
[C1] Consuming : 2
[P-1] Producing value : +1
[C2] Consuming : 1
[P-1] Producing value : +2
[C3] Consuming : 2
[P-1] Producing value : +3
[C2] Consuming : 3
[P-2] Producing value : +3
[C1] Consuming : 3
[P-1] Producing value : +4
[C2] Consuming : 4
[P-2] Producing value : +4
[C3] Consuming : 4
[P-2] Producing value : +5
[C1] Consuming : 5
[P-1] Producing value : +5
[C2] Consuming : 5
[P-1] Producing value : +6
[C1] Consuming : 6
[P-2] Producing value : +6
[C2] Consuming : 6
[P-2] Producing value : +7
[C2] Consuming : 7
[P-1] Producing value : +7
[C1] Consuming : 7
[P-2] Producing value : +8
[C3] Consuming : 8
[P-2] Producing value : +9
[C2] Consuming : 9
[P-1] Producing value : +8
[C2] Consuming : 8
[P-2] Producing value : +10
[C1] Consuming : 10
[P-1] Producing value : +9
[C3] Consuming : 9
[P-1] Producing value : +10
[C2] Consuming : 10
[P-2] Producing value : +11
[C1] Consuming : 11
[C3] Consuming : 12
[P-2] Producing value : +12
[P-2] Producing value : +13
[C2] Consuming : 13
[P-1] Producing value : +11
[C3] Consuming : 11
[P-1] Producing value : +12
[C3] Consuming : 12
[P-2] Producing value : +14
[C1] Consuming : 14
[P-1] Producing value : +13
[C2] Consuming : 13
[P-2] Producing value : +15
[C3] Consuming : 15
[P-2] Producing value : +16
[C1] Consuming : 16
[P-1] Producing value : +14
[C3] Consuming : 14
[P-2] Producing value : +17
[C2] Consuming : 17

參考資料