【Java】生產者消費者模式的實現
前言
生產者消費者問題是執行緒模型中的經典問題:生產者和消費者在同一時間段內共用同一儲存空間,生產者向空間裡生產資料,而消費者取走資料。
阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個阻塞佇列就是用來給生產者和消費者解耦的。
wait/notify方法
首先,我們搞清楚Thread.sleep()方法和Object.wait()、Object.notify()方法的區別。根據這篇文章java sleep和wait的區別的疑惑?
sleep()
是Thread類的方法;而wait()
,notify()
,notifyAll()
是Object類中定義的方法;儘管這兩個方法都會影響執行緒的執行行為,但是本質上是有區別的。Thread.sleep()
不會導致鎖行為的改變,如果當前執行緒是擁有鎖的,那麼Thread.sleep()
不會讓執行緒釋放鎖。如果能夠幫助你記憶的話,可以簡單認為和鎖相關的方法都定義在Object類中,因此呼叫Thread.sleep()
是不會影響鎖的相關行為。Thread.sleep
和Object.wait
都會暫停當前的執行緒,對於CPU資源來說,不管是哪種方式暫停的執行緒,都表示它暫時不再需要CPU的執行時間。OS會將執行時間分配給其它執行緒。區別是呼叫wait後,需要別的執行緒執行notify/notifyAll才能夠重新獲得CPU執行時間。
執行緒狀態圖:
Thread.sleep()
Object.wait()
讓執行緒從 【running】 -> 【等待佇列】notify -> 【鎖池】 -> 【runnable】
實現生產者消費者模型
生產者消費者問題是研究多執行緒程式時繞不開的經典問題之一,它描述是有一塊緩衝區作為倉庫,生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品。在Java中一共有四種方法支援同步,其中前三個是同步方法,一個是管道方法。
(1)Object的wait() / notify()方法
(2)Lock和Condition
(3)BlockingQueue阻塞佇列方法
(4)PipedInputStream / PipedOutputStream
本文只介紹最常用的前三種,第四種暫不做討論。原始碼在這裡:Java實現生產者消費者模型
1. 使用Object的wait() / notify()方法
wait()
/ nofity()
方法是基類Object的兩個方法,也就意味著所有Java類都會擁有這兩個方法,這樣,我們就可以為任何物件實現同步機制。
wait()
:當緩衝區已滿/空時,生產者/消費者執行緒停止自己的執行,放棄鎖,使自己處於等待狀態,讓其他執行緒執行。notify()
:當生產者/消費者向緩衝區放入/取出一個產品時,向其他等待的執行緒發出可執行的通知,同時放棄鎖,使自己處於等待狀態。
/**
* 生產者消費者模式:使用Object.wait() / notify()方法實現
*/
public class ProducerConsumer {
private static final int CAPACITY = 5;
public static void main(String args[]){
Queue<Integer> queue = new LinkedList<Integer>();
Thread producer1 = new Producer("P-1", queue, CAPACITY);
Thread producer2 = new Producer("P-2", queue, CAPACITY);
Thread consumer1 = new Consumer("C1", queue, CAPACITY);
Thread consumer2 = new Consumer("C2", queue, CAPACITY);
Thread consumer3 = new Consumer("C3", queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
/**
* 生產者
*/
public static class Producer extends Thread{
private Queue<Integer> queue;
String name;
int maxSize;
int i = 0;
public Producer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
synchronized(queue){
while(queue.size() == maxSize){
try {
System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("[" + name + "] Producing value : +" + i);
queue.offer(i++);
queue.notifyAll();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
/**
* 消費者
*/
public static class Consumer extends Thread{
private Queue<Integer> queue;
String name;
int maxSize;
public Consumer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
synchronized(queue){
while(queue.isEmpty()){
try {
System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
int x = queue.poll();
System.out.println("[" + name + "] Consuming value : " + x);
queue.notifyAll();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
注意要點
判斷Queue大小為0或者大於等於queueSize時須使用 while (condition) {}
,不能使用 if(condition) {}
。其中 while(condition)
迴圈,它又被叫做“自旋鎖”。自旋鎖以及wait()
和notify()
方法在執行緒通訊這篇文章中有更加詳細的介紹。為防止該執行緒沒有收到notify()
呼叫也從wait()
中返回(也稱作虛假喚醒),這個執行緒會重新去檢查condition條件以決定當前是否可以安全地繼續執行還是需要重新保持等待,而不是認為執行緒被喚醒了就可以安全地繼續執行了。
輸出日誌如下:
[P-1] Producing value : +0
[P-1] Producing value : +1
[P-1] Producing value : +2
[P-1] Producing value : +3
[P-1] Producing value : +4
Queue is full, Producer[P-1] thread waiting for consumer to take something from queue.
[C3] Consuming value : 0
[C3] Consuming value : 1
[C3] Consuming value : 2
[C3] Consuming value : 3
[C3] Consuming value : 4
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +0
[C1] Consuming value : 0
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +5
[P-1] Producing value : +6
[P-1] Producing value : +7
[P-1] Producing value : +8
[P-1] Producing value : +9
Queue is full, Producer[P-1] thread waiting for consumer to take something from queue.
[C3] Consuming value : 5
[C3] Consuming value : 6
[C3] Consuming value : 7
[C3] Consuming value : 8
[C3] Consuming value : 9
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +1
[C1] Consuming value : 1
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +10
[P-1] Producing value : +11
[P-1] Producing value : +12
[P-1] Producing value : +13
[P-1] Producing value : +14
Queue is full, Producer[P-1] thread waiting for consumer to take something from queue.
[C3] Consuming value : 10
[C3] Consuming value : 11
[C3] Consuming value : 12
[C3] Consuming value : 13
[C3] Consuming value : 14
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +2
[P-2] Producing value : +3
[P-2] Producing value : +4
[P-2] Producing value : +5
[P-2] Producing value : +6
Queue is full, Producer[P-2] thread waiting for consumer to take something from queue.
[C1] Consuming value : 2
[C1] Consuming value : 3
[C1] Consuming value : 4
[C1] Consuming value : 5
[C1] Consuming value : 6
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +15
[C3] Consuming value : 15
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +7
[P-2] Producing value : +8
[P-2] Producing value : +9
2. 使用Lock和Condition的await() / signal()方法
在JDK5.0之後,Java提供了更加健壯的執行緒處理機制,包括同步、鎖定、執行緒池等,它們可以實現更細粒度的執行緒控制。Condition介面的await()
和signal()
就是其中用來做同步的兩種方法,它們的功能基本上和Object的wait()
/ nofity()
相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock
直接掛鉤,具有更大的靈活性。通過在Lock
物件上呼叫newCondition()
方法,將條件變數和一個鎖物件進行繫結,進而控制併發程式訪問競爭資源的安全。下面來看程式碼:
/**
* 生產者消費者模式:使用Lock和Condition實現
* {@link java.util.concurrent.locks.Lock}
* {@link java.util.concurrent.locks.Condition}
*/
public class ProducerConsumerByLock {
private static final int CAPACITY = 5;
private static final Lock lock = new ReentrantLock();
private static final Condition fullCondition = lock.newCondition(); //佇列滿的條件
private static final Condition emptyCondition = lock.newCondition(); //佇列空的條件
public static void main(String args[]){
Queue<Integer> queue = new LinkedList<Integer>();
Thread producer1 = new Producer("P-1", queue, CAPACITY);
Thread producer2 = new Producer("P-2", queue, CAPACITY);
Thread consumer1 = new Consumer("C1", queue, CAPACITY);
Thread consumer2 = new Consumer("C2", queue, CAPACITY);
Thread consumer3 = new Consumer("C3", queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
/**
* 生產者
*/
public static class Producer extends Thread{
private Queue<Integer> queue;
String name;
int maxSize;
int i = 0;
public Producer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
//獲得鎖
lock.lock();
while(queue.size() == maxSize){
try {
System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
//條件不滿足,生產阻塞
fullCondition.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
System.out.println("[" + name + "] Producing value : +" + i);
queue.offer(i++);
//喚醒其他所有生產者、消費者
fullCondition.signalAll();
emptyCondition.signalAll();
//釋放鎖
lock.unlock();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消費者
*/
public static class Consumer extends Thread{
private Queue<Integer> queue;
String name;
int maxSize;
public Consumer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
//獲得鎖
lock.lock();
while(queue.isEmpty()){
try {
System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
//條件不滿足,消費阻塞
emptyCondition.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
int x = queue.poll();
System.out.println("[" + name + "] Consuming value : " + x);
//喚醒其他所有生產者、消費者
fullCondition.signalAll();
emptyCondition.signalAll();
//釋放鎖
lock.unlock();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
輸入日誌如下:
[P-1] Producing value : +0
[C1] Consuming value : 0
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-2] Producing value : +0
[C3] Consuming value : 0
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +1
[C2] Consuming value : 1
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +1
[C1] Consuming value : 1
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +2
[C3] Consuming value : 2
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-2] Producing value : +2
[C2] Consuming value : 2
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-1] Producing value : +3
[C1] Consuming value : 3
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-2] Producing value : +3
[C2] Consuming value : 3
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-1] Producing value : +4
[C1] Consuming value : 4
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +4
[C3] Consuming value : 4
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +5
[C2] Consuming value : 5
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-1] Producing value : +5
[C1] Consuming value : 5
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-2] Producing value : +6
[C2] Consuming value : 6
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +6
[C3] Consuming value : 6
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-2] Producing value : +7
[C3] Consuming value : 7
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-1] Producing value : +7
[C1] Consuming value : 7
Queue is empty, Consumer[C2] thread is waiting for Producer
[P-2] Producing value : +8
[C2] Consuming value : 8
[P-1] Producing value : +8
[C1] Consuming value : 8
[P-2] Producing value : +9
[C3] Consuming value : 9
[P-2] Producing value : +10
[C2] Consuming value : 10
[P-1] Producing value : +9
[P-1] Producing value : +10
[C1] Consuming value : 9
[P-2] Producing value : +11
[C3] Consuming value : 10
[C2] Consuming value : 11
[P-2] Producing value : +12
[C1] Consuming value : 12
[P-1] Producing value : +11
[C3] Consuming value : 11
[P-2] Producing value : +13
[C2] Consuming value : 13
Queue is empty, Consumer[C2] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +12
[C2] Consuming value : 12
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-1] Producing value : +13
[C3] Consuming value : 13
Queue is empty, Consumer[C1] thread is waiting for Producer
Queue is empty, Consumer[C3] thread is waiting for Producer
[P-2] Producing value : +14
[C1] Consuming value : 14
Queue is empty, Consumer[C3] thread is waiting for Producer
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-1] Producing value : +14
[C3] Consuming value : 14
Queue is empty, Consumer[C1] thread is waiting for Producer
[P-1] Producing value : +15
[C1] Consuming value : 15
[P-2] Producing value : +15
[P-1] Producing value : +16
[C3] Consuming value : 15
[P-2] Producing value : +16
3. 使用BlockingQueue阻塞佇列方法
JDK 1.5 以後新增的 java.util.concurrent
包新增了 BlockingQueue
介面。並提供瞭如下幾種阻塞佇列實現:
- java.util.concurrent.ArrayBlockingQueue
- java.util.concurrent.LinkedBlockingQueue
- java.util.concurrent.SynchronousQueue
- java.util.concurrent.PriorityBlockingQueue
實現生產者-消費者模型使用 ArrayBlockingQueue
或者 LinkedBlockingQueue
即可。
我們這裡使用LinkedBlockingQueue
,它是一個已經在內部實現了同步的佇列,實現方式採用的是我們第2種await()
/ signal()
方法。它可以在生成物件時指定容量大小。它用於阻塞操作的是put()和take()方法。
put()
方法:類似於我們上面的生產者執行緒,容量達到最大時,自動阻塞。take()
方法:類似於我們上面的消費者執行緒,容量為0時,自動阻塞。
我們可以跟進原始碼看一下LinkedBlockingQueue
類的put()
方法實現:
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
public void put(E e) throws InterruptedException {
putLast(e);
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
看到這裡證實了它的實現方式採用的是我們第2種await()
/ signal()
方法。下面我們就使用它實現吧。
/**
* 生產者消費者模式:使用{@link java.util.concurrent.BlockingQueue}實現
*/
public class ProducerConsumerByBQ{
private static final int CAPACITY = 5;
public static void main(String args[]){
LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<Integer>(CAPACITY);
Thread producer1 = new Producer("P-1", blockingQueue, CAPACITY);
Thread producer2 = new Producer("P-2", blockingQueue, CAPACITY);
Thread consumer1 = new Consumer("C1", blockingQueue, CAPACITY);
Thread consumer2 = new Consumer("C2", blockingQueue, CAPACITY);
Thread consumer3 = new Consumer("C3", blockingQueue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
/**
* 生產者
*/
public static class Producer extends Thread{
private LinkedBlockingDeque<Integer> blockingQueue;
String name;
int maxSize;
int i = 0;
public Producer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
super(name);
this.name = name;
this.blockingQueue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
try {
blockingQueue.put(i);
System.out.println("[" + name + "] Producing value : +" + i);
i++;
//暫停最多1秒
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消費者
*/
public static class Consumer extends Thread{
private LinkedBlockingDeque<Integer> blockingQueue;
String name;
int maxSize;
public Consumer(String name, LinkedBlockingDeque<Integer> queue, int maxSize){
super(name);
this.name = name;
this.blockingQueue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
try {
int x = blockingQueue.take();
System.out.println("[" + name + "] Consuming : " + x);
//暫停最多1秒
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
輸出日誌如下:
[P-2] Producing value : +0
[P-1] Producing value : +0
[C1] Consuming : 0
[C3] Consuming : 0
[P-2] Producing value : +1
[C2] Consuming : 1
[P-2] Producing value : +2
[C1] Consuming : 2
[P-1] Producing value : +1
[C2] Consuming : 1
[P-1] Producing value : +2
[C3] Consuming : 2
[P-1] Producing value : +3
[C2] Consuming : 3
[P-2] Producing value : +3
[C1] Consuming : 3
[P-1] Producing value : +4
[C2] Consuming : 4
[P-2] Producing value : +4
[C3] Consuming : 4
[P-2] Producing value : +5
[C1] Consuming : 5
[P-1] Producing value : +5
[C2] Consuming : 5
[P-1] Producing value : +6
[C1] Consuming : 6
[P-2] Producing value : +6
[C2] Consuming : 6
[P-2] Producing value : +7
[C2] Consuming : 7
[P-1] Producing value : +7
[C1] Consuming : 7
[P-2] Producing value : +8
[C3] Consuming : 8
[P-2] Producing value : +9
[C2] Consuming : 9
[P-1] Producing value : +8
[C2] Consuming : 8
[P-2] Producing value : +10
[C1] Consuming : 10
[P-1] Producing value : +9
[C3] Consuming : 9
[P-1] Producing value : +10
[C2] Consuming : 10
[P-2] Producing value : +11
[C1] Consuming : 11
[C3] Consuming : 12
[P-2] Producing value : +12
[P-2] Producing value : +13
[C2] Consuming : 13
[P-1] Producing value : +11
[C3] Consuming : 11
[P-1] Producing value : +12
[C3] Consuming : 12
[P-2] Producing value : +14
[C1] Consuming : 14
[P-1] Producing value : +13
[C2] Consuming : 13
[P-2] Producing value : +15
[C3] Consuming : 15
[P-2] Producing value : +16
[C1] Consuming : 16
[P-1] Producing value : +14
[C3] Consuming : 14
[P-2] Producing value : +17
[C2] Consuming : 17