1. 程式人生 > >併發系列(十三)-----ReentrantReadWriteLock原始碼解析

併發系列(十三)-----ReentrantReadWriteLock原始碼解析

一 簡介

讀寫鎖如何正確的使用在上一篇的文章中簡單的介紹了,並且已經知道讀寫鎖內部維護了兩個鎖分別是讀鎖和寫鎖。下面是這兩個鎖的特點

1,寫鎖是獨佔的、排他的,當一個執行緒持有寫鎖是,其他任何執行緒都不允許獲取到鎖不管是讀鎖還是寫鎖。

2.讀鎖是共享的,允許多個執行緒同時訪問資源。

3.一個執行緒同時既可以持有讀鎖也可以寫鎖,獲取順序是先獲取寫鎖後是可以獲取讀鎖的這種現象較鎖降級,但是不可以先獲取讀鎖然後在獲取寫鎖那樣會造成死鎖,這種又叫鎖升級.

二 ReentrantReadWriteLock組成

ReentrantReadWriteLock內部構成比較多,下面是UMl圖

對上圖做一個簡單的說明

虛線箭頭 表示實現了介面 例如ReentrantReadWriteLock實現了ReadWriteLock

實現箭頭 表示繼承了某個類 例如FairSync和NofairSync繼承了sync

加號線     表示組成    例如sync由內部類HoldCounter和ThreadLocalHoldCounter構成

更據上圖我們可以知道ReentrantReadWriteLock由5個內部類構成分別是Sync,FairSync,NofairSync,WriteLock,ReadLock。Sync內部也維護了兩個類分別是HoldCounter和ThreadLocalHoldCounter兩個類,其中ThreadLocalHoldCounter繼承ThreadLcoal。這兩個類是用來服務於讀鎖的重入的。FairSync和NofairSync是Sync的兩個不同的實現版本。WriteLock和ReadLock繼承Lock介面。

三 實現原理及原始碼解析

ReentrantReadWriteLcok的是實現還是基於AQS的,其中寫鎖利用的是AQS中的獨佔模式讀鎖利用的是AQS中的共享模式。

1.Sync中資源解析

我們知道AQS中的資源是一個int型別的數值,一個資源它怎麼即表示讀鎖有表示寫鎖的呢?我們知道int型別在Java中的大小是32bit,那麼int型別的二進位制最大值就是1111 1111 1111 1111 1111 1111 1111 1111。讀寫利用將他將的高16位表示用來表示讀鎖,低16位表示的是寫鎖。例如 0000 0000 0000 0000 0000 0000 0000 0001 就表示有寫鎖存在,0000 0000 0000 0001 0000 0000 0000 0000,就表示有讀鎖存在。0000 0000 0000 0001 0000 0000 0000 0001表示既有讀鎖存在也有寫鎖的存在。

下面是原始碼

    /**
         * 對32位的int進分割
         */
        static final int SHARED_SHIFT = 16;
        /**
         * 十進位制為65536 移位運算的https://zhuanlan.zhihu.com/p/30108890 
         */
        static final int SHARED_UNIT = (1 << SHARED_SHIFT);
        /**
         * 65535 0000 0000 0000 0000 1111 1111 1111 1111
         *最大資源數
         */
        static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
        /**
         * 65535 0000 0000 0000 0000 1111 1111 1111 1111 
         * 用來計算獨佔資源
         */
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /**
         * 計算readLock的獲取次數包含重入的次數
         *
         * @param c 資源數
         * @return 計算好的次數
         */
        static int sharedCount(int c) {
            //將位元組向左無符號右移16位只剩下原來的高16位
            //eg  c = <0000 0000 0000 1111> 0000 0000 0000 0011
            //移位後為0000 0000 0000 0000 <0000 0000 0000 1111> 
            return c >>> SHARED_SHIFT;
        }

        /**
         * 計算writeLock的獲取次數包括重入的次數
         *
         * @param c 資源數
         * @return 計算後的獨佔資源數
         */
        static int exclusiveCount(int c) {
            //如果這個c小於65536時若&上65535時就剩下低16位,還是C
            //eg 0000 0000 0000 1111 0000 0000 0000 0011
            //相&後 0000 0000 0000 0000 <0000 0000 0000 0011>
            return c & EXCLUSIVE_MASK;
        }

上面的原始碼涉及到了二進位制的一些運算,這個大學都應該學過,關於位運算在註解中我給出一篇部落格可以自己去學習。

2.寫鎖的實現

AQS中的獨佔模式對應著讀寫鎖的寫鎖,而且我們也已經知道int型別的低16位時用來表示獨佔模式的,下面我們先看寫鎖獲取的原始碼。


        /**
         * 寫鎖資源的獲取
         *
         * @param acquires 資源數
         * @return 獲取是否成功true表示成功false表示失敗
         */
        @Override
        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            //獲取當前資源
            int c = getState();
            //獲取當前資源低16位的值
            int w = exclusiveCount(c);
            if (c != 0) {
                //如果c != 0表表示有執行緒在持有鎖這個鎖可能是讀鎖也可能是寫鎖
                if (w == 0 || current != getExclusiveOwnerThread()) {
                //如果低16位為0,那麼表示高16位有數字,那麼就有讀鎖存在直接返回false新增到同步對列中
                //如果低16位不為0說明低16位有數字,那麼就有寫鎖存在,如果寫鎖不是當前執行緒
                //所持有的那麼直接返回false進入同步對列中去
                    return false;
                }
              
                if (w + exclusiveCount(acquires) > MAX_COUNT) {
                    throw new Error("Maximum lock count exceeded");
                }
               //程式碼走到這裡w!=0 且還是持有寫鎖的還是當前執行緒,資源數也有那麼就要重入了
               //因為是獨佔模式,而且又是當前執行緒執行所以不用管考慮併發直接設定就可以
                setState(c + acquires);
                //返回true,獲取鎖成功
                return true;
            }
            //如果執行緒獲取策略是阻塞,直接進入到同步對列中
            //如果執行緒獲取策略不是阻塞,直接使用CAS獲取資源注意這裡有可能發生併發所以使用了CAS
            if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
                return false;
            }
            //走到這裡說明compareAndSetState(c, c + acquires)執行成功也就是獲取資源成功
            //設定執行緒
            setExclusiveOwnerThread(current);
            return true;
        }

上面的邏輯已經說明的聽清楚的,只有一個方法需要說明writerShouldBlock() 這個方法是返回一個boolean值,這個布林值用來決定執行緒是現在獲取還是等一會獲取。

上面的原始碼時寫鎖用來獲取資源的,下面我們看一下寫鎖的資源時如何釋放的。如下原始碼

        /**
         * 釋放獨佔資源
         *
         * @param release 資源數
         * @return 寫鎖的資源全都的釋放完成返回true
         */
        @Override
        protected final boolean tryRelease(int release) {
            //是不是當前執行緒
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            int nextc = getState() - release;
            //獲取低16位 如果低16位為0的話表示讀鎖沒有了
            boolean free = exclusiveCount(nextc) == 0;
            if (free) {
                //如果獨佔資源為0的話將持有執行緒變為null
                setExclusiveOwnerThread(null);
            }
            //設定資源這這裡不用考慮併發,寫鎖時獨佔模式
            setState(nextc);
            return free;
        }

原始碼的邏輯時比較簡單的不再做過多的介紹。

3.讀鎖的實現

AQS中的共享模式對應著讀鎖的實現,再看原始碼的時候我們要看一下Sync中的幾個成員變數,這幾個成員變數是為讀鎖的重入服務的。下面是原始碼。

  /**
         * 幾乎每個獲取 readLock 的執行緒都會含有一個 HoldCounter 用來記錄 執行緒 id 與 獲取 readLock 的次數 ( writeLock 的獲取是由
         * state 的低16位 及 aqs中的exclusiveOwnerThread 來進行記錄) 這裡有個注意點 第一次獲取 readLock 的執行緒使用 firstReader ,
         * firstReaderHoldCount 來進行記錄 (PS: 不對, 我們想一下為什麼不 統一用 HoldCounter 來進行記錄呢? 原 因: 所用的
         * HoldCounter 都是放在 ThreadLocal 裡面, 而很多有些場景中只有一個執行緒獲取 readLock 與 writeLock , 這種情況還用
         * ThreadLocal 的話那就有點浪費(ThreadLocal.get() 比直接 通過 reference 來獲取資料相對來說耗效能))
         */
        static final class HoldCounter {
            /**
             * readLock的獲取次數
             */
            int count = 0;
            /**
             * 執行緒的ID
             */
            final long tid = getThreadId(Thread.currentThread());
        }

        /**
         * 簡單的自定義的 ThreadLocal 來用進行記錄  readLock 獲取的次數
         */
        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
            @Override
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        /**
         * readLock 獲取記錄容器 ThreadLocal(ThreadLocal 的使用過程中當 HoldCounter.count == 0 時要進行 remove ,
         * 不然很有可能導致 記憶體的洩露)
         */
        private transient ThreadLocalHoldCounter readHolds;
        /**
         * 最後一次獲取 readLock 的 HoldCounter 的快取 (PS: 還是上面的問題 有了 readHolds 為什麼還需要 cachedHoldCounter呢?
         * 非常大的場景中, 這次進行release readLock的執行緒就是上次 acquire 的執行緒, 這樣直接通過cachedHoldCounter來進行獲取, 節 省了通過
         * readHolds 的 lookup 的過程)
         */
        private transient HoldCounter cachedHoldCounter;
        /**
         * 下面兩個是用來進行記錄 第一次獲取 readLock 的執行緒的資訊 準確的說是第一次獲取 readLock 並且 沒有 release 的執行緒, 一旦執行緒進行 release
         * readLock, 則 firstReader會被置位 null
         */
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

關於這幾個類的作用,上面的註釋應該很詳細了。在往下我們看讀鎖的資源獲取。

   /**
         * 讀鎖的獲取
         *
         * @param unused 資源數
         * @return 大於0表示獲取成功
         */
        @Override
        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            //如果寫鎖存在的話當前執行緒也沒有持有寫鎖的直接不允許獲取讀鎖,新增到同步對列中
            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) {
                return -1;
            }
            //獲取高16位的數
            int r = sharedCount(c);
            //這裡只是簡單的去獲取一下
            //檢視等待的策略是否等待,不等待的話檢查合法性在替換高16位的資源
            //這裡使用了CAS存在併發
            if (!readerShouldBlock() && r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                //如果讀鎖為0的話那麼說明是第一次獲取
                if (r == 0) {
                    //設定執行緒和讀鎖次數
                    firstReader = current;
                    firstReaderHoldCount = 1;
                    //沒有使用HoldCounter
                } else if (firstReader == current) {
                    //firstReader是當前執行緒的話直接次數加一
                    firstReaderHoldCount++;
                } else {
                    //不是的話獲取HoldCounter
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        //如果為null的話或者不是當前執行緒時重新在獲取一個
                        cachedHoldCounter = rh = readHolds.get();
                    } else if (rh.count == 0) {
                        readHolds.set(rh);
                    }
                    //次數加一
                    rh.count++;
                }
                //獲取資源成功
                return 1;
            }
            //簡單的獲取不到,在進行完全獲取。
            return fullTryAcquireShared(current);
        }

/**
         * 讀鎖的完全獲取資源
         *
         * @param current 當前的執行緒
         * @return 大於等於0表示獲取成功,小於0表示獲取失敗
         */
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            //for迴圈不停的獲取資源
            for (; ; ) {
                //獲取資源
                int c = getState();
                //當前有寫鎖存在的時候
                if (exclusiveCount(c) != 0) {
                    //如果讀鎖不是當前的執行緒那麼直接結束迴圈返回-1;獲取失敗
                    if (getExclusiveOwnerThread() != current) {
                        return -1;
                    }
                } else if (readerShouldBlock()) {
                    //阻塞策略true表示要阻塞
                    if (firstReader == current) {
                        //判斷當前執行緒是不是第一條
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                //獲取一個新的
                                rh = readHolds.get();
                                //檢查是否是持有讀鎖是否為0若為0的話,說明沒有獲取過
                                //直接移除
                                if (rh.count == 0) {
                                    readHolds.remove();
                                }
                            }

                        }
                        if (rh.count == 0) {
                            return -1;
                        }
                    }
                }
                //檢查資源數
                if (sharedCount(c) == MAX_COUNT) {
                    throw new Error("Maximum lock count exceeded");
                }
                //使用CAS不停的獲取
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    //程式碼走到這裡說明已經獲取成功了
                    //如果高16位的資源數是0那麼說明他是第一個
                    if (sharedCount(c) == 0) {
                        //設定第一個的執行緒為當前執行緒
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        //如果第一條執行緒又在獲取讀鎖直接將讀鎖的次數加一
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                        }
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                        } else if (rh.count == 0) {
                            readHolds.set(rh);
                        }
                        rh.count++;
                        cachedHoldCounter = rh;
                    }
                    return 1;
                }
            }
        }

讀鎖資源的釋放,比較簡單,就是操作讀鎖的次數,和修改資源,但是在修改資源的時候需要考慮執行緒安全,因為時併發訪問。

下面時原始碼


        /**
         * 讀鎖的釋放
         *
         * @param unused 釋放資源數
         * @return 是否釋放成功
         */
        @Override
        protected final boolean tryReleaseShared(int unused) {
            //獲取當前執行緒
            Thread current = Thread.currentThread();
            //如果firstReader等於當前執行緒的話說明是要釋放只有一條獲取寫鎖的執行緒了
            if (firstReader == current) {
                //等於1的話說明只是獲取讀鎖一次釋放了也就需要將firstReader至為null
                if (firstReaderHoldCount == 1) {
                    firstReader = null;
                } else {
                    //不等於1說明獲取讀鎖多次直接截減去1
                    firstReaderHoldCount--;
                }
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current)) {
                    rh = readHolds.get();
                }
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0) {
                        throw unmatchedUnlockException();
                    }
                }
                --rh.count;
            }
            for (; ; ) {
                //去更新資源更新高16位
                int c = getState();
                int nextc = c - SHARED_UNIT;
                //使用CAS去更新存在併發
                if (compareAndSetState(c, nextc)) {
                    return nextc == 0;
                }
            }
        }

四 鎖升級的死鎖分析

讀寫鎖的鎖升級就是,一個執行緒獲取完讀鎖沒有釋放讀鎖,就去獲取寫鎖。這樣的或會遭成死鎖。

1.一個執行緒獲取到了讀鎖,那麼資源數也就不等於0了。

2.在獲取寫鎖時如果執行緒就會走到下面程式碼中。

  if (c != 0) {
                //如果c != 0表表示有執行緒在持有鎖這個鎖可能是讀鎖也可能是寫鎖
                if (w == 0 || current != getExclusiveOwnerThread()) {
                //如果低16位為0,那麼表示高16位有數字,那麼就有讀鎖存在直接返回false新增到同步對列中
                //如果低16位不為0說明低16位有數字,那麼就有寫鎖存在,如果寫鎖不是當前執行緒
                //所持有的那麼直接返回false進入同步對列中去
                    return false;
                }
              
                if (w + exclusiveCount(acquires) > MAX_COUNT) {
                    throw new Error("Maximum lock count exceeded");
                }
               //程式碼走到這裡w!=0 且還是持有寫鎖的還是當前執行緒,資源數也有那麼就要重入了
               //因為是獨佔模式,而且又是當前執行緒執行所以不用管考慮併發直接設定就可以
                setState(c + acquires);
                //返回true,獲取鎖成功
                return true;
            }

3.當走到2的程式碼時候就會返回一個fasle,因為當前執行緒是是沒有持有讀鎖的。那麼執行緒就會新增到AQS中的同步佇列中執行緒就會waiting。

4.執行緒如果waiting狀態中讀鎖就不可能被釋放了,執行緒永遠處於waiting狀態中,造成了死鎖。

五 總結

1.資源高16為表示讀鎖,低16位表示寫鎖,通過位運算和&運算實現

2.寫鎖對應著AQS中的獨佔模式,讀鎖對應著AQS中的共享模式

3.鎖升級造成的死鎖