1. 程式人生 > >Java Executor併發框架(十四)Executor框架執行緒池使用原始方式實現生產者消費者模式

Java Executor併發框架(十四)Executor框架執行緒池使用原始方式實現生產者消費者模式

       我們可以利用wait()來讓一個執行緒在某些條件下暫停執行。例如,在生產者消費者模型中,生產者執行緒在緩衝區為滿的時候,消費者在緩衝區為空的時 候,都應該暫停執行。如果某些執行緒在等待某些條件觸發,那當那些條件為真時,你可以用 notify 和 notifyAll 來通知那些等待中的執行緒重新開始執行。不同之處在於,notify 僅僅通知一個執行緒,並且我們不知道哪個執行緒會收到通知,然而 notifyAll 會通知所有等待中的執行緒。換言之,如果只有一個執行緒在等待一個訊號燈,notify和notifyAll都會通知到這個執行緒。但如果多個執行緒在等待這個信 號燈,那麼notify只會通知到其中一個,而其它執行緒並不會收到任何通知,而notifyAll會喚醒所有等待中的執行緒。在這篇文章中你將會學到如何使用 wait、notify 和 notifyAll 來實現執行緒間的通訊,從而解決生產者消費者問題。

二、如何使用wait

       儘管關於wait和notify的概念很基礎,它們也都是Object類的函式,但用它們來寫程式碼卻並不簡單。如果你在面試中讓應聘者來手寫程式碼, 用wait和notify解決生產者消費者問題,我幾乎可以肯定他們中的大多數都會無所適從或者犯下一些錯誤,例如在錯誤的地方使用 synchronized 關鍵詞,沒有對正確的物件使用wait,或者沒有遵循規範的程式碼方法。說實話,這個問題對於不常使用它們的程式設計師來說確實令人感覺比較頭疼。

       第一個問題就是,我們怎麼在程式碼裡使用wait()呢?因為wait()並不是Thread類下的函式,我們並不能使用 Thread.call()。事實上很多Java程式設計師都喜歡這麼寫,因為它們習慣了使用Thread.sleep(),所以他們會試圖使用wait() 來達成相同的目的,但很快他們就會發現這並不能順利解決問題。正確的方法是對在多執行緒間共享的那個Object來使用wait。在生產者消費者問題中,這 個共享的Object就是那個緩衝區佇列。

       第二個問題是,既然我們應該在synchronized的函式或是物件裡呼叫wait,那哪個物件應該被synchronized呢?答案是,那個 你希望上鎖的物件就應該被synchronized,即那個在多個執行緒間被共享的物件。在生產者消費者問題中,應該被synchronized的就是那個 緩衝區佇列。永遠在迴圈(loop)裡呼叫 wait 和 notify,不是在 If 語句。

       現在你知道wait應該永遠在被synchronized的背景下和那個被多執行緒共享的物件上呼叫,下一個一定要記住的問題就是,你應該永遠在 while迴圈,而不是if語句中呼叫wait。因為執行緒是在某些條件下等待的——在我們的例子裡,即“如果緩衝區佇列是滿的話,那麼生產者執行緒應該等 待”,你可能直覺就會寫一個if語句。但if語句存在一些微妙的小問題,導致即使條件沒被滿足,你的執行緒你也有可能被錯誤地喚醒。所以如果你不線上程被喚 醒後再次使用while迴圈檢查喚醒條件是否被滿足,你的程式就有可能會出錯——例如在緩衝區為滿的時候生產者繼續生成資料,或者緩衝區為空的時候消費者 開始小號資料。所以記住,永遠在while迴圈而不是if語句中使用wait!


基於以上認知,下面這個是使用wait和notify函式的規範程式碼模板:

// The standard idiom for calling the wait method in Java 
synchronized (sharedObject) { 
    while (condition) { 
         sharedObject.wait(); 
        // (Releases lock, and reacquires on wakeup) 
    } 
    // do action based upon condition e.g. take or put into queue 
} 
就像我之前說的一樣,在while迴圈裡使用wait的目的,是線上程被喚醒的前後都持續檢查條件是否被滿足。如果條件並未改變,wait被呼叫之前notify的喚醒通知就來了,那麼這個執行緒並不能保證被喚醒,有可能會導致死鎖問題。

三、wait, notify, notifyAll 範例

下面我們提供一個使用wait和notify的範例程式。在這個程式裡,我們使用了上文所述的一些程式碼規範。我們有兩個執行緒,分別名為 MyProducer(生產者)和MyCustomer(消費者),他們分別繼承了Runnable。Main執行緒開始了生產者和消費者執行緒,並聲明瞭一個LinkedList作為緩衝區佇列(在Java中,LinkedList實現了佇列的介面)。生產者在無限迴圈中持續往 LinkedList裡插入隨機整數直到LinkedList滿。我們在while(queue.isEmpty())迴圈語句中檢查這個條件。請注意到我們在做這個檢查條件之前已經在佇列物件上使用了synchronized關鍵詞,因而其它執行緒不能在 我們檢查條件時改變這個佇列。如果佇列滿了,那麼MyProducer執行緒會在MyCustomer執行緒消耗掉佇列裡的任意一個整數,並用notify來通知MyProducer執行緒之前持續等待。在我們的例子中,wait和notify都是使用在同一個共享物件上的。
package com.npf.thread.test;

import java.util.LinkedList;

public class MyProducer implements Runnable{

	private LinkedList<Integer> queue;
	
	public MyProducer(LinkedList<Integer> queue) {
		super();
		this.queue = queue;
	}

	@Override
	public void run() {
		for(int i =0 ;i<10;i++){
			synchronized (queue) {
				while(!queue.isEmpty()){
					try {
						queue.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				System.out.println("producer..."+i);
				queue.add(i);
				queue.notifyAll();
			}
		}
	}
}


package com.npf.thread.test;

import java.util.LinkedList;

public class MyCustomer implements Runnable{

	private LinkedList<Integer> queue;
	
	public MyCustomer(LinkedList<Integer> queue) {
		super();
		this.queue = queue;
	}

	@Override
	public void run() {
		for(int i =0 ;i<10;i++){
			synchronized (queue) {
				while(queue.isEmpty()){
					try {
						queue.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				System.out.println("customer..."+i);
				queue.remove();
				queue.notifyAll();
			}
		}
	}

}



測試程式碼:
package com.npf.thread.test;

import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Client {
	
	public static void main(String[] args) {
		
		LinkedList<Integer> queue = new LinkedList<Integer>();
		
		MyProducer producer = new MyProducer(queue);
		
		MyCustomer customer = new MyCustomer(queue);
		
		ExecutorService executor = Executors.newFixedThreadPool(4);
		
		executor.submit(customer);
		
		executor.submit(producer);
		
		executor.shutdown();
		
	}

}


四、測試結果


五、結論

1. 你可以使用wait和notify函式來實現執行緒間通訊。你可以用它們來實現多執行緒(>3)之間的通訊。

2. 永遠在synchronized的函式或物件裡使用wait、notify和notifyAll,不然Java虛擬機器會生成 IllegalMonitorStateException。

3. 永遠在while迴圈裡而不是if語句下使用wait。這樣,迴圈會線上程睡眠前後都檢查wait的條件,並在條件實際上並未改變的情況下處理喚醒通知。

4. 永遠在多執行緒間共享的物件(在生產者消費者模型裡即緩衝區佇列)上使用wait。

5. 基於前文提及的理由,更傾向用 notifyAll(),而不是 notify()。

這是關於Java裡如何使用wait, notify和notifyAll的所有重點啦。你應該只在你知道自己要做什麼的情況下使用這些函式,不然Java裡還有很多其它的用來解決同步問題的方 案。例如,如果你想使用生產者消費者模型的話,你也可以使用BlockingQueue,它會幫你處理所有的執行緒安全問題和流程控制。如果你想要某一個線 程等待另一個執行緒做出反饋再繼續執行,你也可以使用CycliBarrier或者CountDownLatch。如果你只是想保護某一個資源的話,你也可 以使用Semaphore。