1. 程式人生 > >生產者消費者問題Java三種實現

生產者消費者問題Java三種實現

read 可執行 tran extend 模式 fit ner consumer 傳輸數據

生產者-消費者Java實現

2017-07-27

1 概述


生產者消費者問題是多線程的一個經典問題,它描述是有一塊緩沖區作為倉庫,生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品。

解決生產者/消費者問題的方法可分為兩類:

  • 采用某種機制保護生產者和消費者之間的同步;
  • 在生產者和消費者之間建立一個管道。

第一種方式有較高的效率,並且易於實現,代碼的可控制性較好,屬於常用的模式。第二種管道緩沖區不易控制,被傳輸數據對象不易於封裝等,實用性不強。

在Java中有四種方法支持同步,其中前三個是同步方法,一個是管道方法。

  • wait() / notify()方法
  • await() / signal()方法
  • BlockingQueue阻塞隊列方法
  • PipedInputStream / PipedOutputStream

本文只介紹前三種。

2 實現


2.1 wait() / notify()方法

wait() / nofity()方法是基類Object的兩個方法:

  • wait()方法:當緩沖區已滿/空時,生產者/消費者線程停止自己的執行,放棄鎖,使自己處於等等狀態,讓其他線程執行。
  • notify()方法:當生產者/消費者向緩沖區放入/取出一個產品時,向其他等待的線程發出可執行的通知,同時放棄鎖,使自己處於等待狀態。

緩沖區Storage.java代碼如下:

技術分享
import
java.util.LinkedList; public class Storage { // 倉庫最大存儲量 private final int MAX_SIZE = 100; // 倉庫存儲的載體 private LinkedList<Object> list = new LinkedList<Object>(); // 生產產品 public void produce(String producer) { synchronized (list) {
// 如果倉庫已滿 while (list.size() == MAX_SIZE) { System.out.println("倉庫已滿,【"+producer+"】: 暫時不能執行生產任務!"); try { // 由於條件不滿足,生產阻塞 list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生產產品 list.add(new Object()); System.out.println("【"+producer+"】:生產了一個產品\t【現倉儲量為】:" + list.size()); list.notifyAll(); } } // 消費產品 public void consume(String consumer) { synchronized (list) { //如果倉庫存儲量不足 while (list.size()==0) { System.out.println("倉庫已空,【"+consumer+"】: 暫時不能執行消費任務!"); try { // 由於條件不滿足,消費阻塞 list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(); System.out.println("【"+consumer+"】:消費了一個產品\t【現倉儲量為】:" + list.size()); list.notifyAll(); } } public LinkedList<Object> getList() { return list; } public void setList(LinkedList<Object> list) { this.list = list; } public int getMAX_SIZE() { return MAX_SIZE; } }
View Code

生產者Producer.java代碼如下:

技術分享
public class Producer extends Thread
{
    private String producer;
    private Storage storage;

    public Producer(Storage storage)
    {
        this.storage = storage;
    }

    @Override
    public void run()
    {
        produce(producer);
    }

    public void produce(String producer)
    {
        storage.produce(producer);
    }

    public String getProducer()
    {
        return producer;
    }

    public void setProducer(String producer)
    {
        this.producer = producer;
    }

    public Storage getStorage()
    {
        return storage;
    }

    public void setStorage(Storage storage)
    {
        this.storage = storage;
    }
}
View Code

消費者Cosumer.java代碼如下:

技術分享
public class Consumer extends Thread
{
    private String consumer;
    private Storage storage;

    public Consumer(Storage storage)
    {
        this.storage = storage;
    }

    @Override
    public void run()
    {
        consume(consumer);
    }

    public void consume(String consumer)
    {
        storage.consume(consumer);
    }

    public Storage getStorage()
    {
        return storage;
    }

    public void setStorage(Storage storage)
    {
        this.storage = storage;
    }

    public String getConsumer() {
        return consumer;
    }

    public void setConsumer(String consumer) {
        this.consumer = consumer;
    }
}
View Code

結果如下:

倉庫已空,【消費者1】: 暫時不能執行消費任務!
【生產者3】:生產了一個產品    【現倉儲量為】:1
【消費者2】:消費了一個產品    【現倉儲量為】:0
倉庫已空,【消費者3】: 暫時不能執行消費任務!
【生產者1】:生產了一個產品    【現倉儲量為】:1
【生產者4】:生產了一個產品    【現倉儲量為】:2
【生產者2】:生產了一個產品    【現倉儲量為】:3
【生產者5】:生產了一個產品    【現倉儲量為】:4
【消費者1】:消費了一個產品    【現倉儲量為】:3
【消費者3】:消費了一個產品    【現倉儲量為】:2

2.2 await() / signal()方法

await()和signal()的功能基本上和wait() / nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock直接掛鉤,具有更大的靈活性。通過在Lock對象上調用newCondition()方法,將條件變量和一個鎖對象進行綁定,進而控制並發程序訪問競爭資源的安全。

緩沖區Storage.java代碼如下:

技術分享
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Storage {
    // 倉庫最大存儲量
    private final int MAX_SIZE = 100;

    // 倉庫存儲的載體
    private LinkedList<Object> list = new LinkedList<Object>();
    //
    private final Lock lock = new ReentrantLock();

    // 倉庫滿的條件變量
    private final Condition full = lock.newCondition();

    // 倉庫空的條件變量
    private final Condition empty = lock.newCondition();

    // 生產產品
    public void produce(String producer) {
        lock.lock();
        // 如果倉庫已滿
        while (list.size() == MAX_SIZE) {
            System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");
            try {
                // 由於條件不滿足,生產阻塞
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 生產產品
        list.add(new Object());

        System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());

        empty.signalAll();

        // 釋放鎖
        lock.unlock();

    }

    // 消費產品
    public void consume(String consumer) {
        // 獲得鎖
        lock.lock();

        // 如果倉庫存儲量不足
        while (list.size() == 0) {
            System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");
            try {
                // 由於條件不滿足,消費阻塞
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        list.remove();
        System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());
        full.signalAll();
        
        // 釋放鎖
        lock.unlock();

    }

    public LinkedList<Object> getList() {
        return list;
    }

    public void setList(LinkedList<Object> list) {
        this.list = list;
    }

    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}
View Code

2.3 BlockingQueue

它是一個已經在內部實現了同步的隊列,實現方式采用的是我們第2種await() / signal()方法。它可以在生成對象時指定容量大小。它用於阻塞操作的是put()和take()方法:

put()方法:類似於我們上面的生產者線程,容量達到最大時,自動阻塞。

take()方法:類似於我們上面的消費者線程,容量為0時,自動阻塞。

技術分享
import java.util.concurrent.LinkedBlockingQueue;

public class Storage {
    // 倉庫最大存儲量
    private final int MAX_SIZE = 100;

    // 倉庫存儲的載體
    private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(100);  

    // 生產產品
    public void produce(String producer) {
        // 如果倉庫已滿
        if (list.size() == MAX_SIZE) {
            System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");            
        }

        // 生產產品
        try {
            list.put(new Object());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());
    }

    // 消費產品
    public void consume(String consumer) {
        // 如果倉庫存儲量不足
        if (list.size() == 0) {
            System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");            
        }

        try {
            list.take();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());        

    }

    public LinkedBlockingQueue<Object> getList() {
        return list;
    }

    public void setList(LinkedBlockingQueue<Object> list) {
        this.list = list;
    }
    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}
View Code

生產者消費者問題Java三種實現