【JAVA多執行緒】如何解決一個生產者與消費者問題
如何解決一個生產者與消費者問題
生產者與消費者問題是多執行緒同步的一個經典問題。生產者和消費者同時使用一塊緩衝區,生產者生產商品放入緩衝區,消費者從緩衝區中取出商品。我們需要保證的是,當緩衝區滿時,生產者不可生產商品;當緩衝區為空時,消費者不可取出商品。
下面介紹java中幾種解決同步問題的方式
(1)wait()與notify()
(2)Lock與Condition機制
(3)BlockingQueue阻塞佇列
【1】wait()與notify()
這兩個方法是object類中的方法
wait()用在以下場合:
(1)當緩衝區滿時,緩衝區呼叫wait()方法,使得生產者釋放鎖,當前執行緒阻塞,其他執行緒可以獲得鎖。
(2)當緩衝區空時,緩衝區呼叫wait()方法,使得消費者釋放鎖,當前執行緒阻塞,其他執行緒可以獲得鎖。
notify()用在以下場合:
(1)當緩衝區未滿時,生產者生產商品放入緩衝區,然後緩衝區呼叫notify()方法,通知上一個因wait()方法釋放鎖的執行緒現在可以去獲得鎖了,同步塊程式碼執行完成後,釋放物件鎖,此處的物件鎖,鎖住的是緩衝區。
(2)當緩衝區不為空時,消費者從緩衝區中取出商品,然後緩衝區呼叫notify()方法,通知上一個因wait()方法釋放鎖的執行緒現在可以去獲得鎖了,同步塊程式碼執行完成後,釋放物件鎖。
程式碼演示
package day1101; import java.util.LinkedList; /** * 生產者消費者問題 */ public class ProAndCon { //最大容量 public static final int MAX_SIZE = 2; //儲存媒介 public static LinkedList<Integer> list = new LinkedList<>(); class Producer implements Runnable { @Override public void run() { synchronized (list) { //倉庫容量已經達到最大值 while (list.size() == MAX_SIZE) { System.out.println("倉庫已滿,生產者" + Thread.currentThread().getName() + "不可生產."); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(1); System.out.println("生產者" + Thread.currentThread().getName() + "生產, 倉庫容量為" + list.size()); list.notify(); } } } class Consumer implements Runnable { @Override public void run() { synchronized (list) { while (list.size() == 0) { System.out.println("倉庫為空,消費者" + Thread.currentThread().getName() + "不可消費."); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.removeFirst(); System.out.println("消費者" + Thread.currentThread().getName() + "消費,倉庫容量為" + list.size()); list.notify(); } } } public static void main(String[] args) { ProAndCon proAndCon = new ProAndCon(); Producer producer = proAndCon.new Producer(); Consumer consumer = proAndCon.new Consumer(); for (int i = 0; i < 10; i++) { Thread pro = new Thread(producer); pro.start(); Thread con = new Thread(consumer); con.start(); } } }
執行結果:
【2】Lock與Condition機制
在JDK5.0之後,Java提供了Lock與Condition機制。Condition介面的await()和signal()是用來做同步的兩種方法,它們的功能基本上和Object的wait()、nofity()相同,或者說可以取代它們,但是它們和Lock機制是直接掛鉤的。通過在Lock物件上呼叫newCondition()方法,將條件變數和一個鎖物件進行繫結,進而控制併發程式訪問競爭資源的安全。
程式碼演示
package day1101; import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProAndCon2 { public static final int MAX_SIZE = 2; public static LinkedList<Integer> list = new LinkedList<>(); public static Lock lock = new ReentrantLock(); //倉庫滿的條件變數 public static Condition full = lock.newCondition(); //倉庫空的條件變數 public static Condition empty = lock.newCondition(); class Producer implements Runnable { @Override public void run() { lock.lock(); while (list.size() == MAX_SIZE) { try { System.out.println("倉庫已滿,生產者" + Thread.currentThread().getName() + "不可生產."); full.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(1); System.out.println("生產者" + Thread.currentThread().getName() + "生產, 倉庫容量為" + list.size()); //喚醒其他生產者與消費者執行緒 full.signal(); empty.signal(); lock.unlock(); } } class Consumer implements Runnable { @Override public void run() { lock.lock(); while (list.size() == 0) { try { System.out.println("倉庫為空,消費者" + Thread.currentThread().getName() + "不可消費."); empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.removeFirst(); System.out.println("消費者" + Thread.currentThread().getName() + "消費,倉庫容量為" + list.size()); //喚醒其他生產者與消費者執行緒 full.signal(); empty.signal(); lock.unlock(); } } public static void main(String[] args) { ProAndCon2 proAndCon = new ProAndCon2(); Producer producer = proAndCon.new Producer(); Consumer consumer = proAndCon.new Consumer(); for (int i = 0; i < 10; i++) { Thread pro = new Thread(producer); pro.start(); Thread con = new Thread(consumer); con.start(); } } }
執行結果:
【3】使用BlockingQueue阻塞佇列
什麼是阻塞佇列?
如果向一個已經滿了的佇列中新增元素或者從空佇列中移除元素,都將會導致執行緒阻塞,執行緒一直等待到有舊元素被移除或新元素被新增的時候,才能繼續執行。符合這種情況的佇列,稱為阻塞佇列。
JDK 1.5 以後新增BlockingQueue介面,我們採用它實現類的其中兩個類,ArrayBlockingQueue或者是LinkedBlockingQueue。
這裡我們用LinkedBlockingQueue來解決生產者與消費者問題,主要用到它的兩個方法,即put()與take()
put():向阻塞佇列中新增一個元素,佇列滿時,自動阻塞。
take():從阻塞佇列中取出一個元素,佇列空時,自動阻塞。
其實LinkedBlockingQueue底層使用的仍然是Lock與Condition機制,我們從原始碼就可以看出來
put():
//..............用到了Lock與Condition機制
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
//...........put方法
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
//...........take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
看得出來,LinkedBlockingQueue底層已經解決好了同步問題,我們可以很方便的使用它。
程式碼演示:
package day1024;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 解決生產者與消費者問題
* 採用阻塞佇列BlockingQueue
*/
public class ProAndCon3 {
public static final int MAX_SIZE = 2;
public static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX_SIZE);
class Producer implements Runnable {
@Override
public void run() {
if (queue.size() == MAX_SIZE) {
System.out.println("倉庫已滿,生產者" + Thread.currentThread().getName() + "不可生產.");
}
try {
queue.put(1);
System.out.println("生產者" + Thread.currentThread().getName() + "生產, 倉庫容量為" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
if (queue.size() == 0) {
System.out.println("倉庫為空,消費者" + Thread.currentThread().getName() + "不可消費.");
}
try {
queue.take();
System.out.println("消費者" + Thread.currentThread().getName() + "消費,倉庫容量為" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ProAndCon3 proAndCon = new ProAndCon3();
Producer producer = proAndCon.new Producer();
Consumer consumer = proAndCon.new Consumer();
for (int i = 0; i < 10; i++) {
Thread pro = new Thread(producer);
pro.start();
Thread con = new Thread(consumer);
con.start();
}
}
}
執行結果就不貼了。