1. 程式人生 > >ReentrantLock —— Condition 執行緒通訊更高效的方式

ReentrantLock —— Condition 執行緒通訊更高效的方式

        接近一週沒更新《Java執行緒》專欄了,主要是這周工作上比較忙,生活上也比較忙,呵呵,進入正題,上一篇講述了併發包下的Lock,Lock可以更好的解決執行緒同步問題,使之更面向物件,並且ReadWriteLock在處理同步時更強大,那麼同樣,執行緒間僅僅互斥是不夠的,還需要通訊,本篇的內容是基於上篇之上,使用Lock如何處理執行緒通訊。

        那麼引入本篇的主角,Condition,Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的物件,以便通過將這些物件與任意 Lock 實現組合使用,為每個物件提供多個等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。下面將之前寫過的一個執行緒通訊的例子替換成用Condition實現(Java執行緒(三)),程式碼如下:

    public class ThreadTest2 {  
        public static void main(String[] args) {  
            final Business business = new Business();  
            new Thread(new Runnable() {  
                @Override  
                public void run() {  
                    threadExecute(business, "sub");  
                }  
            }).start();  
            threadExecute(business, "main");  
        }     
        public static void threadExecute(Business business, String threadType) {  
            for(int i = 0; i < 100; i++) {  
                try {  
                    if("main".equals(threadType)) {  
                        business.main(i);  
                    } else {  
                        business.sub(i);  
                    }  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
    }  
    class Business {  
        private boolean bool = true;  
        private Lock lock = new ReentrantLock();  
        private Condition condition = lock.newCondition();   
        public /*synchronized*/ void main(int loop) throws InterruptedException {  
            lock.lock();  
            try {  
                while(bool) {                 
                    condition.await();//this.wait();  
                }  
                for(int i = 0; i < 100; i++) {  
                    System.out.println("main thread seq of " + i + ", loop of " + loop);  
                }  
                bool = true;  
                condition.signal();//this.notify();  
            } finally {  
                lock.unlock();  
            }  
        }     
        public /*synchronized*/ void sub(int loop) throws InterruptedException {  
            lock.lock();  
            try {  
                while(!bool) {  
                    condition.await();//this.wait();  
                }  
                for(int i = 0; i < 10; i++) {  
                    System.out.println("sub thread seq of " + i + ", loop of " + loop);  
                }  
                bool = false;  
                condition.signal();//this.notify();  
            } finally {  
                lock.unlock();  
            }  
        }  
    }  

        在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統執行緒的通訊方式,Condition都可以實現,這裡注意,Condition是被繫結到Lock上的,要建立一個Lock的Condition必須用newCondition()方法。

        這樣看來,Condition和傳統的執行緒通訊沒什麼區別,Condition的強大之處在於它可以為多個執行緒間建立不同的Condition,下面引入API中的一段程式碼,加以說明。

    class BoundedBuffer {  
       final Lock lock = new ReentrantLock();//鎖物件  
       final Condition notFull  = lock.newCondition();//寫執行緒條件   
       final Condition notEmpty = lock.newCondition();//讀執行緒條件   
      
       final Object[] items = new Object[100];//快取佇列  
       int putptr/*寫索引*/, takeptr/*讀索引*/, count/*佇列中存在的資料個數*/;  
      
       public void put(Object x) throws InterruptedException {  
         lock.lock();  
         try {  
           while (count == items.length)//如果佇列滿了   
             notFull.await();//阻塞寫執行緒  
           items[putptr] = x;//賦值   
           if (++putptr == items.length) putptr = 0;//如果寫索引寫到佇列的最後一個位置了,那麼置為0  
           ++count;//個數++  
           notEmpty.signal();//喚醒讀執行緒  
         } finally {  
           lock.unlock();  
         }  
       }  
      
       public Object take() throws InterruptedException {  
         lock.lock();  
         try {  
           while (count == 0)//如果佇列為空  
             notEmpty.await();//阻塞讀執行緒  
           Object x = items[takeptr];//取值   
           if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到佇列的最後一個位置了,那麼置為0  
           --count;//個數--  
           notFull.signal();//喚醒寫執行緒  
           return x;  
         } finally {  
           lock.unlock();  
         }  
       }   
     }  

        這是一個處於多執行緒工作環境下的快取區,快取區提供了兩個方法,put和take,put是存資料,take是取資料,內部有個快取佇列,具體變數和方法說明見程式碼,這個快取區類實現的功能:有多個執行緒往裡面存資料和從裡面取資料,其快取佇列(先進先出後進後出)能快取的最大數值是100,多個執行緒間是互斥的,當快取佇列中儲存的值達到100時,將寫執行緒阻塞,並喚醒讀執行緒,當快取佇列中儲存的值為0時,將讀執行緒阻塞,並喚醒寫執行緒,這也是ArrayBlockingQueue的內部實現。下面分析一下程式碼的執行過程:

        1. 一個寫執行緒執行,呼叫put方法;

        2. 判斷count是否為100,顯然沒有100;

        3. 繼續執行,存入值;

        4. 判斷當前寫入的索引位置++後,是否和100相等,相等將寫入索引值變為0,並將count+1;

        5. 僅喚醒讀執行緒阻塞佇列中的一個;

        6. 一個讀執行緒執行,呼叫take方法;

        7. ……

        8. 僅喚醒寫執行緒阻塞佇列中的一個。

        這就是多個Condition的強大之處,假設快取佇列中已經存滿,那麼阻塞的肯定是寫執行緒,喚醒的肯定是讀執行緒,相反,阻塞的肯定是讀執行緒,喚醒的肯定是寫執行緒,那麼假設只有一個Condition會有什麼效果呢,快取佇列中已經存滿,這個Lock不知道喚醒的是讀執行緒還是寫執行緒了,如果喚醒的是讀執行緒,皆大歡喜,如果喚醒的是寫執行緒,那麼執行緒剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了很多時間。