1. 程式人生 > >Java設計模式—生產者消費者模式(阻塞佇列實現)

Java設計模式—生產者消費者模式(阻塞佇列實現)

生產者消費者模式是併發、多執行緒程式設計中經典的設計模式,生產者和消費者通過分離的執行工作解耦,簡化了開發模式,生產者和消費者可以以不同的速度生產和消費資料。這篇文章我們來看看什麼是生產者消費者模式,這個問題也是多執行緒面試題中經常被提及的。如何使用阻塞佇列(Blocking Queue)解決生產者消費者模式,以及使用生產者消費者模式的好處。

真實世界中的生產者消費者模式

生產者和消費者模式在生活當中隨處可見,它描述的是協調與協作的關係。比如一個人正在準備食物(生產者),而另一個人正在吃(消費者),他們使用一個共用的桌子用於放置盤子和取走盤子,生產者準備食物,如果桌子上已經滿了就等待,消費者(那個吃的)等待如果桌子空了的話。這裡桌子就是一個共享的物件。在Java Executor框架自身實現了生產者消費者模式它們分別負責新增和執行任務。

生產者消費者模式的好處

它的確是一種實用的設計模式,常用於編寫多執行緒或併發程式碼。下面是它的一些優點:

  1.  它簡化的開發,你可以獨立地或併發的編寫消費者和生產者,它僅僅只需知道共享物件是誰
  2. 生產者不需要知道誰是消費者或者有多少消費者,對消費者來說也是一樣
  3.  生產者和消費者可以以不同的速度執行
  4.  分離的消費者和生產者在功能上能寫出更簡潔、可讀、易維護的程式碼

多執行緒中的生產者消費者問題

生產者消費者問題是一個流行的面試題,面試官會要求你實現生產者消費者設計模式,以至於能讓生產者應等待如果佇列或籃子滿了的話,消費者等待如果佇列或者籃子是空的。這個問題可以用不同的方式來現實,經典的方法是使用wait和notify方法在生產者和消費者執行緒中合作,在佇列滿了或者佇列是空的條件下阻塞,Java5的阻塞佇列(BlockingQueue)資料結構更簡單,因為它隱含的提供了這些控制,現在你不需要使用wait和nofity在生產者和消費者之間通訊了,阻塞佇列的put()方法將阻塞如果佇列滿了,佇列take()方法將阻塞如果佇列是空的。在下部分我們可以看到程式碼例子。

使用阻塞佇列實現生產者消費者模式

阻塞佇列實現生產者消費者模式超級簡單,它提供開箱即用支援阻塞的方法put()和take(),開發者不需要寫困惑的wait-nofity程式碼去實現通訊。BlockingQueue 一個介面,Java5提供了不同的現實,如ArrayBlockingQueue和LinkedBlockingQueue,兩者都是先進先出(FIFO)順序。而ArrayLinkedQueue是自然有界的,LinkedBlockingQueue可選的邊界。下面這是一個完整的生產者消費者程式碼例子,對比傳統的wait、nofity程式碼,它更易於理解。

點選(此處)摺疊或開啟

  1. import
     java.util.concurrent.BlockingQueue;
  2. import java.util.concurrent.LinkedBlockingQueue;
  3. import java.util.logging.Level;
  4. import java.util.logging.Logger;
  5. public class ProducerConsumerPattern {
  6.     public static void main(String args[]){
  7.      //Creating shared object
  8.      BlockingQueue sharedQueue = new LinkedBlockingQueue();
  9.      //Creating Producer and Consumer Thread
  10.      Thread prodThread = new Thread(new Producer(sharedQueue));
  11.      Thread consThread = new Thread(new Consumer(sharedQueue));
  12.      //Starting producer and Consumer thread
  13.      prodThread.start();
  14.      consThread.start();
  15.     }
  16. }
  17. //Producer Class in java
  18. class Producer implements Runnable {
  19.     private final BlockingQueue sharedQueue;
  20.     public Producer(BlockingQueue sharedQueue) {
  21.         this.sharedQueue = sharedQueue;
  22.     }
  23.     @Override
  24.     public void run() {
  25.         for(int i=0; i<10; i++){
  26.             try {
  27.                 System.out.println("Produced: " + i);
  28.                 sharedQueue.put(i);
  29.             } catch (InterruptedException ex) {
  30.                 Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
  31.             }
  32.         }
  33.     }
  34. }
  35. //Consumer Class in Java
  36. class Consumer implements Runnable{
  37.     private final BlockingQueue sharedQueue;
  38.     public Consumer (BlockingQueue sharedQueue) {
  39.         this.sharedQueue = sharedQueue;
  40.     }
  41.     @Override
  42.     public void run() {
  43.         while(true){
  44.             try {
  45.                 System.out.println("Consumed: "+ sharedQueue.take());
  46.             } catch (InterruptedException ex) {
  47.                 Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
  48.             }
  49.         }
  50.     }
  51. }
  52. Output:
  53. Produced: 0
  54. Produced: 1
  55. Consumed: 0
  56. Produced: 2
  57. Consumed: 1
  58. Produced: 3
  59. Consumed: 2
  60. Produced: 4
  61. Consumed: 3
  62. Produced: 5
  63. Consumed: 4
  64. Produced: 6
  65. Consumed: 5
  66. Produced: 7
  67. Consumed: 6
  68. Produced: 8
  69. Consumed: 7
  70. Produced: 9
  71. Consumed: 8
  72. Consumed: 9