「每日一面」生產者消費者問題(今日頭條Android崗位面試題)
以前去面試今日頭條Android崗位,沒有問太多安卓技術問題,印象比較深的就是讓手寫一下生產者消費者的問題。當時只想到多執行緒,等待阻塞,但是準備不足,沒有寫出來。今天再覆盤理解分享一下。
1. 生產者消費者問題?
生產者消費者問題(英語:Producer-consumer problem),也稱有限緩衝問題或者快取繫結問題(英語:Bounded-buffer problem),是一個多執行緒同步問題的經典案例。該問題描述了兩個共享固定大小緩衝區的執行緒——即所謂的“生產者”和“消費者”——在實際執行時會發生的問題。生產者的主要作用是生成一定量的資料放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些資料。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入資料,消費者也不會在緩衝區中空時消耗資料。主要看多生產者多消費者。
2. 問題分析
1) 關係分析。生產者和消費者對緩衝區互斥訪問是互斥關係,同時生產者和消費者又是一個相互協作的關係,只有生產者生產之後,消費者才能消費,他們也是同步關係。
2) 整理思路。這裡比較簡單,只有生產者和消費者兩個程序,正好是這兩個程序存在著互斥關係和同步關係。那麼需要解決的是互斥和同步PV操作的位置。
3. 程式碼實現
在Java中有四種方法支援同步,其中前三個是同步方法,一個是管道方法。
wait() / notify()方法await() / signal()方法
BlockingQueue阻塞佇列方法
PipedInputStream / PipedOutputStream
緩衝區Storage.java程式碼如下:
3.1 wait() / notify()方法
wait() / nofity()方法是基類Object的兩個方法:
wait()方法:當緩衝區已滿/空時,生產者/消費者執行緒停止自己的執行,放棄鎖,使自己處於等等狀態,讓其他執行緒執行。
notify()方法:當生產者/消費者向緩衝區放入/取出一個產品時,向其他等待的執行緒發出可執行的通知,同時放棄鎖,使自己處於等待狀態。
import java.util.LinkedList; public class Storage { // 倉庫最大儲存量 private final int MAX_SIZE = 100; // 倉庫儲存的載體 private LinkedList<Object> list = new LinkedList<Object>(); // 生產產品 public void produce(String producer) { synchronized (list) { // 如果倉庫已滿 while (list.size() == MAX_SIZE) { System.out.println("倉庫已滿,【"+producer+"】: 暫時不能執行生產任務!"); try { // 由於條件不滿足,生產阻塞 list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生產產品 list.add(new Object()); System.out.println("【"+producer+"】:生產了一個產品\t【現倉儲量為】:" + list.size()); list.notifyAll(); } } // 消費產品 public void consume(String consumer) { synchronized (list) { //如果倉庫儲存量不足 while (list.size()==0) { System.out.println("倉庫已空,【"+consumer+"】: 暫時不能執行消費任務!"); try { // 由於條件不滿足,消費阻塞 list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(); System.out.println("【"+consumer+"】:消費了一個產品\t【現倉儲量為】:" + list.size()); list.notifyAll(); } } public LinkedList<Object> getList() { return list; } public void setList(LinkedList<Object> list) { this.list = list; } public int getMAX_SIZE() { return MAX_SIZE; } }
生產者Producer.java程式碼如下
public class Producer extends Thread
{
private String producer;
private Storage storage;
public Producer(Storage storage)
{
this.storage = storage;
}
@Override
public void run()
{
produce(producer);
}
public void produce(String producer)
{
storage.produce(producer);
}
public String getProducer()
{
return producer;
}
public void setProducer(String producer)
{
this.producer = producer;
}
public Storage getStorage()
{
return storage;
}
public void setStorage(Storage storage)
{
this.storage = storage;
}
}
消費者Cosumer.java程式碼如下:
public class Consumer extends Thread
{
private String consumer;
private Storage storage;
public Consumer(Storage storage)
{
this.storage = storage;
}
@Override
public void run()
{
consume(consumer);
}
public void consume(String consumer)
{
storage.consume(consumer);
}
public Storage getStorage()
{
return storage;
}
public void setStorage(Storage storage)
{
this.storage = storage;
}
public String getConsumer() {
return consumer;
}
public void setConsumer(String consumer) {
this.consumer = consumer;
}
}
執行結果如下
倉庫已空,【消費者1】: 暫時不能執行消費任務!
【生產者3】:生產了一個產品 【現倉儲量為】:1
【消費者2】:消費了一個產品 【現倉儲量為】:0
倉庫已空,【消費者3】: 暫時不能執行消費任務!
【生產者1】:生產了一個產品 【現倉儲量為】:1
【生產者4】:生產了一個產品 【現倉儲量為】:2
【生產者2】:生產了一個產品 【現倉儲量為】:3
【生產者5】:生產了一個產品 【現倉儲量為】:4
【消費者1】:消費了一個產品 【現倉儲量為】:3
【消費者3】:消費了一個產品 【現倉儲量為】:2
3.2 await() / signal()方法
await()和signal()的功能基本上和wait() / nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock直接掛鉤,具有更大的靈活性。通過在Lock物件上呼叫newCondition()方法,將條件變數和一個鎖物件進行繫結,進而控制併發程式訪問競爭資源的安全。
緩衝區Storage.java程式碼如下:
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Storage {
// 倉庫最大儲存量
private final int MAX_SIZE = 100;
// 倉庫儲存的載體
private LinkedList<Object> list = new LinkedList<Object>();
// 鎖
private final Lock lock = new ReentrantLock();
// 倉庫滿的條件變數
private final Condition full = lock.newCondition();
// 倉庫空的條件變數
private final Condition empty = lock.newCondition();
// 生產產品
public void produce(String producer) {
lock.lock();
// 如果倉庫已滿
while (list.size() == MAX_SIZE) {
System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");
try {
// 由於條件不滿足,生產阻塞
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生產產品
list.add(new Object());
System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());
empty.signalAll();
// 釋放鎖
lock.unlock();
}
// 消費產品
public void consume(String consumer) {
// 獲得鎖
lock.lock();
// 如果倉庫儲存量不足
while (list.size() == 0) {
System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");
try {
// 由於條件不滿足,消費阻塞
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());
full.signalAll();
// 釋放鎖
lock.unlock();
}
public LinkedList<Object> getList() {
return list;
}
public void setList(LinkedList<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
3.3 BlockingQueue它是一個已經在內部實現了同步的佇列,實現方式採用的是我們第2種await() / signal()方法。它可以在生成物件時指定容量大小。它用於阻塞操作的是put()和take()方法:
put()方法:類似於我們上面的生產者執行緒,容量達到最大時,自動阻塞。
take()方法:類似於我們上面的消費者執行緒,容量為0時,自動阻塞。
import java.util.concurrent.LinkedBlockingQueue;
public class Storage {
// 倉庫最大儲存量
private final int MAX_SIZE = 100;
// 倉庫儲存的載體
private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(100);
// 生產產品
public void produce(String producer) {
// 如果倉庫已滿
if (list.size() == MAX_SIZE) {
System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");
}
// 生產產品
try {
list.put(new Object());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());
}
// 消費產品
public void consume(String consumer) {
// 如果倉庫儲存量不足
if (list.size() == 0) {
System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");
}
try {
list.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());
}
public LinkedBlockingQueue<Object> getList() {
return list;
}
public void setList(LinkedBlockingQueue<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}