1. 程式人生 > >Java併發程式設計(4)--生產者與消費者模式介紹

Java併發程式設計(4)--生產者與消費者模式介紹

一、前言

  這種模式在生活是最常見的,那麼它的場景是什麼樣的呢? 下面是我假象的,假設有一個倉庫,倉庫有一個生產者和一個消費者,消費者過來消費的時候會檢測倉庫中是否有庫存,如果沒有了則等待生產,如果有就先消費直至消費完成;而生產者每天的工作就是先檢測倉庫是否有庫存,如果沒有就開始生產,滿倉了就停止生產等待消費,直至工作結束。下圖是根據假象畫的流程圖:

  那麼在程式中怎麼才能達到這樣的效果呢?下面介紹三種方式實現。

二、使用notify() 和 wait()實現

  相信大家這兩個方法都不陌生,它是Object類中的兩個方法,具體請看原始碼中的解釋。提醒一點就是使用notify()和wait()方法時必須擁有物件鎖

  根據上面假象我這定義一下明確場景:倉庫庫存有個最大值,如果倉庫庫存已經達到最大值那麼就停止生產,小於就需要生產; 如果庫存等於0則需要等待生產停止消費。另外生產者有個生產目標,當它生產了目標數後就結束生產;消費者也是,當消費一定的資料後就結束消費,否則等待消費。

  見下面程式碼:

package com.yuanfy.jmm.threads;

import com.yuanfy.util.SleepUtils;

import java.util.concurrent.TimeUnit;

public class Factory {
    // 當前庫存大小
    private
int size; // 庫存容量(最大庫存值) private int capacity; public Factory(int capacity) { this.capacity = capacity; } public synchronized void produce(int num) { try { System.out.println("+++++生產者【" + Thread.currentThread().getName()
+ "】, 他的任務是生產" + num + "件產品."); // 當生產完成就停止 while (num > 0) { // 如果當前庫存大小大於或等於庫存容量值了,則停止生產等待消費。 if (size >= capacity) { System.out.println("+++++" + Thread.currentThread().getName() + "檢測庫存已滿,停止生產等待消費..."); // 等待消費 wait(); System.out.println("+++++" + Thread.currentThread().getName() + "開始生產..."); } // 否則繼續生產 int inc = (num + size) > capacity ? (capacity - size) : num; num -= inc; size += inc; SleepUtils.second(1); System.out.println("+++++" + Thread.currentThread().getName() + " 生產了" + inc + "件,當前庫存有" + size + "件."); // 生產後喚醒消費者 notify(); } System.out.println("+++++生產者【" + Thread.currentThread().getName() + "】 生產結束."); } catch (InterruptedException e) { e.printStackTrace(); } } public synchronized void consume(int num) { try { System.out.println("-----消費者【" + Thread.currentThread().getName() + "】, 他需要消費" + num + "件產品."); // 當消費完成則停止 while (num > 0) { // 如果當前庫存大小小於等於0,則停止消費等待生產。 if (size <= 0) { System.out.println("-----" + Thread.currentThread().getName() + " 檢測庫存已空,停止消費等待生產..."); // 等待生產 wait(); System.out.println("-----" + Thread.currentThread().getName() + " 開始消費..."); } // 否則繼續消費 int dec = (size - num) > 0 ? num : size; num -= dec; size -= dec; SleepUtils.second(1); System.out.println("-----" + Thread.currentThread().getName() + " 消費了" + dec + "件,當前有" + size + "件."); // 消費後喚醒生產者繼續生產 notify(); } System.out.println("-----消費者【" + Thread.currentThread().getName() + "】 消費結束."); } catch (InterruptedException e) { e.printStackTrace(); } } }

  上面是工廠(倉庫)類,主要包含兩個任務一個是生產一個是消費,接下來建立兩個執行緒去呼叫它,如下:

package com.yuanfy.jmm.threads;

/**
 * 生產執行緒
 */
class Produce {
    private Factory factory;

    public Produce(Factory factory) {
        this.factory = factory;
    }

    public void produce(String name, final int num) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                factory.produce(num);
            }
        }, name).start();
    }
}
/**
 * 消費執行緒
 */
class Consume {
    private Factory factory;

    public Consume(Factory factory) {
        this.factory = factory;
    }

    public void consume(String name, final int num) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                factory.consume(num);
            }
        }, name).start();
    }
}

public class ProduceConsumeDemo {

    public static void main(String[] args) {
        Factory f = new Factory(500);

        Consume consume = new Consume(f);
        consume.consume("消費執行緒",600);

        Produce produce = new Produce(f);
        produce.produce("生產執行緒",800);
    }
}

  注意上方,消費執行緒和生產執行緒都是擁有同一個工廠物件,然後進行生產和消費模式。那麼我們直接執行,結果如下:

  

 三、使用鎖中的Condition物件進行控制

  這種方式估計用的比較少,因為使用Condition必須先使用鎖Lock。這裡我只介紹怎麼用Condition物件進行控制實現生產者與消費者模式的實現。

  其實它跟上面那種方法有點類似,Condition物件中await()方法表示等待,signal()方法表示喚醒(看了AQS原始碼的應該都知道有這個物件且瞭解過這兩個方法)。下面看下具體怎麼實現:

public class Factory {
    // 當前大小
    private int size;

    // 總容量
    private int capacity;

    private Lock lock;

    // 已滿的條件
    private Condition fullCondition;

    // 已空的條件
    private Condition emptyCondition;

    public Factory(int capacity) {
        this.capacity = capacity;
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        emptyCondition = lock.newCondition();
    }

    public void produce(int no) {
        lock.lock();
        try {
            while (no > 0) {
                while (size >= capacity) {
                    System.out.println(Thread.currentThread().getName() + " 報告倉庫已滿,等待快遞員取件...");
                    fullCondition.await();
                    System.out.println(Thread.currentThread().getName() + " 報告開始進貨...");
                }
                int inc = (no + size) > capacity ? (capacity - size) : no;
                no -= inc;
                size += inc;
                System.out.println(Thread.currentThread().getName() +
                        " 報告進貨了: " + inc + "件, 當前庫存數: " + size);
                emptyCondition.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void consume(int no) {
        lock.lock();
        try {
            while (no > 0) {
                while (size <= 0) {
                    System.out.println(Thread.currentThread().getName() + " 報告倉庫已空,等待倉庫管理員進貨");
                    emptyCondition.await();
                    System.out.println(Thread.currentThread().getName() + " 報告開始取件...");
                }
                int dec = (size - no) > 0 ? no : size;
                no -= dec;
                size -= dec;
                System.out.println(Thread.currentThread().getName() +
                        " 報告取件: " + dec + ", 當前庫存數: " + size);
                fullCondition.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

  看了上面工廠類的程式碼後是不是跟使用Object中wait()和notify()方法類似呢。 主要區別就是擁有物件的方式不一樣,這裡使用的lock進行且需要手動釋放,而第一種是需要Synchronized進行控制。

四、使用阻塞佇列進行實現

  這個就很簡單了,它已經封裝好等待和喚醒的操作,所以不進行案例分享了。其中涉及到兩個重要方法put() 和 take