1. 程式人生 > >Java多種方式解決生產者消費者問題(十分詳細)

Java多種方式解決生產者消費者問題(十分詳細)

一、問題描述

生產者消費者問題(Producer-consumer problem),也稱有限緩衝問題(Bounded-buffer problem),是一個多執行緒同步問題的經典案例。生產者生成一定量的資料放到緩衝區中,然後重複此過程;與此同時,消費者也在緩衝區消耗這些資料。生產者和消費者之間必須保持同步,要保證生產者不會在緩衝區滿時放入資料,消費者也不會在緩衝區空時消耗資料。不夠完善的解決方法容易出現死鎖的情況,此時程序都在等待喚醒。

示意圖:
生產者消費者

二、解決方法

思路

  1. 採用某種機制保護生產者和消費者之間的同步。有較高的效率,並且易於實現,程式碼的可控制性較好,屬於常用的模式。

  2. 在生產者和消費者之間建立一個管道。管道緩衝區不易控制,被傳輸資料物件不易於封裝等,實用性不強。

解決問題的核心

   保證同一資源被多個執行緒併發訪問時的完整性。常用的同步方法是採用訊號或加鎖機制,保證資源在任意時刻至多被一個執行緒訪問。

Java能實現的幾種方法

  1. wait() / notify()方法

  2. await() / signal()方法

  3. BlockingQueue阻塞佇列方法

  4. 訊號量

  5. 管道

三、程式碼實現

1. wait() / notify()方法

當緩衝區已滿時,生產者執行緒停止執行,放棄鎖,使自己處於等狀態,讓其他執行緒執行;
當緩衝區已空時,消費者執行緒停止執行,放棄鎖,使自己處於等狀態,讓其他執行緒執行。

當生產者向緩衝區放入一個產品時,向其他等待的執行緒發出可執行的通知,同時放棄鎖,使自己處於等待狀態;
當消費者從緩衝區取出一個產品時,向其他等待的執行緒發出可執行的通知,同時放棄鎖,使自己處於等待狀態。

倉庫Storage.java

import java.util.LinkedList;

public class Storage {

    // 倉庫容量
    private final int MAX_SIZE = 10;
    // 倉庫儲存的載體
    private LinkedList<Object> list = new LinkedList<>();

    public
void produce() { synchronized (list) { while (list.size() + 1 > MAX_SIZE) { System.out.println("【生產者" + Thread.currentThread().getName() + "】倉庫已滿"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(new Object()); System.out.println("【生產者" + Thread.currentThread().getName() + "】生產一個產品,現庫存" + list.size()); list.notifyAll(); } } public void consume() { synchronized (list) { while (list.size() == 0) { System.out.println("【消費者" + Thread.currentThread().getName() + "】倉庫為空"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(); System.out.println("【消費者" + Thread.currentThread().getName() + "】消費一個產品,現庫存" + list.size()); list.notifyAll(); } } }

生產者

public class Producer implements Runnable{
    private Storage storage;

    public Producer(){}

    public Producer(Storage storage , String name){
        this.storage = storage;
    }

    @Override
    public void run(){
        while(true){
            try{
                Thread.sleep(1000);
                storage.produce();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

消費者

public class Consumer implements Runnable{
    private Storage storage;

    public Consumer(){}

    public Consumer(Storage storage , String name){
        this.storage = storage;
    }

    @Override
    public void run(){
        while(true){
            try{
                Thread.sleep(3000);
                storage.consume();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

主函式

public class Main {

    public static void main(String[] args) {
        Storage storage = new Storage();
        Thread p1 = new Thread(new Producer(storage));
        Thread p2 = new Thread(new Producer(storage));
        Thread p3 = new Thread(new Producer(storage));

        Thread c1 = new Thread(new Consumer(storage));
        Thread c2 = new Thread(new Consumer(storage));
        Thread c3 = new Thread(new Consumer(storage));

        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

執行結果

【生產者p1】生產一個產品,現庫存1
【生產者p2】生產一個產品,現庫存2
【生產者p3】生產一個產品,現庫存3
【生產者p1】生產一個產品,現庫存4
【生產者p2】生產一個產品,現庫存5
【生產者p3】生產一個產品,現庫存6
【生產者p1】生產一個產品,現庫存7
【生產者p2】生產一個產品,現庫存8
【消費者c1】消費一個產品,現庫存7
【生產者p3】生產一個產品,現庫存8
【消費者c2】消費一個產品,現庫存7
【消費者c3】消費一個產品,現庫存6
【生產者p1】生產一個產品,現庫存7
【生產者p2】生產一個產品,現庫存8
【生產者p3】生產一個產品,現庫存9
【生產者p1】生產一個產品,現庫存10
【生產者p2】倉庫已滿
【生產者p3】倉庫已滿
【生產者p1】倉庫已滿
【消費者c1】消費一個產品,現庫存9
【生產者p1】生產一個產品,現庫存10
【生產者p3】倉庫已滿
。。。。。。以下省略

一個生產者執行緒執行produce方法,睡眠1s;一個消費者執行一次consume方法,睡眠3s。此次實驗過程中,有3個生產者和3個消費者,也就是我們說的多對多的情況。倉庫的容量為10,可以看出消費的速度明顯慢於生產的速度,符合設定。

注意:

notifyAll()方法可使所有正在等待佇列中等待同一共享資源的“全部”執行緒從等待狀態退出,進入可執行狀態。此時,優先順序最高的哪個執行緒最先執行,但也有可能是隨機執行的,這要取決於JVM虛擬機器的實現。即最終也只有一個執行緒能被執行,上述執行緒優先順序都相同,每次執行的執行緒都不確定是哪個,後來給執行緒設定優先順序後也跟預期不一樣,還是要看JVM的具體實現吧。

2. await() / signal()方法

在JDK5中,用ReentrantLock和Condition可以實現等待/通知模型,具有更大的靈活性。通過在Lock物件上呼叫newCondition()方法,將條件變數和一個鎖物件進行繫結,進而控制併發程式訪問競爭資源的安全。

在這裡只需改動Storage類

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Storage {

    // 倉庫最大儲存量
    private final int MAX_SIZE = 10;
    // 倉庫儲存的載體
    private LinkedList<Object> list = new LinkedList<Object>();
    // 鎖
    private final Lock lock = new ReentrantLock();
    // 倉庫滿的條件變數
    private final Condition full = lock.newCondition();
    // 倉庫空的條件變數
    private final Condition empty = lock.newCondition();

    public void produce()
    {
        // 獲得鎖
        lock.lock();
        while (list.size() + 1 > MAX_SIZE) {
            System.out.println("【生產者" + Thread.currentThread().getName()
                     + "】倉庫已滿");
            try {
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.add(new Object());
        System.out.println("【生產者" + Thread.currentThread().getName() 
                 + "】生產一個產品,現庫存" + list.size());

        // 喚醒其他所有執行緒、釋放鎖
        full.signalAll();
        empty.signalAll();
        lock.unlock();
    }

    public void consume()
    {
        // 獲得鎖
        lock.lock();
        while (list.size() == 0) {
            System.out.println("【消費者" + Thread.currentThread().getName()
                     + "】倉庫為空");
            try {
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.remove();
        System.out.println("【消費者" + Thread.currentThread().getName()
                 + "】消費一個產品,現庫存" + list.size());

        // 喚醒其他所有執行緒、釋放鎖
        full.signalAll();
        empty.signalAll();
        lock.unlock();
    }
}

執行結果與wait()/notify()類似

3. BlockingQueue阻塞佇列方法

BlockingQueue是JDK5.0的新增內容,它是一個已經在內部實現了同步的佇列,實現方式採用的是我們第2種await() / signal()方法。它可以在生成物件時指定容量大小,用於阻塞操作的是put()和take()方法。

put()方法:類似於我們上面的生產者執行緒,容量達到最大時,自動阻塞。
take()方法:類似於我們上面的消費者執行緒,容量為0時,自動阻塞。

import java.util.concurrent.LinkedBlockingQueue;

public class Storage {

    // 倉庫儲存的載體
    private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);

    public void produce() {
        try{
            list.put(new Object());
            System.out.println("【生產者" + Thread.currentThread().getName()
                    + "】生產一個產品,現庫存" + list.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    public void consume() {
        try{
            list.take();
            System.out.println("【消費者" + Thread.currentThread().getName()
                    + "】消費了一個產品,現庫存" + list.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

可能會出現put()或take()和System.out.println()輸出不匹配的情況,是由於它們之間沒有同步造成的。BlockingQueue可以放心使用,這可不是它的問題,只是在它和別的物件之間的同步有問題。

4. 訊號量

Semaphore是一種基於計數的訊號量。它可以設定一個閾值,基於此,多個執行緒競爭獲取許可訊號,做完自己的申請後歸還,超過閾值後,執行緒申請許可訊號將會被阻塞。Semaphore可以用來構建一些物件池,資源池之類的,比如資料庫連線池,我們也可以建立計數為1的Semaphore,將其作為一種類似互斥鎖的機制,這也叫二元訊號量,表示兩種互斥狀態。計數為0的Semaphore是可以release的,然後就可以acquire(即一開始使執行緒阻塞從而完成其他執行。)。

import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Storage {

    // 倉庫儲存的載體
    private LinkedList<Object> list = new LinkedList<Object>();
    // 倉庫的最大容量
    final Semaphore notFull = new Semaphore(10);
    // 將執行緒掛起,等待其他來觸發
    final Semaphore notEmpty = new Semaphore(0);
    // 互斥鎖
    final Semaphore mutex = new Semaphore(1);

    public void produce()
    {
        try {
            notFull.acquire();
            mutex.acquire();
            list.add(new Object());
            System.out.println("【生產者" + Thread.currentThread().getName()
                    + "】生產一個產品,現庫存" + list.size());
        }
        catch (Exception e) {
            e.printStackTrace();
        } finally {
            mutex.release();
            notEmpty.release();
        }
    }

    public void consume()
    {
        try {
            notEmpty.acquire();
            mutex.acquire();
            list.remove();
            System.out.println("【消費者" + Thread.currentThread().getName()
                    + "】消費一個產品,現庫存" + list.size());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            mutex.release();
            notFull.release();
        }
    }
}

5. 管道

一種特殊的流,用於不同執行緒間直接傳送資料,一個執行緒傳送資料到輸出管道,另一個執行緒從輸入管道中讀資料。

inputStream.connect(outputStream)或outputStream.connect(inputStream)作用是使兩個Stream之間產生通訊連結,這樣才可以將資料進行輸出與輸入。

這種方式只適用於兩個執行緒之間通訊,不適合多個執行緒之間通訊。

1. PipedInputStream / PipedOutputStream (操作位元組流)

Producer

import java.io.IOException;
import java.io.PipedOutputStream;

public class Producer implements Runnable {

    private PipedOutputStream pipedOutputStream;

    public Producer() {
        pipedOutputStream = new PipedOutputStream();
    }

    public PipedOutputStream getPipedOutputStream() {
        return pipedOutputStream;
    }

    @Override
    public void run() {
        try {
            for (int i = 1; i <= 5; i++) {
                pipedOutputStream.write(("This is a test, Id=" + i + "!\n").getBytes());
            }
            pipedOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Consumer

import java.io.IOException;
import java.io.PipedInputStream;

public class Consumer implements Runnable {
    private PipedInputStream pipedInputStream;

    public Consumer() {
        pipedInputStream = new PipedInputStream();
    }

    public PipedInputStream getPipedInputStream() {
        return pipedInputStream;
    }

    @Override
    public void run() {
        int len = -1;
        byte[] buffer = new byte[1024];
        try {
            while ((len = pipedInputStream.read(buffer)) != -1) {
                System.out.println(new String(buffer, 0, len));
            }
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

主函式

import java.io.IOException;

public class Main {

    public static void main(String[] args) {
        Producer p = new Producer();
        Consumer c = new Consumer();
        Thread t1 = new Thread(p);
        Thread t2 = new Thread(c);
        try {
            p.getPipedOutputStream().connect(c.getPipedInputStream());
            t2.start();
            t1.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
2. PipedReader / PipedWriter (操作字元流)

Producer

import java.io.IOException;
import java.io.PipedWriter;

public class Producer implements Runnable {

    private PipedWriter pipedWriter;

    public Producer() {
        pipedWriter = new PipedWriter();
    }

    public PipedWriter getPipedWriter() {
        return pipedWriter;
    }

    @Override
    public void run() {
        try {
            for (int i = 1; i <= 5; i++) {
                pipedWriter.write("This is a test, Id=" + i + "!\n");
            }
            pipedWriter.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Consumer

import java.io.IOException;
import java.io.PipedReader;

public class Consumer implements Runnable {
    private PipedReader pipedReader;

    public Consumer() {
        pipedReader = new PipedReader();
    }

    public PipedReader getPipedReader() {
        return pipedReader;
    }

    @Override
    public void run() {
        int len = -1;
        char[] buffer = new char[1024];
        try {
            while ((len = pipedReader.read(buffer)) != -1) {
                System.out.println(new String(buffer, 0, len));
            }
            pipedReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

主函式

import java.io.IOException;

public class Main {

    public static void main(String[] args) {
        Producer p = new Producer();
        Consumer c = new Consumer();
        Thread t1 = new Thread(p);
        Thread t2 = new Thread(c);
        try {
            p.getPipedWriter().connect(c.getPipedReader());
            t2.start();
            t1.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

想檢視上面幾種方式的完整程式碼,請點選這裡:生產者消費者問題的一些實驗

參考