執行緒安全的生產者消費者四種實現方法
問題描述
在IT技術面試過程中,我們經常會遇到生產者消費者問題(Producer-consumer problem), 這是多執行緒併發協作問題的經典案例。場景中包含三個物件,生產者(Producer),消費者(Consumer)以及一個固定大小的緩衝區(Buffer)。生產者的主要作用是不斷生成資料放到緩衝區,消費者則從緩衝區不斷消耗資料。該問題的關鍵是如何執行緒安全的操作共享資料塊,保證生產者執行緒和消費者執行緒可以正確的更新資料塊,主要考慮 1. 生產者不會在緩衝區滿時加入資料. 2. 消費者應當停止在緩衝區時消耗資料. 3. 在同一時間應當只允許一個生產者或者消費者訪問共享緩衝區(這一點是對於互斥操作訪問共享區塊的要求)。
解決方案
解決問題以上問題通常有訊號量,wait & notify, 管道或者阻塞佇列等幾種思路。本文以Java語言為例一一進行舉例講解。
訊號量
訊號量(Semaphore)也稱訊號燈,是用來控制資源被同時訪問的個數,比如控制訪問資料庫最大連線數的數量,執行緒通過acquire()獲得連線許可,完成資料操作後,通過release()釋放許可。對於生產者消費者問題來說,為了滿足執行緒安全操作的要求,同一時間我們只允許一個執行緒訪問共享資料區,因此需要一個大小為1的訊號量mutex來控制互斥操作。注意到我們還定義了notFull 和 notEmpty 訊號量,notFull用於標識當前可用區塊的空間大小,當notFull size 大於0時表明"not full", producer 可以繼續生產,等於0時表示空間已滿,無法繼續生產;同樣,對於notEmpty訊號量來說,大於0時表明 "not empty", consumer可以繼續消耗,等於0 時表明沒有產品,無法繼續消耗。notFull初始size 為5 (5個available空間可供生產),notEmpty初始為0(沒有產品可供消耗)。
/*** 資料倉儲class,所有的producer和consumer共享這個class物件 **/ static class DataWareHouse { //共享資料區 private final Queue<String> data = new LinkedList(); //非滿鎖 private final Semaphore notFull; //非空鎖 private final Semaphore notEmpty; //互斥鎖 private final Semaphore mutex; public DataWareHouse(int capacity) { this.notFull = new Semaphore(capacity); this.notEmpty = new Semaphore(0); mutex = new Semaphore(1); } public void offer(String x) throws InterruptedException { notFull.acquire(); //producer獲取訊號,notFull訊號量減一 mutex.acquire(); //當前程序獲得訊號,mutex訊號量減1,其他執行緒被阻塞操作共享區塊data data.add(x); mutex.release(); //mutex訊號量+1, 其他執行緒可以繼續訊號操作共享區塊data notEmpty.release(); //成功生產資料,notEmpty訊號量加1 } public String poll() throws InterruptedException { notEmpty.acquire(); //notEmpty訊號減一 mutex.acquire(); String result = data.poll(); mutex.release(); notFull.release(); //成功消耗資料, notFull訊號量加1 return result; } } /**Producer執行緒**/ static class Producer implements Runnable { private final DataWareHouse dataWareHouse; public Producer(final DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { while (true) { try { Thread.sleep(100); //生產的速度慢於消耗的速率 String s = UUID.randomUUID().toString(); System.out.println("put data " + s); dataWareHouse.offer(s); } catch (InterruptedException e) { e.printStackTrace(); } } } } /**Consumer執行緒**/ static class Consumer implements Runnable { private final DataWareHouse dataWareHouse; public Consumer(final DataWareHouse dataWareHouse) { this.dataWareHouse = dataWareHouse; } @Override public void run() { while (true) { while (true) { try { System.out.println("get data " + dataWareHouse.poll()); } catch (InterruptedException e) { e.printStackTrace(); } } } } } //測試程式碼 public static void main(String[] args) { final DataWareHouse dataWareHouse = new DataWareHouse(5); //三個producer 持續生產 for (int i = 0; i < 3; i++) { Thread t = new Thread(new Producer(dataWareHouse)); t.start(); } //三個consumer 持續消耗 for (int i = 0; i < 3; i++) { Thread t = new Thread(new Consumer(dataWareHouse)); t.start(); } }
Wait 和 Notify 機制
Java Object物件類中包含三個final methods來允許執行緒之間進行通訊,告知資源的狀態。它們分別是wait(), notify(), 和notifyAll()。
wait(): 顧名思義告訴當前執行緒釋放鎖,陷入休眠狀態(waiting狀態),等待資源。wait 方法本身是一個native method,它在Java中的使用語法如下所示:
synchronized(lockObject )
{
while( ! condition )
{
lockObject.wait();
}
//take the action here;
}
notify(): 用於喚醒waiting狀態的執行緒, 同時釋放鎖,被喚醒的執行緒可以重新獲得鎖訪問資源。它的基本語法 如下
synchronized(lockObject)
{
//establish_the_condition;
lockObject.notify();
//any additional code if needed
}
notifyAll(): 不同於notify(),它用於喚醒所有處於waiting狀態的執行緒。語法如下:
synchronized(lockObject)
{
establish_the_condition;
lockObject.notifyAll();
}
說完了這三個方法,來看下如何使用wait & notify(All) 來解決我們的問題。新的DataWareHouse 類如下所示:
//producer類和consumer共享物件
static class DataWareHouse {
//共享資料區
private final Queue<String> data = new LinkedList();
private int capacity;
private int size = 0;
public DataWareHouse(int capacity) {
this.capacity = capacity;
}
public synchronized void offer(String x) throws InterruptedException {
while (size == capacity) { //當buffer滿時,producer進入waiting 狀態
this.wait(); //使用this物件來加鎖
}
data.add(x);
size++;
notifyAll(); //當buffer 有資料時,喚醒所有等待的consumer執行緒
}
public synchronized String poll() throws InterruptedException {
while (size == 0) {//當buffer為空時,consumer 進入等待狀態
this.wait();
}
String result = data.poll();
size--;
notifyAll(); //當資料被消耗,空間被釋放,通知所有等待的producer。
return result;
}
}
Note: 在方法上使用synchronized 等價於在方法體內使用synchronized(this),兩者都是使用this物件作為鎖。
生產者和消費者類,以及測試程式碼和 訊號量 section 相同,不做重複列舉了。
管道
管道Pipe是實現程序或者執行緒(執行緒之間通常通過共享記憶體實現通訊,而程序則通過scoket,管道,訊息佇列等技術)之間通訊常用方式,它連線輸入流和輸出流,基於生產者- 消費者模式構建的一種技術。具體實現可以通過建立一個管道輸入流物件和管道輸出流物件,然後將輸入流和輸出流就行連結,生產者通過往管道中寫入資料,而消費者在管道資料流中讀取資料,通過這種方式就實現了執行緒之間的互相通訊。
具體實現程式碼如下所示
public class PipeSolution {
static class DataWareHouse implements Closeable {
private final PipedInputStream pis;
private final PipedOutputStream pos;
public DataWareHouse() throws IOException {
pis = new PipedInputStream();
pos = new PipedOutputStream();
pis.connect(pos); //連線管道
}
//向管道中寫入資料
public void offer(int val) throws IOException {
pos.write(val);
pos.flush();
}
//從管道中取資料.
public int poll() throws IOException {
//當管道中沒有資料,方法阻塞
return pis.read();
}
//關閉管道
@Override
public void close() throws IOException {
if (pis != null) {
pis.close();
}
if (pos != null) {
pos.close();
}
}
}
//consumer類
static class Consumer implements Runnable {
private final DataWareHouse dataWareHouse;
Consumer(DataWareHouse dataWareHouse) {
this.dataWareHouse = dataWareHouse;
}
@Override
public void run() {
try {
//消費者不斷從管道中讀取資料
while (true) {
int num = dataWareHouse.poll();
System.out.println("get data +" + num);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
static class Producer implements Runnable {
private final DataWareHouse dataWareHouse;
private final Random random = new Random();
Producer(DataWareHouse dataWareHouse) {
this.dataWareHouse = dataWareHouse;
}
@Override
public void run() {
try {
//生產者不斷向管道中寫入資料
while (true) {
int num = random.nextInt(256);
dataWareHouse.offer(num);
System.out.println("put data +" + num);
Thread.sleep(1000);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws IOException {
DataWareHouse dataWareHouse = new DataWareHouse();
new Thread(new Producer(dataWareHouse)).start();
new Thread(new Consumer(dataWareHouse)).start();
}
}
阻塞佇列
阻塞佇列(BlockingQueue),具有1. 當佇列滿了的時候阻塞入佇列操作 2. 當佇列空了的時候阻塞出佇列操作 3. 執行緒安全 的特性,因而阻塞佇列通常被視為實現生產消費者模式最便捷的工具,其中DataWareHouse類實現程式碼如下:
static class DataWareHouse {
//共享資料區
private final BlockingQueue<String> blockingQueue;
public DataWareHouse(int capacity) {
this.blockingQueue = new ArrayBlockingQueue<>(capacity);
}
public void offer(String x) {
blockingQueue.offer(x);
}
public String poll() {
return blockingQueue.poll();
}
}
生產者和消費者類,以及測試程式碼和 訊號量 section 相同,在此不做重複列舉了。
總結
生產者消費者問題是面試中經常會遇到的題目,本文總結了幾種常見的實現方式,面試過程中通常不必要向面試官描述過多實現細節,說出每種實現方式的特點即可。希望能給大家帶來幫助。
Reference
- https://howtodoinjava.com/java/multi-threading/wait-notify-and-notifyall-methods/