1. 程式人生 > >執行緒安全的生產者消費者四種實現方法

執行緒安全的生產者消費者四種實現方法

問題描述

在IT技術面試過程中,我們經常會遇到生產者消費者問題(Producer-consumer problem), 這是多執行緒併發協作問題的經典案例。場景中包含三個物件,生產者(Producer),消費者(Consumer)以及一個固定大小的緩衝區(Buffer)。生產者的主要作用是不斷生成資料放到緩衝區,消費者則從緩衝區不斷消耗資料。該問題的關鍵是如何執行緒安全的操作共享資料塊,保證生產者執行緒和消費者執行緒可以正確的更新資料塊,主要考慮 1. 生產者不會在緩衝區滿時加入資料. 2. 消費者應當停止在緩衝區時消耗資料. 3. 在同一時間應當只允許一個生產者或者消費者訪問共享緩衝區(這一點是對於互斥操作訪問共享區塊的要求)。

解決方案

解決問題以上問題通常有訊號量,wait & notify, 管道或者阻塞佇列等幾種思路。本文以Java語言為例一一進行舉例講解。

訊號量

訊號量(Semaphore)也稱訊號燈,是用來控制資源被同時訪問的個數,比如控制訪問資料庫最大連線數的數量,執行緒通過acquire()獲得連線許可,完成資料操作後,通過release()釋放許可。對於生產者消費者問題來說,為了滿足執行緒安全操作的要求,同一時間我們只允許一個執行緒訪問共享資料區,因此需要一個大小為1的訊號量mutex來控制互斥操作。注意到我們還定義了notFull 和 notEmpty 訊號量,notFull用於標識當前可用區塊的空間大小,當notFull size 大於0時表明"not full", producer 可以繼續生產,等於0時表示空間已滿,無法繼續生產;同樣,對於notEmpty訊號量來說,大於0時表明 "not empty", consumer可以繼續消耗,等於0 時表明沒有產品,無法繼續消耗。notFull初始size 為5 (5個available空間可供生產),notEmpty初始為0(沒有產品可供消耗)。

   /*** 
     資料倉儲class,所有的producer和consumer共享這個class物件
   **/
    static class DataWareHouse {
       //共享資料區
        private final Queue<String> data = new LinkedList();
        //非滿鎖
        private final Semaphore notFull;
        //非空鎖
        private final Semaphore notEmpty;
        //互斥鎖
        private final Semaphore mutex;

        public DataWareHouse(int capacity) {
            this.notFull = new Semaphore(capacity);
            this.notEmpty = new Semaphore(0);
            mutex = new Semaphore(1);
        }
        public void offer(String x) throws InterruptedException {
            notFull.acquire(); //producer獲取訊號,notFull訊號量減一
            mutex.acquire(); //當前程序獲得訊號,mutex訊號量減1,其他執行緒被阻塞操作共享區塊data
            data.add(x);
            mutex.release(); //mutex訊號量+1, 其他執行緒可以繼續訊號操作共享區塊data
            notEmpty.release(); //成功生產資料,notEmpty訊號量加1
        }
        public String poll() throws InterruptedException {
            notEmpty.acquire(); //notEmpty訊號減一
            mutex.acquire();
            String result = data.poll();
            mutex.release();
            notFull.release(); //成功消耗資料, notFull訊號量加1
            return result;
        }
    }
   /**Producer執行緒**/
    static class Producer implements Runnable {
        private final DataWareHouse dataWareHouse;

        public Producer(final DataWareHouse dataWareHouse) {
            this.dataWareHouse = dataWareHouse;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(100); //生產的速度慢於消耗的速率
                    String s = UUID.randomUUID().toString();
                    System.out.println("put  data " + s);
                    dataWareHouse.offer(s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
   /**Consumer執行緒**/
    static class Consumer implements Runnable {
        private final DataWareHouse dataWareHouse;

        public Consumer(final DataWareHouse dataWareHouse) {
            this.dataWareHouse = dataWareHouse;
        }

        @Override
        public void run() {
            while (true) {
                while (true) {
                    try {
                        System.out.println("get data " + dataWareHouse.poll());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    //測試程式碼
    public static void main(String[] args) {
        final DataWareHouse dataWareHouse = new DataWareHouse(5);
        //三個producer 持續生產
        for (int i = 0; i < 3; i++) {
            Thread t = new Thread(new Producer(dataWareHouse));
            t.start();
        }
        //三個consumer 持續消耗
        for (int i = 0; i < 3; i++) {
            Thread t = new Thread(new Consumer(dataWareHouse));
            t.start();
        }
    }

Wait 和 Notify 機制

Java Object物件類中包含三個final methods來允許執行緒之間進行通訊,告知資源的狀態。它們分別是wait(), notify(), 和notifyAll()。

wait(): 顧名思義告訴當前執行緒釋放鎖,陷入休眠狀態(waiting狀態),等待資源。wait 方法本身是一個native method,它在Java中的使用語法如下所示:

synchronized(lockObject )
{ 
    while( ! condition )
    { 
        lockObject.wait();
    }
    //take the action here;
}

notify(): 用於喚醒waiting狀態的執行緒, 同時釋放鎖,被喚醒的執行緒可以重新獲得鎖訪問資源。它的基本語法 如下

synchronized(lockObject) 
{
    //establish_the_condition;
    lockObject.notify();
    //any additional code if needed
}

notifyAll(): 不同於notify(),它用於喚醒所有處於waiting狀態的執行緒。語法如下:

synchronized(lockObject) 
{
    establish_the_condition;
    lockObject.notifyAll();
}

說完了這三個方法,來看下如何使用wait & notify(All) 來解決我們的問題。新的DataWareHouse 類如下所示:

    //producer類和consumer共享物件
    static class DataWareHouse {
        //共享資料區
        private final Queue<String> data = new LinkedList();
        private int capacity;
        private int size = 0;

        public DataWareHouse(int capacity) {
            this.capacity = capacity;
        }

        public synchronized void offer(String x) throws InterruptedException {
            while (size == capacity) { //當buffer滿時,producer進入waiting 狀態
                this.wait(); //使用this物件來加鎖
            }
            data.add(x);
            size++;
            notifyAll(); //當buffer 有資料時,喚醒所有等待的consumer執行緒
        }

        public synchronized String poll() throws InterruptedException {
            while (size == 0) {//當buffer為空時,consumer 進入等待狀態
                this.wait();
            }
            String result = data.poll();
            size--;   
            notifyAll(); //當資料被消耗,空間被釋放,通知所有等待的producer。
            return result;
        }
    }

Note: 在方法上使用synchronized 等價於在方法體內使用synchronized(this),兩者都是使用this物件作為鎖。

生產者和消費者類,以及測試程式碼和 訊號量 section 相同,不做重複列舉了。

管道

管道Pipe是實現程序或者執行緒(執行緒之間通常通過共享記憶體實現通訊,而程序則通過scoket,管道,訊息佇列等技術)之間通訊常用方式,它連線輸入流和輸出流,基於生產者- 消費者模式構建的一種技術。具體實現可以通過建立一個管道輸入流物件和管道輸出流物件,然後將輸入流和輸出流就行連結,生產者通過往管道中寫入資料,而消費者在管道資料流中讀取資料,通過這種方式就實現了執行緒之間的互相通訊。

具體實現程式碼如下所示

public class PipeSolution {
    static class DataWareHouse implements Closeable {
        private final PipedInputStream pis;
        private final PipedOutputStream pos;

        public DataWareHouse() throws IOException {
            pis = new PipedInputStream();
            pos = new PipedOutputStream();
            pis.connect(pos); //連線管道
        }
        //向管道中寫入資料
        public void offer(int val) throws IOException {
            pos.write(val);
            pos.flush();
        }
        //從管道中取資料.
        public int poll() throws IOException {
             //當管道中沒有資料,方法阻塞
            return pis.read();
        }
        //關閉管道
        @Override
        public void close() throws IOException {
            if (pis != null) {   
                pis.close();
            }
            if (pos != null) {
                pos.close();
            }
        }
    }
    //consumer類
    static class Consumer implements Runnable {
        private final DataWareHouse dataWareHouse;

        Consumer(DataWareHouse dataWareHouse) {
            this.dataWareHouse = dataWareHouse;
        }

        @Override
        public void run() {
            try {
                //消費者不斷從管道中讀取資料
                while (true) {
                    int num = dataWareHouse.poll();
                    System.out.println("get data +" + num);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    static class Producer implements Runnable {
        private final DataWareHouse dataWareHouse;
        private final Random random = new Random();

        Producer(DataWareHouse dataWareHouse) {
            this.dataWareHouse = dataWareHouse;
        }

        @Override
        public void run() {
            try {
                //生產者不斷向管道中寫入資料
                while (true) {
                    int num = random.nextInt(256);
                    dataWareHouse.offer(num);
                    System.out.println("put data +" + num);
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public static void main(String[] args) throws IOException {
            DataWareHouse dataWareHouse = new DataWareHouse();
            new Thread(new Producer(dataWareHouse)).start();
            new Thread(new Consumer(dataWareHouse)).start();
        }
    }

阻塞佇列

阻塞佇列(BlockingQueue),具有1. 當佇列滿了的時候阻塞入佇列操作 2. 當佇列空了的時候阻塞出佇列操作 3. 執行緒安全 的特性,因而阻塞佇列通常被視為實現生產消費者模式最便捷的工具,其中DataWareHouse類實現程式碼如下:

  static class DataWareHouse {
        //共享資料區
        private final BlockingQueue<String> blockingQueue;
        
        public DataWareHouse(int capacity) {
            this.blockingQueue = new ArrayBlockingQueue<>(capacity);
        }

        public void offer(String x) {
            blockingQueue.offer(x);
        }
        public String poll() {
            return blockingQueue.poll();
        }
    }

生產者和消費者類,以及測試程式碼和 訊號量 section 相同,在此不做重複列舉了。

總結

生產者消費者問題是面試中經常會遇到的題目,本文總結了幾種常見的實現方式,面試過程中通常不必要向面試官描述過多實現細節,說出每種實現方式的特點即可。希望能給大家帶來幫助。

Reference

  1. https://howtodoinjava.com/java/multi-threading/wait-notify-and-notifyall-methods/