1. 程式人生 > >線程通信之生產者消費者模型

線程通信之生產者消費者模型

釋放 另一個 gpo 停止 product @override getname -s rup

  線程通信,是指線程之間的消息傳遞。

  多個線程在操作同一個資源時,它們對共享資源的操作動作可能不同;它們共享同一個資源,互為條件,相互依賴,相互通信,從而讓任務向前推進。

  另外,在線程的同步策略中,雖然可以解決並發更新同一個資源,保障資源的安全,但不能用來實現線程間的消息傳遞。因此,線程通信與線程同步往往會融合使用。

  生產者消費者模型堪稱是線程通信中的一個典型案例,我們接下來通過生產者消費者模式來進一步認識線程通信。在此,我們先對若幹概念進行了解。  

  生產者:沒有生產之前通知消費者等待,生產產品結束之後,馬上通知消費者消費

  消費者:沒有消費之前通知生產者等待,消費產品結束之後,通知生產者繼續生產產品以供消費

  線程通信:使用java中超類Object中提供的一些方法:

1 public final void wait();  //註:long timeout=0  表示線程一直等待,直到其它線程通知
2 public final native void wait(long timeout);   //線程等待指定毫秒參數的時間,超過該時間則不再等待
3 public final void wait(long timeout, int nanos);  //線程等待指定毫秒、微妙的時間,timeout最大等待時間,以毫秒為單位,nanos額外的時間,在納秒範圍0-9999994 public final native void
notify(); //喚醒一個處於等待狀態的線程 5 public final native void notifyAll(); //喚醒同一個對象上所有調用wait()方法的線程,優先級別高的線程優先運行

  需要註意的是,上述方法只能在同步方法或者同步代碼塊中使用,否則會拋出異常。

  接下來,我們以生產A-D個產品,放入倉庫,待消費者消費後,生產者再進行生產為例,看下生產者消費者模式的運行流程。

  

  1 /**
  2  * 1.共享資源緩存和操作類
  3  */
  4 public class SharedCache {
  5     //產品,此處使用char字符,作為存儲共享數據的數據類型
6 private char cache; 7 //產品消費標識,是線程間通信的信號,為true表示未消費(生產),false表示未生產(消費) 8 private boolean flag=false; 9 /* 10 生產操作(生產者):向倉庫中添加共享數據 11 */ 12 public synchronized void addSharedCacheData(char data){ 13 //產品未消費,則生產者的生產操作等待 14 if(flag){ 15 System.out.println("產品未消費,生產者的生產操作等待"); 16 try { 17 //生產者等待 18 wait(); 19 } catch (InterruptedException e) { 20 System.out.println("Thread interrupted Exception:"+e.getMessage()); 21 } 22 } 23 //產品已消費,則生產者繼續生產 24 this.cache=data; 25 //標記已生產 26 flag=true; 27 //通知消費者已生產 28 notify(); 29 System.out.println("生產者--->產品:"+data+"已生產,等待消費者消費"); 30 } 31 /* 32 消費操作(消費者):向倉庫中獲取共享數據 33 */ 34 public synchronized char getSharedCacheData(){ 35 //如果產品未生產,則消費者等待 36 if(!flag){ 37 System.out.println("產品未生產,消費者的消費操作等待"); 38 try { 39 wait(); 40 } catch (InterruptedException e) { 41 System.out.println("Thread interrupted Exception:"+e.getMessage()); 42 } 43 } 44 //標記已消費 45 flag=false; 46 //通知生產者已消費 47 notify(); 48 System.out.println("消費者--->產品:"+this.cache+"已消費,通知生產者生產"); 49 return this.cache; 50 } 51 } 52 /** 53 * 2.生產者線程類 54 */ 55 public class Producer extends Thread{ 56 //共享緩存資源類的對象 57 private SharedCache cache; 58 //構造器,傳入共享資源類的對象 59 public Producer(SharedCache cache){ 60 this.cache=cache; 61 } 62 /* 63 生產者生產產品,放入共享資源緩存類(相當於將生產的產品放入倉庫裏) 64 生產A-D類型的產品 65 */ 66 @Override 67 public void run() { 68 for(char product=A;product<=D;product++){ 69 try { 70 sleep((int)(Math.random()*3000)); 71 } catch (InterruptedException e) { 72 System.out.println("Thread interrupted Exception:"+e.getMessage()); 73 } 74 //生產產品,放入共享緩存數據類的對象裏(相當於把生產的產品放到倉庫裏) 75 cache.addSharedCacheData(product); 76 } 77 } 78 } 79 /** 80 * 3.消費者線程類 81 */ 82 public class Consumer extends Thread{ 83 //共享緩存資源類的對象 84 private SharedCache cache; 85 //構造器,傳入共享資源類的對象 86 public Consumer(SharedCache cache){ 87 this.cache=cache; 88 } 89 /* 90 消費者消費產品,獲取共享緩存類的對象裏的數據(相當於從倉庫裏提取產品) 91 當消費到D類型的產品時即停止消費 92 */ 93 @Override 94 public void run() { 95 char product=a; 96 do{ 97 try { 98 Thread.sleep((int)(Math.random()*3000)); 99 } catch (InterruptedException e) { 100 System.out.println("Thread interrupted Exception:"+e.getMessage()); 101 } 102 //消費,從倉庫取走商品 103 product=cache.getSharedCacheData(); 104 }while (product!=D); 105 } 106 } 107 /** 108 * 4.線程通信測試類 109 */ 110 public class Test { 111 public static void main(String[] args) { 112 //生產者與消費者共享同一個資源 113 SharedCache cache = new SharedCache(); 114 //啟動消費者線程 115 new Consumer(cache).start(); 116 //啟動生產者線程 117 new Producer(cache).start(); 118 } 119 }

  運行上述的測試類後,執行結果如下:

產品未生產,消費者的消費操作等待
生產者--->產品:A已生產,等待消費者消費
消費者--->產品:A已消費,通知生產者生產
生產者--->產品:B已生產,等待消費者消費
消費者--->產品:B已消費,通知生產者生產
生產者--->產品:C已生產,等待消費者消費
產品未消費,生產者的生產操作等待
消費者--->產品:C已消費,通知生產者生產
生產者--->產品:D已生產,等待消費者消費
消費者--->產品:D已消費,通知生產者生產

  我們在上面完成的生產者消費者模型,在處理線程同步問題時,主要是用了synchronized同步方法,JDK 1.5提供了多線程升級方案,將同步synchronized替換成了顯示的Lock操作,可以實現喚醒、凍結指定的線程。

  接口Lock 實現提供了比使用 synchronized 方法和語句可獲得的更廣泛的鎖定操作。Lock 可以支持多個相關的 Condition 對象,從而在使用中更加靈活。

  接口Condition可以替代傳統的線程間通信,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll()。該對象可以通過Lock鎖進行獲取。可以說,傳統線程的通信方式,Condition都可以實現。

  需要註意的是,Condition是被綁定到Lock上的,要創建一個Lock的Condition必須用newCondition()方法。  

  Java.util.concurrent.lock 中的Lock 框架是鎖定的一個抽象,它允許把鎖定的實現作為 Java 類,從而為Lock 的多種實現留下了空間,各種實現可能有不同的調度算法、性能特性或者鎖定語義。

  其中,ReentrantLock 類實現了Lock ,它擁有與synchronized 相同的並發性和內存語義,還添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈爭用情況下更佳的性能。

  我們接下來通過ReentrantLock 類和Condition接口的實現類來完成一個生產者消費者模型。為此,我們需要創建一個ReentrantLock類的多態對象,即建立一把鎖,然後將這把鎖與兩個Condition對象關聯。我們接下來就用Lock與Condition實現一個生產者消費者模型,實現與上述例子相似的效果,代碼具體如下:

  1 import java.util.concurrent.locks.Condition;
  2 import java.util.concurrent.locks.Lock;
  3 import java.util.concurrent.locks.ReentrantLock;
  4 /**
  5  * 共享的資源
  6  */
  7 public class Resource {
  8     private char product;
  9 //    private int count = 1;
 10     //產品消費標識,是線程間通信的信號,為true表示未消費(生產),false表示未生產(消費)
 11     private boolean flag = false;
 12     //定義一個實現Lock接口的ReentrantLock類對象
 13     private Lock lock = new ReentrantLock();
 14     /*
 15     Condition是被綁定到Lock上的,
 16     要創建一個Lock的Condition,
 17     必須用Lock對象的newCondition()方法
 18      */
 19     private Condition cond_pro = lock.newCondition();
 20     //一個lock可以有多個相關的condition
 21     private Condition cond_con = lock.newCondition();
 22     /*
 23         定義生產方法
 24      */
 25     public void produce(char product) throws InterruptedException {
 26         lock.lock();//手動加同步鎖
 27         try {
 28             while (flag) {//此時若生產完一個以後喚醒了另一個生產者,則再次判斷,避免兩個生產者同時生產
 29                 System.out.println("產品未消費,生產者的生產操作等待");
 30                 cond_pro.await();
 31             }
 32             this.product = product;
 33             //標記已生產
 34             flag = true;
 35             //通知消費者已生產
 36             cond_con.signal();//喚醒消費方法,利用了condition的signal()指定喚醒對象
 37             System.out.println("生產者"+Thread.currentThread().getName()+"--->產品:"+product+"已生產,等待消費者消費");
 38         } finally {
 39             lock.unlock();//釋放鎖
 40         }
 41     }
 42     /*
 43         定義消費方法
 44      */
 45     public void consume() throws InterruptedException {
 46         lock.lock();
 47         try {
 48             while (!flag) {
 49                 System.out.println("產品未生產,消費者的消費操作等待");
 50                 cond_con.await();
 51             }
 52             //標記已消費
 53             flag = false;
 54             //通知生產者已消費
 55             cond_pro.signal();
 56             System.out.println("消費者"+Thread.currentThread().getName()+"--->產品:"+this.product+"已消費,通知生產者生產");
 57         } finally {
 58             lock.unlock();
 59         }
 60     }
 61 }
 62 /**
 63  * 生產者
 64  */
 65 public class Producer implements Runnable{
 66     private Resource res;
 67     public Producer(Resource res){
 68         this.res=res;
 69     }
 70     @Override
 71     public void run() {
 72         char product=A;
 73         while(product<E){
 74             try {
 75                 res.produce(product);
 76             } catch (InterruptedException e) {
 77                 e.printStackTrace();
 78             }
 79             product++;
 80         }
 81     }
 82 }
 83 /**
 84  * 消費者
 85  */
 86 public class Consumer implements Runnable{
 87     private Resource res;
 88     public Consumer(Resource res){
 89         this.res=res;
 90     }
 91     @Override
 92     public void run() {
 93         char product=A;
 94         while(product<E){
 95             try {
 96                 res.consume();
 97             } catch (InterruptedException e) {
 98                 e.printStackTrace();
 99             }
100             product++;
101         }
102     }
103 }
104 /**
105  * 用ReentrantLock和Condition實現生產者消費者模型
106  */
107 public class Test {
108     //入口方法
109     public static void main(String[] args) {
110         Resource res = new Resource();//生產者與消費者共享的資源
111         Producer producer = new Producer(res);//生產者
112         Consumer consumer = new Consumer(res);//消費者
113         //生產者線程與消費者線程各創建兩個
114         Thread p1 = new Thread(producer);
115         Thread p2 = new Thread(producer);
116         Thread c1 = new Thread(consumer);
117         Thread c2 = new Thread(consumer);
118         p1.start();
119         p2.start();
120         c1.start();
121         c2.start();
122     }
123 }

  上述代碼執行結果如下:

生產者Thread-0--->產品:A已生產,等待消費者消費
產品未消費,生產者的生產操作等待
消費者Thread-2--->產品:A已消費,通知生產者生產
產品未生產,消費者的消費操作等待
生產者Thread-1--->產品:A已生產,等待消費者消費
產品未消費,生產者的生產操作等待
消費者Thread-2--->產品:A已消費,通知生產者生產
產品未生產,消費者的消費操作等待
生產者Thread-0--->產品:B已生產,等待消費者消費
產品未消費,生產者的生產操作等待
消費者Thread-3--->產品:B已消費,通知生產者生產
產品未生產,消費者的消費操作等待
生產者Thread-1--->產品:B已生產,等待消費者消費
產品未消費,生產者的生產操作等待
消費者Thread-2--->產品:B已消費,通知生產者生產
產品未生產,消費者的消費操作等待
生產者Thread-0--->產品:C已生產,等待消費者消費
產品未消費,生產者的生產操作等待
消費者Thread-3--->產品:C已消費,通知生產者生產
產品未生產,消費者的消費操作等待
生產者Thread-1--->產品:C已生產,等待消費者消費
產品未消費,生產者的生產操作等待
消費者Thread-2--->產品:C已消費,通知生產者生產
生產者Thread-0--->產品:D已生產,等待消費者消費
消費者Thread-3--->產品:D已消費,通知生產者生產
產品未生產,消費者的消費操作等待
生產者Thread-1--->產品:D已生產,等待消費者消費
消費者Thread-3--->產品:D已消費,通知生產者生產  

線程通信之生產者消費者模型