Java併發程式設計(4)--生產者與消費者模式介紹
一、前言
這種模式在生活是最常見的,那麼它的場景是什麼樣的呢? 下面是我假象的,假設有一個倉庫,倉庫有一個生產者和一個消費者,消費者過來消費的時候會檢測倉庫中是否有庫存,如果沒有了則等待生產,如果有就先消費直至消費完成;而生產者每天的工作就是先檢測倉庫是否有庫存,如果沒有就開始生產,滿倉了就停止生產等待消費,直至工作結束。下圖是根據假象畫的流程圖:
那麼在程式中怎麼才能達到這樣的效果呢?下面介紹三種方式實現。
二、使用notify() 和 wait()實現
相信大家這兩個方法都不陌生,它是Object類中的兩個方法,具體請看原始碼中的解釋。提醒一點就是使用notify()和wait()方法時必須擁有物件鎖
根據上面假象我這定義一下明確場景:倉庫庫存有個最大值,如果倉庫庫存已經達到最大值那麼就停止生產,小於就需要生產; 如果庫存等於0則需要等待生產停止消費。另外生產者有個生產目標,當它生產了目標數後就結束生產;消費者也是,當消費一定的資料後就結束消費,否則等待消費。
見下面程式碼:
package com.yuanfy.jmm.threads; import com.yuanfy.util.SleepUtils; import java.util.concurrent.TimeUnit; public class Factory { // 當前庫存大小 privateint size; // 庫存容量(最大庫存值) private int capacity; public Factory(int capacity) { this.capacity = capacity; } public synchronized void produce(int num) { try { System.out.println("+++++生產者【" + Thread.currentThread().getName()+ "】, 他的任務是生產" + num + "件產品."); // 當生產完成就停止 while (num > 0) { // 如果當前庫存大小大於或等於庫存容量值了,則停止生產等待消費。 if (size >= capacity) { System.out.println("+++++" + Thread.currentThread().getName() + "檢測庫存已滿,停止生產等待消費..."); // 等待消費 wait(); System.out.println("+++++" + Thread.currentThread().getName() + "開始生產..."); } // 否則繼續生產 int inc = (num + size) > capacity ? (capacity - size) : num; num -= inc; size += inc; SleepUtils.second(1); System.out.println("+++++" + Thread.currentThread().getName() + " 生產了" + inc + "件,當前庫存有" + size + "件."); // 生產後喚醒消費者 notify(); } System.out.println("+++++生產者【" + Thread.currentThread().getName() + "】 生產結束."); } catch (InterruptedException e) { e.printStackTrace(); } } public synchronized void consume(int num) { try { System.out.println("-----消費者【" + Thread.currentThread().getName() + "】, 他需要消費" + num + "件產品."); // 當消費完成則停止 while (num > 0) { // 如果當前庫存大小小於等於0,則停止消費等待生產。 if (size <= 0) { System.out.println("-----" + Thread.currentThread().getName() + " 檢測庫存已空,停止消費等待生產..."); // 等待生產 wait(); System.out.println("-----" + Thread.currentThread().getName() + " 開始消費..."); } // 否則繼續消費 int dec = (size - num) > 0 ? num : size; num -= dec; size -= dec; SleepUtils.second(1); System.out.println("-----" + Thread.currentThread().getName() + " 消費了" + dec + "件,當前有" + size + "件."); // 消費後喚醒生產者繼續生產 notify(); } System.out.println("-----消費者【" + Thread.currentThread().getName() + "】 消費結束."); } catch (InterruptedException e) { e.printStackTrace(); } } }
上面是工廠(倉庫)類,主要包含兩個任務一個是生產一個是消費,接下來建立兩個執行緒去呼叫它,如下:
package com.yuanfy.jmm.threads; /** * 生產執行緒 */ class Produce { private Factory factory; public Produce(Factory factory) { this.factory = factory; } public void produce(String name, final int num) { new Thread(new Runnable() { @Override public void run() { factory.produce(num); } }, name).start(); } } /** * 消費執行緒 */ class Consume { private Factory factory; public Consume(Factory factory) { this.factory = factory; } public void consume(String name, final int num) { new Thread(new Runnable() { @Override public void run() { factory.consume(num); } }, name).start(); } } public class ProduceConsumeDemo { public static void main(String[] args) { Factory f = new Factory(500); Consume consume = new Consume(f); consume.consume("消費執行緒",600); Produce produce = new Produce(f); produce.produce("生產執行緒",800); } }
注意上方,消費執行緒和生產執行緒都是擁有同一個工廠物件,然後進行生產和消費模式。那麼我們直接執行,結果如下:
三、使用鎖中的Condition物件進行控制
這種方式估計用的比較少,因為使用Condition必須先使用鎖Lock。這裡我只介紹怎麼用Condition物件進行控制實現生產者與消費者模式的實現。
其實它跟上面那種方法有點類似,Condition物件中await()方法表示等待,signal()方法表示喚醒(看了AQS原始碼的應該都知道有這個物件且瞭解過這兩個方法)。下面看下具體怎麼實現:
public class Factory { // 當前大小 private int size; // 總容量 private int capacity; private Lock lock; // 已滿的條件 private Condition fullCondition; // 已空的條件 private Condition emptyCondition; public Factory(int capacity) { this.capacity = capacity; lock = new ReentrantLock(); fullCondition = lock.newCondition(); emptyCondition = lock.newCondition(); } public void produce(int no) { lock.lock(); try { while (no > 0) { while (size >= capacity) { System.out.println(Thread.currentThread().getName() + " 報告倉庫已滿,等待快遞員取件..."); fullCondition.await(); System.out.println(Thread.currentThread().getName() + " 報告開始進貨..."); } int inc = (no + size) > capacity ? (capacity - size) : no; no -= inc; size += inc; System.out.println(Thread.currentThread().getName() + " 報告進貨了: " + inc + "件, 當前庫存數: " + size); emptyCondition.signal(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void consume(int no) { lock.lock(); try { while (no > 0) { while (size <= 0) { System.out.println(Thread.currentThread().getName() + " 報告倉庫已空,等待倉庫管理員進貨"); emptyCondition.await(); System.out.println(Thread.currentThread().getName() + " 報告開始取件..."); } int dec = (size - no) > 0 ? no : size; no -= dec; size -= dec; System.out.println(Thread.currentThread().getName() + " 報告取件: " + dec + ", 當前庫存數: " + size); fullCondition.signal(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
看了上面工廠類的程式碼後是不是跟使用Object中wait()和notify()方法類似呢。 主要區別就是擁有物件的方式不一樣,這裡使用的lock進行且需要手動釋放,而第一種是需要Synchronized進行控制。
四、使用阻塞佇列進行實現
這個就很簡單了,它已經封裝好等待和喚醒的操作,所以不進行案例分享了。其中涉及到兩個重要方法put() 和 take