1. 程式人生 > >十七、併發程式設計之讀寫鎖ReentrantReadWriteLock的鎖降級

十七、併發程式設計之讀寫鎖ReentrantReadWriteLock的鎖降級

  • 鎖降級
    鎖降級是指寫鎖降級為讀鎖。
    在寫鎖沒有釋放的時候,獲取到讀鎖,再釋放寫鎖

  • 鎖升級(ReentrantReadWriteLock是不支援的。)
    把讀鎖升級為寫鎖
    在讀鎖沒有釋放的時候,獲取到寫鎖,再釋放讀鎖

  • oracle官網的對於鎖降級的示例程式碼:

 class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void
processCachedData() { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true; } rwl.readLock().lock(); } finally {
rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }

程式碼中聲明瞭一個volatile型別的cacheValid變數,保證其可見性。首先獲取讀鎖,如果cache不可用,則釋放讀鎖,獲取寫鎖,在更改資料之前,再檢查一次cacheValid的值,然後修改資料,將cacheValid置為true,然後在釋放寫鎖前獲取讀鎖;此時,cache中資料可用,處理cache中資料,最後釋放讀鎖。這個過程就是一個完整的鎖降級的過程,目的是保證資料可見性,如果當前的執行緒C在修改完cache中的資料後,沒有獲取讀鎖而是直接釋放了寫鎖,那麼假設此時另一個執行緒T獲取了寫鎖並修改了資料,那麼C執行緒無法感知到資料已被修改,則資料出現錯誤。如果遵循鎖降級的步驟,執行緒C在釋放寫鎖之前獲取讀鎖,那麼執行緒T在獲取寫鎖時將被阻塞,直到執行緒C完成資料處理過程,釋放讀鎖。

  • 具體的案例來演示以上過程,程式碼如下:
public class ReadWriteLockTest {
    private volatile boolean cacheValid = false;
    private int currentValue = 0;
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private Lock readLock = lock.readLock();
    private Lock writeLock = lock.writeLock();

    /**
     * 測試用例
     */
    @Test
    public void testLockDowngrading() throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(2);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 100, TimeUnit.SECONDS, new ArrayBlockingQueue
                <Runnable>(10));
        for (int i = 0; i < 2; i ++){
            int finalI = i;
            executor.execute(new Thread(new Runnable() {
                @Override
                public void run() {
                    Thread.currentThread().setName("thread-" + finalI);
                    try {
                        start.await();
                        TimeUnit.SECONDS.sleep(finalI);
                        System.out.println("after sleep " + finalI + " seconds, excute " + Thread.currentThread().getName());
                        cacheValid = false;
                        processCachedData(finalI);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        end.countDown();
                    }

                }
            }));
        }
        start.countDown();
        end.await();

    }

    /**
     * 鎖降級過程
     * @param num
     */
    private void processCachedDataDownGrading(int num){
        readLock.lock();
        if(!cacheValid){
            //必須先釋放寫鎖
            readLock.unlock();
            writeLock.lock();
            try{
                //在更新資料之前做二次檢查
                if(!cacheValid){
                    System.out.println(Thread.currentThread().getName() + " has updated!");
                    //將資料更新為和執行緒值相同,以便驗證資料
                    currentValue = num;
                    cacheValid = true;
                    readLock.lock();
                }
            }finally {
                writeLock.unlock();
            }
        }
        try{
            //模擬5秒的處理時間,並打印出當前值,在這個過程中cacheValid可能被其他執行緒修改,鎖降級保證其他執行緒寫鎖被阻塞,資料不被改變
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + ": " +  currentValue);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(lock.getReadHoldCount() > 0){
                readLock.unlock();
            }
        }
    }

    /**
     * 無鎖降級的過程
     * @param num
     */
    private void processCachedData(int num){
        readLock.lock();
        if(!cacheValid){
            readLock.unlock();
            writeLock.lock();
            try{
                if(!cacheValid){
                    System.out.println(Thread.currentThread().getName() + " has updated!");
                    currentValue = num;
                    cacheValid = true;
                }
            }finally {
                writeLock.unlock();
            }
        }
        try{
            //模擬5秒的處理時間,並打印出當前值,在這個過程中cacheValid可能被其他執行緒修改,無鎖降級過程,其他執行緒此時可能獲取寫鎖,並更改書資料,導致後面的資料錯誤
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + ": " +  currentValue);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(lock.getReadHoldCount() > 0){
                readLock.unlock();
            }
        }
    }

}

在上述的測試程式碼中,有兩個方法,一個是processCachedDataDownGrading,該方法模擬鎖降級的過程,另一個是processCachedData,模擬無鎖降級的過程。
在主測試程式碼中,起了兩個執行緒,每個執行緒執行前先將cacheValid置為false,同時,為了模擬處理資料5秒中外部執行緒的想要執行的資料變更,兩個執行緒的實際執行行為相差1秒。
如果兩個執行緒中執行的方法是processCachedData(無鎖降級過程),那麼輸出為:
after sleep 0 seconds, excute thread-0
thread-0 has updated!
after sleep 1 seconds, excute thread-1
thread-1 has updated!
thread-0: 1
thread-1: 1

從輸出看,執行緒0馬上執行(不是指run前,指run中我們想測試的內容),執行緒1過了1秒執行,兩個執行緒中的資料均被改變,但是最終兩個執行緒中的值均為1(實際我們期望執行緒0中的資料應該為0),導致這個結果的原因就是線上程0處理資料(sleep5秒)的過程中,執行緒1獲取了寫鎖並更新了資料,從而執行緒0的資料被更新。
如果兩個執行緒中執行的方法是processCachedDataDownGrading(鎖降級過程),那麼輸出為:
after sleep 0 seconds, excute thread-0
thread-0 has updated!
after sleep 1 seconds, excute thread-1
thread-0: 0
thread-1 has updated!
thread-1: 1

從輸出看,執行緒0馬上執行,執行緒1過了1秒執行,兩個執行緒中的資料均被改變,但是最終兩個執行緒中的值分別為0和1,符合期望,因為這個過程是鎖降級的過程,執行緒0在更新資料之後,釋放寫鎖之前,獲取了讀鎖,並在處理完資料之後才將其釋放,因此執行緒1線上程0在處理資料時獲取寫鎖被阻塞,從thread-1 has updated! 語句輸出在thread-0: 0 之後也可看出,從而保證了資料0的資料沒有問題。

原文