生產者與消費者模型

20180827.jpg
前言簡介
生產者和消費者問題是執行緒模型中的經典問題:生產者和消費者在同一時間段內共用同一個儲存空間,生產者往儲存空間中新增產品,消費者從儲存空間中取走產品,當儲存空間為空時,消費者阻塞,當儲存空間滿時,生產者阻塞。

595523d4de338.png
舉例說明:
- 你把信寫好——相當於生產者製造資料
- 你把信放入郵筒——相當於生產者把資料放入緩衝區
- 郵遞員把信從郵筒取出——相當於消費者把資料取出緩衝區
- 郵遞員把信拿去郵局做相應的處理——相當於消費者處理資料
具體實現方式
為什麼要使用生產者和消費者模式
線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
生產者和消費者問題的不同實現方式
1. 不完善的實現(會導致死鎖)
int itemCount = 0;//總數量 procedure producer() {//生產者 while (true) { item = produceItem();//生產一個 if (itemCount == BUFFER_SIZE) {//生產滿則睡眠 sleep(); } putItemIntoBuffer(item);//緩衝區放入一個 itemCount = itemCount + 1; if (itemCount == 1) { wakeup(consumer);//喚醒消費者 } } } procedure consumer() {//消費者 while (true) { if (itemCount == 0) {//消費完則睡眠 sleep(); } item = removeItemFromBuffer();//緩衝區減少一個 itemCount = itemCount - 1; if (itemCount == BUFFER_SIZE - 1) { wakeup(producer);//喚醒生產者 } consumeItem(item);//消費一個 } }
上面程式碼中的問題在於它可能導致競爭條件,進而引發死鎖。考慮下面的情形:
- 消費者把最後一個 itemCount 的內容讀出來,注意它現在是零。消費者返回到while的起始處,現在進入 if 塊;
- 就在呼叫sleep之前,CPU決定將時間讓給生產者,於是消費者在執行 sleep 之前就被中斷了,生產者開始執行;
- 生產者生產出一項資料後將其放入緩衝區,然後在 itemCount 上加 1;
- 由於緩衝區在上一步加 1 之前為空,生產者嘗試喚醒消費者;
- 遺憾的是,消費者並沒有在休眠,喚醒指令不起作用。當消費者恢復執行的時候,執行 sleep,一覺不醒。出現這種情況的原因在於,消費者只能被生產者在 itemCount 為 1 的情況下喚醒;
- 生產者不停地迴圈執行,直到緩衝區滿,隨後進入休眠。
由於兩個執行緒都進入了永遠的休眠,死鎖情況出現了。因此,該演算法是不完善的。
2. 使用訊號燈的演算法
semaphore fillCount = 0; // 生產的專案 總存量 semaphore emptyCount = BUFFER_SIZE; // 剩餘空間 procedure producer() { while (true) { item = produceItem();//生產 down(emptyCount);//減少剩餘空間 putItemIntoBuffer(item);//緩衝區增加 up(fillCount);//增加存量 } } procedure consumer() { while (true) { down(fillCount);//減少存量 item = removeItemFromBuffer();//緩衝區減少 up(emptyCount);//增加剩餘空間 consumeItem(item);//消費 } }
上述方法在只有一個生產者和一個消費者時能解決問題。對於多個生產者或者多個消費者共享緩衝區的情況,該演算法也會導致競爭條件,出現兩個或以上的程序同時讀或寫同一個緩衝區槽的情況。
為了解決這個問題,需要在保證同一時刻只有一個生產者能夠執行 putItemIntoBuffer()。也就是說,需要尋找一種方法來互斥地執行臨界區的程式碼。為了達到這個目的,可引入一個二值訊號燈 mutex,其值只能為 1 或者 0。如果把執行緒放入 down(mutex) 和 up(mutex) 之間,就可以限制只有一個執行緒能被執行。多生產者、消費者的解決演算法如下
semaphore mutex = 1; semaphore fillCount = 0; semaphore emptyCount = BUFFER_SIZE; procedure producer() { while (true) { item = produceItem(); down(emptyCount); down(mutex);//獲取鎖 putItemIntoBuffer(item); up(mutex);//釋放鎖 up(fillCount); } } procedure consumer() { while (true) { down(fillCount); down(mutex); item = removeItemFromBuffer(); up(mutex); up(emptyCount); consumeItem(item); } }
3. 使用管程的演算法
monitor ProducerConsumer { int itemCount condition full; condition empty; procedure add(item) { while (itemCount == BUFFER_SIZE) wait(full); putItemIntoBuffer(item); itemCount = itemCount + 1; if (itemCount == 1) notify(empty); } procedure remove() { while (itemCount == 0) wait(empty); item = removeItemFromBuffer(); itemCount = itemCount - 1; if (itemCount == BUFFER_SIZE - 1) notify(full); return item; } } procedure producer() { while (true) { item = produceItem() ProducerConsumer.add(item) } } procedure consumer() { while (true) { item = ProducerConsumer.remove() consumeItem(item) } }
注意程式碼中 while 語句的用法,都是用在測試緩衝區是否已滿或空的時候。當存在多個消費者時,有可能造成競爭條件的情況是:某一消費者在一項資料被放入緩衝區中時被喚醒,但是另一消費者已經在管程上等待了一段時間並移除了這項資料。如果 while 語句被改成 if,則會出現放入緩衝區的資料項過多,或移除空緩衝區中的元素的情況。
java的5種實現方式
1. wait()和notify()方法的實現
這也是最簡單最基礎的實現,緩衝區滿和為空時都呼叫wait()方法等待,當生產者生產了一個產品或者消費者消費了一個產品之後會喚醒所有執行緒。
/** * @author shangjing * @date 2018/11/22 3:26 PM * @describe wait,notify實現 */ public class WaitTest { private static int count = 0; private static final int buffCount = 10; private static String lock = "lock"; class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock) { while (count == buffCount) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "-生產者生產,數量為:" + count); lock.notifyAll(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock) { while (count == 0) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "-消費者消費,數量為:"+ count); lock.notifyAll(); } } } } public static void main(String[] args) { WaitTest waitTest = new WaitTest(); new Thread(waitTest.new Producer()).start(); new Thread(waitTest.new Consumer()).start(); new Thread(waitTest.new Producer()).start(); new Thread(waitTest.new Consumer()).start(); new Thread(waitTest.new Producer()).start(); new Thread(waitTest.new Consumer()).start(); } }
2. 可重入鎖ReentrantLock的實現
java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,通過對lock的lock()方法和unlock()方法實現了對鎖的顯示控制,而synchronize()則是對鎖的隱性控制。
可重入鎖,也叫做遞迴鎖,指的是同一執行緒 外層函式獲得鎖之後 ,內層遞迴函式仍然有獲取該鎖的程式碼,但不受影響,簡單來說,該鎖維護這一個與獲取鎖相關的計數器,如果擁有鎖的某個執行緒再次得到鎖,那麼獲取計數器就加1,函式呼叫結束計數器就減1,然後鎖需要被釋放兩次才能獲得真正釋放。已經獲取鎖的執行緒進入其他需要相同鎖的同步程式碼塊不會被阻塞。
/** * @author shangjing * @date 2018/11/22 3:53 PM * @describe */ public class LockTest { private static int count = 0; private static final int buffCount = 10; private static Lock lock = new ReentrantLock(); //建立兩個條件變數,一個為緩衝區非滿,一個為緩衝區非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } lock.lock(); try { while (count == buffCount) { try { notFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "-生產者生產,數量為:" + count); notEmpty.signal(); } finally { lock.unlock(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } lock.lock(); try { while (count == 0) { try { notEmpty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "-消費者消費,數量為:"+ count); notFull.signal(); } finally { lock.unlock(); } } } } public static void main(String[] args) { LockTest lockTest = new LockTest(); new Thread(lockTest.new Producer()).start(); new Thread(lockTest.new Consumer()).start(); new Thread(lockTest.new Producer()).start(); new Thread(lockTest.new Consumer()).start(); new Thread(lockTest.new Producer()).start(); new Thread(lockTest.new Consumer()).start(); } }
3. 阻塞佇列BlockingQueue的實現(最簡單)
BlockingQueue即阻塞佇列,從阻塞這個詞可以看出,在某些情況下對阻塞佇列的訪問可能會造成阻塞。被阻塞的情況主要有如下兩種:
當佇列滿了的時候進行入佇列操作
當佇列空了的時候進行出佇列操作
因此,當一個執行緒對已經滿了的阻塞佇列進行入隊操作時會阻塞,除非有另外一個執行緒進行了出隊操作,當一個執行緒對一個空的阻塞佇列進行出隊操作時也會阻塞,除非有另外一個執行緒進行了入隊操作。
從上可知,阻塞佇列是執行緒安全的。
/** * @author shangjing * @date 2018/11/22 4:05 PM * @describe */ public class BlockingQueueTest { private static int count = 0; private final BlockingQueue blockingQueue = new LinkedBlockingQueue(10); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { blockingQueue.put(1); count++; System.out.println(Thread.currentThread().getName() + "-生產者生產,數量為:" + count); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { blockingQueue.take(); count--; System.out.println(Thread.currentThread().getName() + "-消費者消費,數量為:"+ count); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { BlockingQueueTest blockingQueueTest = new BlockingQueueTest(); new Thread(blockingQueueTest.new Producer()).start(); new Thread(blockingQueueTest.new Consumer()).start(); new Thread(blockingQueueTest.new Producer()).start(); new Thread(blockingQueueTest.new Consumer()).start(); new Thread(blockingQueueTest.new Producer()).start(); new Thread(blockingQueueTest.new Consumer()).start(); } }
4. 訊號量Semaphore的實現
Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源,在作業系統中是一個非常重要的問題,可以用來解決哲學家就餐問題。Java中的Semaphore維護了一個許可集,一開始先設定這個許可集的數量,可以使用acquire()方法獲得一個許可,當許可不足時會被阻塞,release()新增一個許可。在下列程式碼中,還加入了另外一個mutex訊號量,維護生產者消費者之間的同步關係,保證生產者和消費者之間的交替進行
/** * @author shangjing * @date 2018/11/22 4:20 PM * @describe */ public class SemaphoreTest { private static int count = 0; //建立三個訊號量 private final Semaphore notFull = new Semaphore(10); private final Semaphore notEmpty = new Semaphore(0); private final Semaphore mutex = new Semaphore(1); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { notFull.acquire();//獲取許可 mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "-生產者生產,數量為:" + count); } catch (InterruptedException e) { e.printStackTrace(); }finally { mutex.release();//釋放 notEmpty.release(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { notEmpty.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "-消費者消費,數量為:"+ count); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); notFull.release(); } } } } public static void main(String[] args) { SemaphoreTest semaphoreTest = new SemaphoreTest(); new Thread(semaphoreTest.new Producer()).start(); new Thread(semaphoreTest.new Consumer()).start(); new Thread(semaphoreTest.new Producer()).start(); new Thread(semaphoreTest.new Consumer()).start(); new Thread(semaphoreTest.new Producer()).start(); new Thread(semaphoreTest.new Consumer()).start(); } }
5. 管道輸入輸出流PipedInputStream和PipedOutputStream實現
在java的io包下,PipedOutputStream和PipedInputStream分別是管道輸出流和管道輸入流。
它們的作用是讓多執行緒可以通過管道進行執行緒間的通訊。在使用管道通訊時,必須將PipedOutputStream和PipedInputStream配套使用。
使用方法:先建立一個管道輸入流和管道輸出流,然後將輸入流和輸出流進行連線,用生產者執行緒往管道輸出流中寫入資料,消費者在管道輸入流中讀取資料,這樣就可以實現了不同執行緒間的相互通訊,但是這種方式在生產者和生產者、消費者和消費者之間不能保證同步,也就是說在一個生產者和一個消費者的情況下是可以生產者和消費者之間交替執行的,多個生成者和多個消費者者之間則不行
/** * @author shangjing * @date 2018/11/22 4:29 PM * @describe */ public class PipedTest { private final PipedInputStream pis = new PipedInputStream(); private final PipedOutputStream pos = new PipedOutputStream(); { try { pis.connect(pos); } catch (IOException e) { e.printStackTrace(); } } class Producer implements Runnable { @Override public void run() { try { while(true) { Thread.sleep(1000); int num = (int) (Math.random() * 255); System.out.println(Thread.currentThread().getName() + "生產者生產了一個數字,該數字為: " + num); pos.write(num); pos.flush(); } } catch (Exception e) { e.printStackTrace(); } finally { try { pos.close(); pis.close(); } catch (IOException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { @Override public void run() { try { while(true) { Thread.sleep(1000); int num = pis.read(); System.out.println("消費者消費了一個數字,該數字為:" + num); } } catch (Exception e) { e.printStackTrace(); } finally { try { pos.close(); pis.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) { PipedTest pipedTest = new PipedTest(); new Thread(pipedTest.new Producer()).start(); new Thread(pipedTest.new Consumer()).start(); } }
消費者生產者並行的優化實現
上面的實現方式生產者和消費者是互斥的,效率並不是最好。可以採用多個生產者(多個消費者)序列執行,生產者與消費者之間並行執行,提升效率。
更高併發效能的Lock實現:
需要兩個鎖 CONSUME_LOCK與PRODUCE_LOCK,CONSUME_LOCK控制消費者執行緒併發出隊,PRODUCE_LOCK控制生產者執行緒併發入隊;相應需要兩個條件變數NOT_EMPTY與NOT_FULL,NOT_EMPTY負責控制消費者執行緒的狀態(阻塞、執行),NOT_FULL負責控制生產者執行緒的狀態(阻塞、執行)。以此讓優化消費者與消費者(或生產者與生產者)之間是序列的;消費者與生產者之間是並行的。