1. 程式人生 > >Java執行緒(九):Condition-執行緒通訊更高效的方式

Java執行緒(九):Condition-執行緒通訊更高效的方式

        接近一週沒更新《Java執行緒》專欄了,主要是這周工作上比較忙,生活上也比較忙,呵呵,進入正題,上一篇講述了併發包下的Lock,Lock可以更好的解決執行緒同步問題,使之更面向物件,並且ReadWriteLock在處理同步時更強大,那麼同樣,執行緒間僅僅互斥是不夠的,還需要通訊,本篇的內容是基於上篇之上,使用Lock如何處理執行緒通訊。

        那麼引入本篇的主角,Condition,Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的物件,以便通過將這些物件與任意 Lock 實現組合使用,為每個物件提供多個等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。下面將之前寫過的一個執行緒通訊的例子替換成用Condition實現(Java執行緒(三)),程式碼如下:

public class ThreadTest2 {
	public static void main(String[] args) {
		final Business business = new Business();
		new Thread(new Runnable() {
			@Override
			public void run() {
				threadExecute(business, "sub");
			}
		}).start();
		threadExecute(business, "main");
	}	
	public static void threadExecute(Business business, String threadType) {
		for(int i = 0; i < 100; i++) {
			try {
				if("main".equals(threadType)) {
					business.main(i);
				} else {
					business.sub(i);
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
class Business {
	private boolean bool = true;
	private Lock lock = new ReentrantLock();
	private Condition condition = lock.newCondition(); 
	public /*synchronized*/ void main(int loop) throws InterruptedException {
		lock.lock();
		try {
			while(bool) {				
				condition.await();//this.wait();
			}
			for(int i = 0; i < 100; i++) {
				System.out.println("main thread seq of " + i + ", loop of " + loop);
			}
			bool = true;
			condition.signal();//this.notify();
		} finally {
			lock.unlock();
		}
	}	
	public /*synchronized*/ void sub(int loop) throws InterruptedException {
		lock.lock();
		try {
			while(!bool) {
				condition.await();//this.wait();
			}
			for(int i = 0; i < 10; i++) {
				System.out.println("sub thread seq of " + i + ", loop of " + loop);
			}
			bool = false;
			condition.signal();//this.notify();
		} finally {
			lock.unlock();
		}
	}
}
        在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統執行緒的通訊方式,Condition都可以實現,這裡注意,Condition是被繫結到Lock上的,要建立一個Lock的Condition必須用newCondition()方法。

        這樣看來,Condition和傳統的執行緒通訊沒什麼區別,Condition的強大之處在於它可以為多個執行緒間建立不同的Condition,下面引入API中的一段程式碼,加以說明。

class BoundedBuffer {
   final Lock lock = new ReentrantLock();//鎖物件
   final Condition notFull  = lock.newCondition();//寫執行緒條件 
   final Condition notEmpty = lock.newCondition();//讀執行緒條件 

   final Object[] items = new Object[100];//快取佇列
   int putptr/*寫索引*/, takeptr/*讀索引*/, count/*佇列中存在的資料個數*/;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)//如果佇列滿了 
         notFull.await();//阻塞寫執行緒
       items[putptr] = x;//賦值 
       if (++putptr == items.length) putptr = 0;//如果寫索引寫到佇列的最後一個位置了,那麼置為0
       ++count;//個數++
       notEmpty.signal();//喚醒讀執行緒
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)//如果佇列為空
         notEmpty.await();//阻塞讀執行緒
       Object x = items[takeptr];//取值 
       if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到佇列的最後一個位置了,那麼置為0
       --count;//個數--
       notFull.signal();//喚醒寫執行緒
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }
        這是一個處於多執行緒工作環境下的快取區,快取區提供了兩個方法,put和take,put是存資料,take是取資料,內部有個快取佇列,具體變數和方法說明見程式碼,這個快取區類實現的功能:有多個執行緒往裡面存資料和從裡面取資料,其快取佇列(先進先出後進後出)能快取的最大數值是100,多個執行緒間是互斥的,當快取佇列中儲存的值達到100時,將寫執行緒阻塞,並喚醒讀執行緒,當快取佇列中儲存的值為0時,將讀執行緒阻塞,並喚醒寫執行緒,這也是ArrayBlockingQueue的內部實現。下面分析一下程式碼的執行過程:

        1. 一個寫執行緒執行,呼叫put方法;

        2. 判斷count是否為100,顯然沒有100;

        3. 繼續執行,存入值;

        4. 判斷當前寫入的索引位置++後,是否和100相等,相等將寫入索引值變為0,並將count+1;

        5. 僅喚醒讀執行緒阻塞佇列中的一個;

        6. 一個讀執行緒執行,呼叫take方法;

        7. ……

        8. 僅喚醒寫執行緒阻塞佇列中的一個。

        這就是多個Condition的強大之處,假設快取佇列中已經存滿,那麼阻塞的肯定是寫執行緒,喚醒的肯定是讀執行緒,相反,阻塞的肯定是讀執行緒,喚醒的肯定是寫執行緒,那麼假設只有一個Condition會有什麼效果呢,快取佇列中已經存滿,這個Lock不知道喚醒的是讀執行緒還是寫執行緒了,如果喚醒的是讀執行緒,皆大歡喜,如果喚醒的是寫執行緒,那麼執行緒剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了很多時間。