1. 程式人生 > >java多執行緒:執行緒間通訊——生產者消費者模型

java多執行緒:執行緒間通訊——生產者消費者模型


# 一、背景 && 定義
多執行緒環境下,只要有併發問題,就要保證資料的安全性,一般指的是通過 synchronized 來進行同步。 另一個問題是,**多個執行緒之間如何協作呢**? 我們看一個倉庫出貨問題(更具體一些,快餐店直接放好炸貨的架子,不過每次只放一份) 1. 假設倉庫中只能存放一件商品,生產者將生產出來的產品放入倉庫,消費者將倉庫中產品取走進行消費; 2. 如果倉庫中沒有商品,那麼生產者將產品放入倉庫,否則停止生產並等待,直到倉庫中的產品被消費者取走為止; 3. 如果倉庫中放有產品,消費者可快速取走並消費,否則停止消費並等待,直到倉庫中再次放入產品為止。 這其實就是一個執行緒同步問題。**生產者和消費者共享同一個資源,並且生產者和消費者之間互相依賴,互為條件。** >
如果一個快餐店: 先點單,餐出來之後再收錢。這種模式叫BIO-阻塞IO模式。 如果一個快餐店: 先收錢,收完錢消費者在旁邊等。這種就是生產者-消費者模式。 這類問題裡,同步的候只有 synchronized 是不夠的,因為他雖然能解決資源的共享問題,實現資源的同步更新,但是無法**在不同執行緒之間進行訊息傳遞**(通訊)。 所以只有我們之前所說的**加鎖**和**排隊**是不夠的,還要有**通知**。 ### 定義: 生產者和消費者在同一時間段內共用同一個儲存空間,生產者往儲存空間中新增產品,消費者從儲存空間中取走產品,當儲存空間為空時,消費者阻塞,當儲存空間滿時,生產者阻塞。 為了解決雙方能力不等而等待的問題,引入對應的解決方案。生產者消費者模型是一種併發協作模型。
# 二、解決方式介紹
## 2.1 管程法 1. **生產者**:負責生產資料的模組(模組可能是方法、物件、執行緒、程序); 2. **消費者**:負責處理資料的模組(模組可能是方法、物件、執行緒、程序); 3. **緩衝區**:消費者不能直接使用生產者的資料,它們之間有個“緩衝區”(緩衝區一般是佇列)。 生產者和消費者都是通過緩衝區進行資料的 放 和 拿 。 這樣的話,一來可以避免旱的旱死,澇的澇死的問題:不管哪一方過快或者過慢,緩衝區始終有一部分資料;二來能夠達到生產者和消費者的解耦,不再直接通訊,從而提高效率。 因為容器相當於一個輸送商品的管道,所以成為**管程法**。 ## 2.2 訊號燈法 採用類似紅燈綠燈的模式,決定車走還是人走。 - 管程法使用容器的狀態來控制,資料在容器中; - 而訊號燈法只是用訊號來給生產者和消費者提醒,他們的互動資料並不由訊號燈來保管。 ## 2.3 Object類 jdk 裡面 Object 類老早就有提供解決執行緒間通訊的問題的方法: 1. **wait()**:表示執行緒一直等待,直到其他執行緒通知(也就是呼叫了notify或者notifyAll方法),與sleep不同,會釋放鎖; 2. **wait(long timeout)**:指定時間; 3. **notify()**:喚醒一個處於等待狀態的執行緒; 4. **notifyAll()**:喚醒同一個物件上所有呼叫 wait() 方法的執行緒,優先級別高的執行緒優先排程。 這幾個方法都是在**同步方法或者同步程式碼塊**中使用,否則會丟擲異常。 (很多面試題問 Java 的 Object 類有哪些方法,都是希望得到關於這塊的答案,引到多執行緒)
# 三、管程法實現
管程法實現的四個角色: 1. 生產者和消費者都是多執行緒; 2. 中間的緩衝區應該是一個容器,並且需要的是一個**併發容器**,java.util.concurrent包裡面已經提供了; 3. 資源,也就是各個角色來回交換的商品。 利用 Object 類的幾個方法,來實現管程法,以下是程式碼示例: ```java /** * 協作模型:生產者消費者模型實現:管程法 */ public class Cooperation1 { public static void main(String[] args) { Container container = new Container(); new Producer(container).start(); new Consumer(container).start(); } } /** * 生產者 */ class Producer extends Thread{ Container container; public Producer(Container container){ this.container = container; } @Override public void run() { //生產過程 for (int i=0; i<10; i++){ System.out.println("生產第 " + i + " 個饅頭"); container.push(new Hamburger(i)); } } } /** * 消費者 */ class Consumer extends Thread{ Container container; public Consumer(Container container){ this.container = container; } @Override public void run() { //消費過程 for (int i=0; i<10; i++){ System.out.println("消費第 " + container.pop().id + " 個饅頭"); } } } /** * 緩衝區,操作商品,並和生產者、消費者互動 */ class Container{ Hamburger[] food = new Hamburger[10]; private int count = 0; //儲存:生產 public synchronized void push(Hamburger hamburger){ if (count == food.length){ try { this.wait();//阻塞,但是等待消費者通知後會解除 } catch (InterruptedException e) { e.printStackTrace(); } } food[count++] = hamburger; this.notifyAll();//說明存在資料了,通知消費者消費 } //獲取:消費 public synchronized Hamburger pop(){ if (count ==0 ){ try { this.wait();//阻塞,直到生產者通知後會解除 } catch (InterruptedException e) { e.printStackTrace(); } } Hamburger ans = food[--count]; this.notifyAll();//存在空餘空間了,通知生產者生產 return ans; } } /** * 商品 */ class Hamburger{ int id; public Hamburger(int id) { this.id = id; } } ``` 其中的核心有這麼幾點: 1. 容器相當於一個棧,是後進先出的; 2. 容器的兩個方法對於資源的操作,一個和生產者互動,一個和消費者互動,除了 synchronized 修飾,因為兩個方法是互斥的,所以利用 wait 和 notify 方法使他們完成阻塞和解除阻塞; 3. 生產者和容器互動,新增資料; 4. 消費者和容器互動,刪除資料。 前面關於 [執行緒的阻塞問題,生命週期裡的阻塞](https://www.cnblogs.com/lifegoeson/p/13516019.html),完整的可能情況,就包含這裡的阻塞情況:

# 四、訊號燈法實現
和上一種通過容器的容量讓執行緒之間互相通知的方法不同,訊號燈法沒有用資料快取的方式,而是用**訊號燈來指示雙方**,對方是否已經準備好了要和你通訊。 下面是一個 電視直播和觀眾的程式碼示例,通過訊號燈,通知演員和觀眾直播,確保演員在演的時候,讓觀眾來看。 ```java /** * 協作模型:生產者消費者實現:訊號燈法 */ public class Cooperation2 { public static void main(String[] args) { TV tv = new TV(); new Actor(tv).start(); new Fans(tv).start(); } } /** * 生產者:演員 */ class Actor extends Thread{ TV tv; public Actor(TV tv){ this.tv = tv; } @Override public void run() { for (int i=0; i<10; i++){ if (i%2 == 0){ this.tv.play("節目 " + i); }else{ this.tv.play("廣告 " + i); } } } } /** * 消費者:觀眾 */ class Fans extends Thread{ TV tv; public Fans(TV tv){ this.tv = tv; } @Override public void run() { for (int i=0; i<10; i++){ tv.watch(); } } } /** * 共同資源:電視直播 */ class TV{ String voice; //訊號燈,如果為真則演員準備,觀眾等待 //如果為假,則觀眾就位,演員等待 boolean flag = true; //表演方法:針對生產者 public synchronized void play(String voice){ //演員等待 if (!flag){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.voice = voice; System.out.println("表演 "+voice +" ing"); //喚醒觀眾 this.notifyAll(); this.flag = !flag; } //觀看方法:針對消費者 public synchronized void watch(){ //觀眾等待 if (flag){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("觀看 " + voice +" ing"); this.notifyAll(); this.flag = !flag; } } ``` 可以看到,相比管程法的核心區別是: TV **沒有用一個容器儲存資料**,只是通過生產者是否生產,來決定**訊號燈**的標誌,以此**通知**消費者來消費。 顯然這兩種實現方法,有不同的適用場景,那就是決定於生產者消費者是否有資料