1. 程式人生 > >【JAVA多執行緒】如何解決一個生產者與消費者問題

【JAVA多執行緒】如何解決一個生產者與消費者問題

                             如何解決一個生產者與消費者問題

生產者與消費者問題是多執行緒同步的一個經典問題。生產者和消費者同時使用一塊緩衝區,生產者生產商品放入緩衝區,消費者從緩衝區中取出商品。我們需要保證的是,當緩衝區滿時,生產者不可生產商品;當緩衝區為空時,消費者不可取出商品。

下面介紹java中幾種解決同步問題的方式

(1)wait()與notify()

(2)Lock與Condition機制

(3)BlockingQueue阻塞佇列

【1】wait()與notify()

這兩個方法是object類中的方法

wait()用在以下場合:

(1)當緩衝區滿時,緩衝區呼叫wait()方法,使得生產者釋放鎖,當前執行緒阻塞,其他執行緒可以獲得鎖。

(2)當緩衝區空時,緩衝區呼叫wait()方法,使得消費者釋放鎖,當前執行緒阻塞,其他執行緒可以獲得鎖。

notify()用在以下場合:

(1)當緩衝區未滿時,生產者生產商品放入緩衝區,然後緩衝區呼叫notify()方法,通知上一個因wait()方法釋放鎖的執行緒現在可以去獲得鎖了,同步塊程式碼執行完成後,釋放物件鎖,此處的物件鎖,鎖住的是緩衝區。

(2)當緩衝區不為空時,消費者從緩衝區中取出商品,然後緩衝區呼叫notify()方法,通知上一個因wait()方法釋放鎖的執行緒現在可以去獲得鎖了,同步塊程式碼執行完成後,釋放物件鎖。

程式碼演示

package day1101;

import java.util.LinkedList;

/**
 * 生產者消費者問題
 */
public class ProAndCon {
    //最大容量
    public static final int MAX_SIZE = 2;
    //儲存媒介
    public static LinkedList<Integer> list = new LinkedList<>();

    class Producer implements Runnable {
        @Override
        public void run() {
            synchronized (list) {
                //倉庫容量已經達到最大值
                while (list.size() == MAX_SIZE) {
                    System.out.println("倉庫已滿,生產者" + Thread.currentThread().getName() + "不可生產.");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.add(1);
                System.out.println("生產者" + Thread.currentThread().getName() + "生產, 倉庫容量為" + list.size());
                list.notify();
            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            synchronized (list) {
                while (list.size() == 0) {
                    System.out.println("倉庫為空,消費者" + Thread.currentThread().getName() + "不可消費.");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.removeFirst();
                System.out.println("消費者" + Thread.currentThread().getName() + "消費,倉庫容量為" + list.size());
                list.notify();
            }
        }
    }


    public static void main(String[] args) {
        ProAndCon proAndCon = new ProAndCon();
        Producer producer = proAndCon.new Producer();
        Consumer consumer = proAndCon.new Consumer();

        for (int i = 0; i < 10; i++) {
            Thread pro = new Thread(producer);
            pro.start();
            Thread con = new Thread(consumer);
            con.start();
        }
    }

}

執行結果:

【2】Lock與Condition機制

在JDK5.0之後,Java提供了Lock與Condition機制。Condition介面的await()和signal()是用來做同步的兩種方法,它們的功能基本上和Object的wait()、nofity()相同,或者說可以取代它們,但是它們和Lock機制是直接掛鉤的。通過在Lock物件上呼叫newCondition()方法,將條件變數和一個鎖物件進行繫結,進而控制併發程式訪問競爭資源的安全。

程式碼演示

package day1101;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProAndCon2 {
    public static final int MAX_SIZE = 2;
    public static LinkedList<Integer> list = new LinkedList<>();
    public static Lock lock = new ReentrantLock();
    //倉庫滿的條件變數
    public static Condition full = lock.newCondition();
    //倉庫空的條件變數
    public static Condition empty = lock.newCondition();


    class Producer implements Runnable {

        @Override
        public void run() {
            lock.lock();
            while (list.size() == MAX_SIZE) {
                try {
                    System.out.println("倉庫已滿,生產者" + Thread.currentThread().getName() + "不可生產.");
                    full.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(1);
            System.out.println("生產者" + Thread.currentThread().getName() + "生產, 倉庫容量為" + list.size());
            //喚醒其他生產者與消費者執行緒
            full.signal();
            empty.signal();
            lock.unlock();
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            lock.lock();
            while (list.size() == 0) {
                try {
                    System.out.println("倉庫為空,消費者" + Thread.currentThread().getName() + "不可消費.");
                    empty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.removeFirst();
            System.out.println("消費者" + Thread.currentThread().getName() + "消費,倉庫容量為" + list.size());
            //喚醒其他生產者與消費者執行緒
            full.signal();
            empty.signal();
            lock.unlock();
        }
    }


    public static void main(String[] args) {
        ProAndCon2 proAndCon = new ProAndCon2();
        Producer producer = proAndCon.new Producer();
        Consumer consumer = proAndCon.new Consumer();

        for (int i = 0; i < 10; i++) {
            Thread pro = new Thread(producer);
            pro.start();
            Thread con = new Thread(consumer);
            con.start();
        }
    }


}

執行結果:

【3】使用BlockingQueue阻塞佇列

什麼是阻塞佇列?

如果向一個已經滿了的佇列中新增元素或者從空佇列中移除元素,都將會導致執行緒阻塞,執行緒一直等待到有舊元素被移除或新元素被新增的時候,才能繼續執行。符合這種情況的佇列,稱為阻塞佇列。

JDK 1.5 以後新增BlockingQueue介面,我們採用它實現類的其中兩個類,ArrayBlockingQueue或者是LinkedBlockingQueue。

這裡我們用LinkedBlockingQueue來解決生產者與消費者問題,主要用到它的兩個方法,即put()與take()

put():向阻塞佇列中新增一個元素,佇列滿時,自動阻塞。

take():從阻塞佇列中取出一個元素,佇列空時,自動阻塞。

其實LinkedBlockingQueue底層使用的仍然是Lock與Condition機制,我們從原始碼就可以看出來

put():

//..............用到了Lock與Condition機制

/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();




//...........put方法



/**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }




//...........take方法



 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

看得出來,LinkedBlockingQueue底層已經解決好了同步問題,我們可以很方便的使用它。

程式碼演示:

package day1024;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 解決生產者與消費者問題
 * 採用阻塞佇列BlockingQueue
 */
public class ProAndCon3 {
    public static final int MAX_SIZE = 2;
    public static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX_SIZE);

    class Producer implements Runnable {
        @Override
        public void run() {
            if (queue.size() == MAX_SIZE) {
                System.out.println("倉庫已滿,生產者" + Thread.currentThread().getName() + "不可生產.");
            }
            try {
                queue.put(1);
                System.out.println("生產者" + Thread.currentThread().getName() + "生產, 倉庫容量為" + queue.size());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    class Consumer implements Runnable {
        @Override
        public void run() {
            if (queue.size() == 0) {
                System.out.println("倉庫為空,消費者" + Thread.currentThread().getName() + "不可消費.");
            }
            try {
                queue.take();
                System.out.println("消費者" + Thread.currentThread().getName() + "消費,倉庫容量為" + queue.size());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) {
        ProAndCon3 proAndCon = new ProAndCon3();
        Producer producer = proAndCon.new Producer();
        Consumer consumer = proAndCon.new Consumer();

        for (int i = 0; i < 10; i++) {
            Thread pro = new Thread(producer);
            pro.start();
            Thread con = new Thread(consumer);
            con.start();
        }
        
    }
}

執行結果就不貼了。