1. 程式人生 > >java並發之生產者消費者模型

java並發之生產者消費者模型

isf tof on() acc sum sca span empty poll

生產者和消費者模型是操作系統中經典的同步問題。該問題最早由Dijkstra提出,用以演示它提出的信號量機制。

經典的生產者和消費者模型的描寫敘述是:有一群生產者進程在生產產品。並將這些產品提供給消費者進程去消費。為使生產者進程與消費者進程能並發執行,在兩者之間設置了一個具有n個緩沖區的緩沖池,生產者進程將它所生產的產品放入一個緩沖區中。消費者進程可從一個緩沖區中取走產品去消費。雖然全部的生產者進程和消費者進程都是以異步方式執行的。但它們之間必須保持同步,即不同意消費者進程到一個空緩沖區去取產品,也不同意生產者進程向一個已裝滿產品且尚未被取走的緩沖區投放產品。

技術分享圖片

首先我們復習一下操作系統中同步機制中應遵循的準則:

  1. 空暇讓進:當無進程處於臨界區時,應同意一個請求進入臨界區的進程進入臨界區;
  2. 忙則等待:當已有進程進入臨界區時,其它試圖進入臨界區的進程必須等待。
  3. 有限等待:對要求訪問臨界資源的進程,應保證在有限時間內能進入自己的臨界區。以免陷入“死等”狀態。
  4. 讓權等待:當進程不能進入自己的臨界區時,應馬上釋放處理機。以免進程陷入“忙等”;

    在生產者和消費者模型中要保證一下幾點:
    1.生產者在往緩存隊列中放產品時,消費者不能取產品。
    2.消費者從緩存隊列中取產品時。生產者不能放產品。
    3.同一時刻僅僅有一個生產者能夠往緩存隊列中放產品。
    4.同一時刻僅僅有一個消費者能夠從緩存隊列中取產品。


    5.緩存隊列滿時生產者不能往緩存隊列中放產品。
    6.緩存隊列為空時消費者不能從緩存隊列中取產品。

本樣例中的緩存隊列模仿java jdk中的ArrayBlockingQueue,這是一個堵塞隊列,緩存池滿時會自己主動將生產者線程掛起,緩存池空時會自己主動將消費者線程掛起。

緩存池

public class Pool<E> {

    /**隊列最長長度*/
    private  int MaxSize = 1000;

    /**隊列默認長度*/
    private static final int defaultSize = 100;

    /**資源池*/
    private
Object[] objs ; /**隊頭*/ private int front; /**隊尾*/ private int rear; /**元素的個數*/ private int nItems; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; private int useSize = 0; public Pool() { this(defaultSize); useSize = defaultSize; } public Pool(int size) { if(size < 0) throw new IndexOutOfBoundsException(); size = size > MaxSize ? MaxSize : size; useSize = size; objs = new Object[size]; front = 0; rear = -1; nItems = 0; lock = new ReentrantLock(true); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /**進隊*/ private void queue(E e) { if(rear == useSize - 1) rear = -1; objs[++rear] = e; nItems++; notEmpty.signal(); } /**出隊*/ private E dequeue() { E e = (E)objs[front++]; if(front == useSize) front = 0; nItems--; notFull.signal(); return e; } /**進隊 資源池滿會將入隊線程掛起*/ public void offer(E e) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { while(nItems == objs.length) notFull.await(); queue(e); System.out.println("學生進隊。當前池中有 " + nItems + " 名同學" ); } finally { lock.unlock(); } } /**出隊 資源池空會將出隊線程掛起*/ public E poll() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { while(nItems == 0) notEmpty.await(); E e = dequeue(); System.out.println("學生出隊,當前池中有 " + nItems + " 名同學" ); return e; } finally { lock.unlock(); } } /**是否滿*/ public boolean isFull() { final ReentrantLock lock = this.lock; lock.lock(); try { return nItems == MaxSize ? true : false; } finally { lock.unlock(); } } /**推斷是否為空*/ public boolean isEmpty() { final ReentrantLock lock = this.lock; lock.lock(); try { return nItems == 0 ? true : false; } finally { lock.unlock(); } } /**返回隊列中元素個數*/ public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return this.nItems; } finally { lock.unlock(); } } }

測試模型

public class Student {

    private String name;
    private int age;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }


}

主類

public class PM {

    private Pool<Student> pools = new Pool<Student>(1000);

    public static void main(String[] args) {
        PM pm = new PM();
        ExecutorService executor = Executors.newFixedThreadPool(6); 
        executor.execute(pm.new consume());
        executor.execute(pm.new consume());
        executor.execute(pm.new consume());
        executor.execute(pm.new produce());
        executor.execute(pm.new produce());
        executor.execute(pm.new produce());
    }

     class produce implements Runnable {

        @Override
        public void run() {
            while(true) {
                try {
                    pools.offer(new Student());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }

    }

    class consume implements Runnable {

        @Override
        public void run() {
            while(true) {
                try {
                    pools.poll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

}

執行結果:
技術分享圖片

java並發之生產者消費者模型