多執行緒,生產者消費者模式經典講解,簡單易懂2
阿新 • • 發佈:2018-11-19
本模式以一個經典練習為案例:
使用2種鎖機制實現生產者和消費者模式
要求
練習(生產者消費者模式):
自定義同步容器,容器容量上限為10。可以在多執行緒中應用,並保證資料執行緒安全。
使用synchronized同步及wait()和notifyAll() 實現生產者消費者模式
邏輯圖描述:
/** * 生產者消費者 * wait¬ify * wait/notify都是和while配合應用的。可以避免多執行緒併發判斷邏輯失效問題。 */ package concurrent.t04; import java.util.LinkedList; import java.util.concurrent.TimeUnit; public class TestContainer01<E> { private final LinkedList<E> list = new LinkedList<>(); private final int MAX = 10; private int count = 0; public synchronized int getCount(){ return count; } public synchronized void put(E e){ while(list.size() == MAX){ try { this.wait(); } catch (InterruptedException e1) { e1.printStackTrace(); } } list.add(e); count++; this.notifyAll(); } public synchronized E get(){ E e = null; while(list.size() == 0){ try{ this.wait(); } catch (InterruptedException e1) { e1.printStackTrace(); } } e = list.removeFirst(); count--; this.notifyAll(); return e; } public static void main(String[] args) { final TestContainer01<String> c = new TestContainer01<>(); for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < 5; j++){ System.out.println(c.get()); } } }, "consumer"+i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } for(int i = 0; i < 2; i++){ new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < 25; j++){ c.put("container value " + j); } } }, "producer"+i).start(); } } }
使用可重入鎖 ReentrantLock 實現生產者消費者模式
邏輯同上:
/** * 生產者消費者 * 重入鎖&條件 * 條件 - Condition, 為Lock增加條件。當條件滿足時,做什麼事情,如加鎖或解鎖。如等待或喚醒 */ package concurrent.t04; import java.io.IOException; import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestContainer02<E> { private final LinkedList<E> list = new LinkedList<>(); private final int MAX = 10; private int count = 0; private Lock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); public int getCount(){ return count; } public void put(E e){ lock.lock(); try { while(list.size() == MAX){ System.out.println(Thread.currentThread().getName() + " 等待。。。"); // 進入等待佇列。釋放鎖標記。 // 藉助條件,進入的等待佇列。 producer.await(); } System.out.println(Thread.currentThread().getName() + " put 。。。"); list.add(e); count++; // 藉助條件,喚醒所有的消費者。 consumer.signalAll(); } catch (InterruptedException e1) { e1.printStackTrace(); } finally { lock.unlock(); } } public E get(){ E e = null; lock.lock(); try { while(list.size() == 0){ System.out.println(Thread.currentThread().getName() + " 等待。。。"); // 藉助條件,消費者進入等待佇列 consumer.await(); } System.out.println(Thread.currentThread().getName() + " get 。。。"); e = list.removeFirst(); count--; // 藉助條件,喚醒所有的生產者 producer.signalAll(); } catch (InterruptedException e1) { e1.printStackTrace(); } finally { lock.unlock(); } return e; } public static void main(String[] args) { final TestContainer02<String> c = new TestContainer02<>(); for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < 5; j++){ System.out.println(c.get()); } } }, "consumer"+i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e1) { e1.printStackTrace(); } for(int i = 0; i < 2; i++){ new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < 25; j++){ c.put("container value " + j); } } }, "producer"+i).start(); } } }
以上消費者生產者邏輯本人理解,有不對的地方,希望指出!