1. 程式人生 > >原始碼分析— java讀寫鎖ReentrantReadWriteLock

原始碼分析— java讀寫鎖ReentrantReadWriteLock

前言

今天看Jraft的時候發現了很多地方都用到了讀寫鎖,所以心血來潮想要分析以下讀寫鎖是怎麼實現的。

先上一個doc裡面的例子:

class CachedData {
  Object data;
  volatile boolean cacheValid;
  final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

  void processCachedData() {
      //加上一個讀鎖
    rwl.readLock().lock();
    if (!cacheValid) {
      // Must release read lock before acquiring write lock
      //必須在加寫鎖之前釋放讀鎖
      rwl.readLock().unlock();
      rwl.writeLock().lock();
      try {
        // Recheck state because another thread might have
        // acquired write lock and changed state before we did.
          //雙重檢查
        if (!cacheValid) {
            //設定值
          data = ...
          cacheValid = true;
        }
        // Downgrade by acquiring read lock before releasing write lock
          //鎖降級,反之則不行
        rwl.readLock().lock();
      } finally {
          //釋放寫鎖,但是仍然持有寫鎖
        rwl.writeLock().unlock(); // Unlock write, still hold read
      }
    }

    try {
      use(data);
    } finally {
        //釋放讀鎖
      rwl.readLock().unlock();
    }
  }
}}

我們一般例項化一個ReentrantReadWriteLock,一般是呼叫空的構造器建立,所以預設使用的是非公平鎖

public ReentrantReadWriteLock() {
    this(false);
}


public ReentrantReadWriteLock(boolean fair) {
      //預設使用的是NonfairSync
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}
//分別呼叫writeLock和readLock會返回讀寫鎖例項
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

ReentrantReadWriteLock內部類Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;
      //位移量
      //在讀寫鎖中,state是一個32位的int,所以用state的高16位表示讀鎖,用低16位表示寫鎖
    static final int SHARED_SHIFT   = 16;
      //因為讀鎖是高16位,所以用1向左移動16位表示讀鎖每次鎖狀態變化的量
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
      //最大的可重入次數
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
      //用來計算低16位的寫鎖狀態
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //獲取高16位讀鎖state次數,重入次數
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //獲取低16位寫鎖state次數,重入次數
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    //用來記錄每個執行緒持有的讀鎖數量
    static final class HoldCounter {
        int count = 0;
        // Use id, not reference, to avoid garbage retention
        final long tid = getThreadId(Thread.currentThread());
    }

    
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    private transient ThreadLocalHoldCounter readHolds;
      // 用於快取,記錄"最後一個獲取讀鎖的執行緒"的讀鎖重入次數
    private transient HoldCounter cachedHoldCounter;
      // 第一個獲取讀鎖的執行緒(並且其未釋放讀鎖),以及它持有的讀鎖數量
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
          // 初始化 readHolds 這個 ThreadLocal 屬性
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }
    ....
}
  1. 因為int是32位的,所以在ReentrantReadWriteLock中將state分為兩部分,高16位作為讀鎖的狀態控制器,低16位作為寫鎖的狀態控制器。
  2. 每次要獲取讀鎖的當前狀態都需要呼叫sharedCount傳入當前的state,將state向右移動16位來獲取
  3. 要獲取低16位則需要將1左移16位減一,獲得一個低16位全是1的數,然後和傳入的state進行取與操作獲取state的低16位的值
  4. cachedHoldCounter裡面儲存了最新的讀鎖的執行緒和呼叫次數
  5. firstReader 和 firstReaderHoldCount 將”第一個”獲取讀鎖的執行緒記錄在 firstReader 屬性中,這裡的第一個不是全域性的概念,等這個 firstReader 當前代表的執行緒釋放掉讀鎖以後,會有後來的執行緒佔用這個屬性的。

讀鎖獲取

//readLock#lock
public void lock() {
      //這裡會呼叫父類AQS的acquireShared,嘗試獲取鎖
    sync.acquireShared(1);
}
//AQS#acquireShared
public final void acquireShared(int arg) {
      //返回值小於 0 代表沒有獲取到共享鎖
    if (tryAcquireShared(arg) < 0)
          //進入到阻塞佇列,然後等待前驅節點喚醒
        doAcquireShared(arg);
}

這裡的tryAcquireShared是呼叫ReentrantReadWriteLock的內部類Sync的tryAcquireShared的方法

protected final int tryAcquireShared(int unused) {
      //獲取當前執行緒
    Thread current = Thread.currentThread();
      //獲取AQS中的state屬性值
    int c = getState();
    //exclusiveCount方法是用來獲取寫鎖狀態,不等於0代表有寫鎖
    if (exclusiveCount(c) != 0 &&
          //如果不是當前執行緒獲取的寫鎖,那麼直接返回-1
        getExclusiveOwnerThread() != current)
        return -1;
      //獲取讀鎖的鎖定次數
    int r = sharedCount(c);
      // 讀鎖獲取是否需要被阻塞
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        //因為高16位代表共享鎖,所以CAS需要加上一個SHARED_UNIT
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
              //記錄一下首次讀執行緒
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
               //firstReader 重入獲取讀鎖
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
              // 如果 cachedHoldCounter 快取的不是當前執行緒,設定為快取當前執行緒的 HoldCounter
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
           // return 大於 0 的數,代表獲取到了共享鎖
        return 1;
    }
    return fullTryAcquireShared(current);
}
  1. 首先會去呼叫exclusiveCount方法來檢視寫鎖是否被佔用,如果被佔用,那麼檢視當前執行緒是否是佔用讀鎖的執行緒,如果不是則返回-1。通過這裡可以看出可以先佔用讀鎖再佔用寫鎖
  2. 呼叫readerShouldBlock方法獲取是否需要阻塞讀鎖獲取,然後檢查一下高16位讀鎖重入次數是否超過了2^16-1,最後通過CAS操作將state高16進行加1操作,如果沒有其他執行緒搶佔就會成功
  3. 如果state的高16位為零,那麼就設定首次讀執行緒和首次數次數,如果不是則校驗首次讀執行緒是不是當前執行緒,是的話將firstReaderHoldCount次數加1。如果不是首次讀執行緒,那麼校驗一下最後一次讀執行緒是不是當前執行緒,不是的話就從readHolds中獲取,並將HoldCounter計數加1,如果最後讀執行緒是當前執行緒那麼計數加1

readerShouldBlock

//NonfairSync#readerShouldBlock
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
//AQS
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

在非公平模式中readerShouldBlock會呼叫AQS的方法,判斷當前頭節點的下一個節點,如果不是共享節點,那麼readerShouldBlock就返回true,讀鎖就會阻塞。

//FairSync#readerShouldBlock
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}
//AQS
public final boolean hasQueuedPredecessors() {
   
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

在公平模式中會去看看佇列裡有沒有其他元素在佇列裡等待獲取鎖,如果有那麼讀鎖就進行阻塞

ReentrantReadWriteLock#fullTryAcquireShared

final int fullTryAcquireShared(Thread current) {
   
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
          //檢查是否寫鎖被佔用
        if (exclusiveCount(c) != 0) {
               //被佔用,但是佔用讀鎖執行緒不是當前執行緒,返回阻塞
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
            //檢查讀鎖是否應該被阻塞
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
              //首次讀執行緒是當前執行緒,下面直接CAS
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                       //設定最後一次讀執行緒
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                               //如果發現 count == 0,也就是說,純屬上一行程式碼初始化的,那麼執行 remove
                            readHolds.remove();
                    }
                }
                   //如果最後讀取執行緒次數為0,那麼阻塞
                if (rh.count == 0)
                    return -1;
            }
        }
          //如果讀鎖重入次數達到上限,拋異常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
          //嘗試CAS讀鎖重入次數加1
        if (compareAndSetState(c, c + SHARED_UNIT)) {
               // 這裡 CAS 成功,那麼就意味著成功獲取讀鎖了
            // 下面需要做的是設定 firstReader 或 cachedHoldCounter
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                  // 下面這幾行,就是將 cachedHoldCounter 設定為當前執行緒
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
              // 返回大於 0 的數,代表獲取到了讀鎖
            return 1;
        }
    }
}

這個方法主要是用來處理重入鎖操作的。首先校驗一下寫鎖是否被佔用,如果沒有被佔用則判斷當前執行緒是否是第一次讀執行緒,如果不是則判斷最後一次讀執行緒是不是當前執行緒,如果不是則從readHolds獲取,並判斷HoldCounter例項中獲取讀鎖次數如果為0,那麼就不是重入。

如果可以判斷當前執行緒是重入的,那麼則對state高16進行加1操作,操作成功,則對firstReader或cachedHoldCounter進行設定,並返回1,表示獲取到鎖。

到這裡我們看完了tryAcquireShared方法,我再把acquireShared方法貼出來:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

下面看doAcquireShared方法:

private void doAcquireShared(int arg) {
      //例項化一個共享節點入隊
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
              //獲取當前節點的上一個前置節點
            final Node p = node.predecessor();
              //前置節點如果是頭節點,那麼代表隊列裡沒有別的節點,先呼叫tryAcquireShared嘗試獲取鎖
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                       //醒佇列中其他共享節點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                       //響應中斷
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
              //設定前置節點waitStatus狀態
            if (shouldParkAfterFailedAcquire(p, node) &&
                  //阻塞當前執行緒
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared方法中會例項化一個共享節點併入隊。如果當前節點的前置節點是頭節點,那麼直接呼叫tryAcquireShared先獲取一次鎖,如果返回大於0,那麼表示可以獲取鎖,呼叫setHeadAndPropagate喚醒佇列中其他的執行緒;如果沒有返回則會呼叫shouldParkAfterFailedAcquire方法將前置節點的waitStatus設值成SIGNAL,然後呼叫parkAndCheckInterrupt方法阻塞

AQS#setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
      //把node節點設值為頭節點
    setHead(node); 
      //因為是propagate大於零才進這個方法,所以這個必進這個if
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
          //獲取node的下一個節點
        Node s = node.next;
          //判斷下一個節點是否為空,或是共享節點
        if (s == null || s.isShared())
              //往下看
            doReleaseShared();
    }
}

這個方法主要是替換頭節點為當前節點,然後呼叫doReleaseShared進行喚醒節點的操作

AQS#doReleaseShared

private void doReleaseShared() { 
    for (;;) {
        Node h = head;
        // 1. h == null: 說明阻塞佇列為空
        // 2. h == tail: 說明頭結點可能是剛剛初始化的頭節點,
        //   或者是普通執行緒節點,但是此節點既然是頭節點了,那麼代表已經被喚醒了,阻塞佇列沒有其他節點了
        // 所以這兩種情況不需要進行喚醒後繼節點
        if (h != null && h != tail) {
            int ws = h.waitStatus;
               //後面的節點會把前置節點設定為Node.SIGNAL
            if (ws == Node.SIGNAL) {
                    //1
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                    // 喚醒 head 的後繼節點,也就是阻塞佇列中的第一個節點
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                        //2
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
          //3 如果被喚醒的節點已經佔領head了,那麼繼續迴圈,否則跳出迴圈
        if (h == head)                   // loop if head changed
            break;
    }
}
  1. unparkSuccessor這裡會喚醒下一個節點,那麼下一個節點也會呼叫setHeadAndPropagate進行搶佔頭節點;如果同時有當前執行緒和被喚醒的下一個執行緒同時走到這裡,那麼只會有一個成功,另一個返回false的就不進行喚醒操作
  2. 這裡CAS失敗的原因可能是一個新的節點入隊,然後將前置節點設值為了Node.SIGNAL,所以導致當前的CAS失敗
  3. 如果被喚醒的節點搶佔頭節點成功,那麼h == head 就不成立,那麼會進行下一輪的迴圈,否則就是head沒有被搶佔成功

AQS#unparkSuccessor

private void unparkSuccessor(Node node) {
    //如果當前節點小於零,那麼作為頭節點要被清除一下狀態
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 下面的程式碼就是喚醒後繼節點,但是有可能後繼節點取消了等待
    // 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
          // 喚醒執行緒
        LockSupport.unpark(s.thread);
}

到這裡加讀鎖的程式碼就講解完畢了

讀鎖釋放

//ReadLock
public void unlock() {
    sync.releaseShared(1);
}
// Sync
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared(); 
        return true;
    }
    return false;
}

我們先看tryReleaseShared

Sync#tryReleaseShared

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
      //如果當前是firstReader,那麼需要進行重置或重入減一
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
          // 判斷 cachedHoldCounter 是否快取的是當前執行緒,不是的話要到 ThreadLocal 中取
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
               // 這一步將 ThreadLocal remove 掉,防止記憶體洩漏。因為已經不再持有讀鎖了
            readHolds.remove();
               //unlock了幾次的話會拋異常
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
           // nextc 是 state 高 16 位減 1 後的值
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 如果 nextc == 0,那就是 state 全部 32 位都為 0,也就是讀鎖和寫鎖都空了
            // 此時這裡返回 true 的話,其實是幫助喚醒後繼節點中的獲取寫鎖的執行緒
            return nextc == 0;
    }
}

這個讀鎖的釋放,主要就是將 hold count 減 1,如果減到 0 的話,還要將 ThreadLocal 中的 remove 掉。然後是在 for 迴圈中將 state 的高 16 位減 1,如果發現讀鎖和寫鎖都釋放光了,那麼喚醒後繼的獲取寫鎖的執行緒,因為只有讀鎖是不會被阻塞的,所以等待的執行緒只可能是寫鎖的執行緒。

寫鎖的獲取

//WriteLock
public void lock() {
    sync.acquire(1);
}
//sync
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

//AQS
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
      //獲取state的低16位
    int w = exclusiveCount(c);
      //不為零說明讀鎖或寫鎖被持有了
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 看下這裡返回 false 的情況:
        //   c != 0 && w == 0: 寫鎖可用,但是有執行緒持有讀鎖(也可能是自己持有)
        //   c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他執行緒持有寫鎖
        //   也就是說,只要有讀鎖或寫鎖被佔用,這次就不能獲取到寫鎖
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
          // 這裡不需要 CAS,仔細看就知道了,能到這裡的,只可能是寫鎖重入,不然在上面的 if 就攔截了
        setState(c + acquires);
        return true;
    }
      //檢查寫鎖是否需要block
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
      //走到這裡說明寫鎖不需要block,並且CAS成功了
    setExclusiveOwnerThread(current);
    return true;
}

我們來看看writerShouldBlock

//NonfairSync
final boolean writerShouldBlock() {
    return false; // writers can always barge
}
//FairSync
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

如果是非公平模式,那麼 lock 的時候就可以直接用 CAS 去搶鎖,搶不到再排隊

如果是公平模式,那麼如果阻塞佇列有執行緒等待的話,就乖乖去排隊

寫鎖釋放

public void unlock() {
    sync.release(1);
}

//sync
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
          //如果獨佔鎖釋放"完全",喚醒後繼節點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//Sync
protected final boolean tryRelease(int releases) {
      //檢查一下持有所的執行緒是不是當前執行緒
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
      //將state減1
    int nextc = getState() - releases;
    //檢視低16位是否為0
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
          //如果為0,那麼說明寫鎖釋放
        setExclusiveOwnerThread(null);
      //設定狀態
    setState(nextc);
    return free;
}