1. 程式人生 > >Java併發控制:ReentrantLock Condition的使用

Java併發控制:ReentrantLock Condition的使用

生產者-消費者(producer-consumer)問題,也稱作有界緩衝區(bounded-buffer)問題,兩個程序共享一個公共的固定大小的緩衝區。

其中一個是生產者,用於將訊息放入緩衝區;另外一個是消費者,用於從緩衝區中取出訊息。
問題出現在當緩衝區已經滿了,而此時生產者還想向其中放入一個新的資料項的情形,其解決方法是讓生產者此時進行休眠,等待消費者從緩衝區中取走了一個或者多個數據後再去喚醒它。
同樣地,當緩衝區已經空了,而消費者還想去取訊息,此時也可以讓消費者進行休眠,等待生產者放入一個或者多個數據時再喚醒它。

ConditionObject 監視器方法(wait、notify 和 notifyAll

)分解成截然不同的物件,以便通過將這些物件與任意 Lock 實現組合使用,為每個物件提供 waitsignal 方法。
其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。

Condition 中,用 await() 替換 wait(),用 signal() 替換 notify(),用signalAll() 替換 notifyAll(),傳統執行緒的通訊方式,Condition 都可以實現,這裡注意,Condition 是被繫結到 Lock 上的,要建立一個 LockCondition 必須用 newCondition()

方法。

例1

class BoundedBuffer {
   final Lock lock = new ReentrantLock();//鎖物件
   final Condition notFull  = lock.newCondition();//寫執行緒條件 
   final Condition notEmpty = lock.newCondition();//讀執行緒條件 

   final Object[] items = new Object[100];//快取佇列
   int putptr/*寫索引*/, takeptr/*讀索引*/, count/*佇列中存在的資料個數*/;

   public
void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length)//如果佇列滿了 notFull.await();//阻塞寫執行緒 items[putptr] = x;//賦值 if (++putptr == items.length) putptr = 0;//如果寫索引寫到佇列的最後一個位置了,那麼置為0 ++count;//個數++ notEmpty.signal();//喚醒讀執行緒 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0)//如果佇列為空 notEmpty.await();//阻塞讀執行緒 Object x = items[takeptr];//取值 if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到佇列的最後一個位置了,那麼置為0 --count;//個數-- notFull.signal();//喚醒寫執行緒 return x; } finally { lock.unlock(); } } }

這是一個處於多執行緒工作環境下的快取區,快取區提供了兩個方法,puttakeput 是存資料,take 是取資料,內部有個快取佇列。

這個快取區類實現的功能:有多個執行緒往裡面存資料和從裡面取資料,其快取佇列(先進先出後進後出)能快取的最大數值是100,多個執行緒間是互斥的,當快取佇列中儲存的值達到100時,將寫執行緒阻塞,並喚醒讀執行緒,當快取佇列中儲存的值為0時,將讀執行緒阻塞,並喚醒寫執行緒,這也是 ArrayBlockingQueue 的內部實現。

下面分析一下程式碼的執行過程:

  1. 一個寫執行緒執行,呼叫 put 方法;

  2. 判斷 count 是否為100,顯然沒有100;

  3. 繼續執行,存入值;

  4. 判斷當前寫入的索引位置++後,是否和100相等,相等將寫入索引值變為0,並將count+1;

  5. 僅喚醒讀執行緒阻塞佇列中的一個;

  6. 一個讀執行緒執行,呼叫take方法;

  7. ……

  8. 僅喚醒寫執行緒阻塞佇列中的一個。

這就是多個 Condition 的強大之處,假設快取佇列中已經存滿,那麼阻塞的肯定是寫執行緒,喚醒的肯定是讀執行緒,相反,阻塞的肯定是讀執行緒,喚醒的肯定是寫執行緒,那麼假設只有一個Condition會有什麼效果呢,快取佇列中已經存滿,這個Lock不知道喚醒的是讀執行緒還是寫執行緒了,如果喚醒的是讀執行緒,皆大歡喜,如果喚醒的是寫執行緒,那麼執行緒剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了很多時間。

例2

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    static class NumberWrapper {
        public int value = 1;
    }

    public static void main(String[] args)  {
        //初始化可重入鎖
        final Lock lock = new ReentrantLock();

        //第一個條件當螢幕上輸出到3
        final Condition reachThreeCondition = lock.newCondition();
        //第二個條件當螢幕上輸出到6
        final Condition reachSixCondition = lock.newCondition();

        //NumberWrapper只是為了封裝一個數字,一邊可以將數字物件共享,並可以設定為final
        //注意這裡不要用Integer, Integer 是不可變物件
        final NumberWrapper num = new NumberWrapper();
        //初始化A執行緒
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                //需要先獲得鎖
                lock.lock();
                try {
                    System.out.println("threadA start write");
                    //A執行緒先輸出前3個數
                    while (num.value <= 3) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //輸出到3時要signal,告訴B執行緒可以開始了
                    reachThreeCondition.signal();
                } finally {
                    lock.unlock();
                }
                lock.lock();
                try {
                    //等待輸出6的條件
                    reachSixCondition.await();
                    System.out.println("threadA start rewrite");
                    //輸出剩餘數字
                    while (num.value <= 9) {
                        System.out.println(num.value);
                        num.value++;
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }

        });


        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();

                    while (num.value <= 3) {
                        //等待3輸出完畢的訊號
                        reachThreeCondition.await();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
                try {
                    lock.lock();
                    //已經收到訊號,開始輸出4,5,6
                    System.out.println("threadB start write");
                    while (num.value <= 6) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //4,5,6輸出完畢,告訴A執行緒6輸出完了
                    reachSixCondition.signal();
                } finally {
                    lock.unlock();
                }
            }

        });

        //啟動兩個執行緒
        threadA.start();
        threadB.start();
    }
}

結果如下:

threadA start write
2
threadB start write
5
threadA start rewrite
8

基本思路就是首先要A執行緒先寫1,2,3,這時候B執行緒應該等待 reachThredCondition 訊號,而當A執行緒寫完3之後就通過signal告訴B執行緒“我寫到3了,該你了”,

這時候A執行緒要等 reachSixCondition 訊號,同時B執行緒得到通知,開始寫4,5,6,寫完4,5,6之後B執行緒通知A執行緒 reachSixCondition 條件成立了,這時候A執行緒就開始寫剩下的7,8,9了。

例3

Java官方提供的例子:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    public static void main(String[] args)  {
        final BoundedBuffer boundedBuffer = new BoundedBuffer();

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("t1 run");
                for (int i=0;i<20;i++) {
                    try {
                        System.out.println("putting..");
                        boundedBuffer.put(Integer.valueOf(i));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }) ;

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i=0;i<20;i++) {
                    try {
                        Object val = boundedBuffer.take();
                        System.out.println(val);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }) ;

        t1.start();
        t2.start();
    }

    /**
     * BoundedBuffer 是一個定長100的集合,當集合中沒有元素時,take方法需要等待,直到有元素時才返回元素
     * 當其中的元素數達到最大值時,要等待直到元素被take之後才執行put的操作
     * @author yukaizhao
     *
     */
    static class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

        final Object[] items = new Object[100];
        int putptr, takeptr, count;

        public void put(Object x) throws InterruptedException {
            System .out.println("put wait lock");
            lock.lock();
            System.out.println("put get lock");
            try {
                while (count == items.length) {
                    System.out.println("buffer full, please wait");
                    notFull.await();
                }

                items[putptr] = x;
                if (++putptr == items.length)
                    putptr = 0;
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        public Object take() throws InterruptedException {
            System.out.println("take wait lock");
            lock.lock();
            System.out.println("take get lock");
            try {
                while (count == 0) {
                    System.out.println("no elements, please wait");
                    notEmpty.await();
                }
                Object x = items[takeptr];
                if (++takeptr == items.length)
                    takeptr = 0;
                --count;
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }
}

結果如下:


t1 run
putting..
put wait lock
take wait lock
put get lock
putting..
put wait lock
take get lock
0
take wait lock
take get lock
no elements, please wait
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
1
take wait lock
put wait lock
take get lock
2
put get lock
take wait lock
take get lock
3
take wait lock
putting..
put wait lock
take get lock
4
take wait lock
put get lock
putting..
put wait lock
take get lock
5
take wait lock
put get lock
putting..
put wait lock
take get lock
6
take wait lock
put get lock
putting..
put wait lock
take get lock
7
take wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
take get lock
8
take wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
take get lock
9
take wait lock
put get lock
putting..
put wait lock
take get lock
put get lock
putting..
put wait lock
10
take wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
take get lock
11
take wait lock
take get lock
12
take wait lock
take get lock
13
take wait lock
take get lock
14
take wait lock
take get lock
15
take wait lock
take get lock
no elements, please wait
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
16
take wait lock
take get lock
17
take wait lock
take get lock
18
take wait lock
take get lock
19

這個示例中 BoundedBuffer 是一個固定長度的集合,

這個在其 put 操作時,如果發現長度已經達到最大長度,那麼要等待notFull訊號才能繼續 put,如果得到 notFull 訊號會像集合中新增元素,並且 put 操作會發出 notEmpty 的訊號,

而在其 take 方法中如果發現集合長度為空,那麼會等待 notEmpty 的訊號,接受到 notEmpty 訊號才能繼續 take,同時如果拿到一個元素,那麼會發出 notFull 的訊號。

如果採用 Object 類中的 wait(),notify(),notifyAll() 實現該緩衝區,當向緩衝區寫入資料之後需要喚醒”讀執行緒”時,不可能通過 notify()notifyAll() 明確的指定喚醒”讀執行緒”,而只能通過 notifyAll 喚醒所有執行緒(但是 notifyAll 無法區分喚醒的執行緒是讀執行緒,還是寫執行緒)。 但是,通過 Condition,就能明確的指定喚醒讀執行緒。

Condition 原理分析

ConditionObject 是同步器 AbstractQueuedSynchronizer 的內部類,因為 Condition 的操作需要獲取相關聯的鎖,所以作為同步器的內部類也較為合理。每個 Condition 物件都包含著一個佇列,該佇列是 Condition 物件實現等待/通知功能的關鍵。下面將分析Condition 的實現,主要包括:等待佇列、等待和通知。

等待佇列

等待佇列是一個FIFO的佇列,在佇列中的每個節點都包含了一個執行緒引用,該執行緒就是在 Condition 物件上等待的執行緒,如果一個執行緒呼叫了Condition.await()方法,那麼該執行緒將會構造成節點加入等待佇列並進入等待狀態,在 unlock() 方法後釋放鎖。
一個 Condition 包含一個等待佇列,Condition 擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前執行緒呼叫 Condition.await() 方法,將會以當前執行緒構造節點,並將節點從尾部加入等待佇列,等待佇列的基本結構如下圖所示:

等待

如圖所示,Condition 擁有首尾節點的引用,而新增節點只需要將原有的尾節點 nextWaiter 指向它,並且更新尾節點即可。上述節點引用更新的過程並沒有使用CAS保證,原因在於呼叫 await() 方法的執行緒必定是獲取了鎖的執行緒,也就是說該過程是由鎖來保證執行緒安全的。在Object的監視器模型上,一個物件擁有一個同步佇列和等待佇列,而併發包中的 Lock(更確切地說是同步器)擁有一個同步佇列和多個等待佇列,其對應關係如下圖所示:

Lock

等待

呼叫 Conditionawait() 方法,會使當前執行緒進入等待佇列,同時執行緒狀態變為等待狀態,在呼叫 unlock() 方法後釋放鎖。當從 await() 方法返回時,當前執行緒一定獲取了Condition 相關聯的鎖。如果從佇列(同步佇列和等待佇列)的角度看 await() 方法,當呼叫 await() 方法時,相當於同步佇列的首節點(獲取了鎖的節點)移動到 Condition 的等待佇列中。呼叫該方法的執行緒成功獲取了鎖的執行緒,也就是同步佇列中的首節點,該方法會將當前執行緒構造成節點並加入等待佇列中,然後釋放同步狀態,喚醒同步佇列中的後繼節點,然後當前執行緒會進入等待狀態。當等待佇列中的節點被喚醒,則喚醒節點的執行緒開始嘗試獲取同步狀態。如果不是通過其他執行緒呼叫 Condition.signal() 方法喚醒,而是對等待執行緒進行中斷,則會丟擲 InterruptedException :

await

通知

呼叫 Conditionsignal() 方法,將會喚醒在等待佇列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步佇列中。呼叫該方法的前置條件是當前執行緒必須獲取了鎖,接著獲取等待佇列的首節點,將其移動到同步佇列。節點從等待佇列移動到同步佇列的過程如下圖所示:

signal

signal() 方法只是將 Condition 等待佇列頭結點移出佇列,此時該執行緒節點還是阻塞的,同時將該節點的執行緒重新包裝加入同步佇列,當呼叫 unlock() 方法時,會喚醒同步佇列的第二個節點,假如這個新節點是處於第二個位置,那麼它將會被喚醒,否則,繼續阻塞。


參考自:
http://www.cnblogs.com/hongdada/p/6150699.html