1. 程式人生 > >多執行緒與高併發(六) Lock

多執行緒與高併發(六) Lock

之前學習瞭如何使用synchronized關鍵字來實現同步訪問,Java SE 5之後,併發包中新增了Lock介面(以及相關實現類)用來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能,只是在使用時需要顯式地獲取和釋放鎖。雖然它缺少了(通過synchronized塊或者方法所提供的)隱式獲取釋放鎖的便捷性,但是卻擁有了鎖獲取與釋放的可操作性、可中斷的獲取鎖以及超時獲取鎖等多種synchronized關鍵字所不具備的同步特性。

不同於synchronized是Java語言的關鍵字,是內建特性,Lock不是Java語言內建的,Lock是一個類,通過這個類可以實現同步訪問。而且synchronized同步塊執行完成或者遇到異常是鎖會自動釋放,而lock必須呼叫unlock()方法釋放鎖,因此在finally塊中釋放鎖。

一、 Lock 介面

先看看lock介面定義了哪些方法:

void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();

這裡面lock()、tryLock()、tryLock(long time, TimeUnit unit)和lockInterruptibly()是用來獲取鎖的。這四個方法都是用來獲取鎖的,那有什麼區別呢?

lock()方法是平常使用得最多的一個方法,就是用來獲取鎖。如果鎖已被其他執行緒獲取,則進行等待。

tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他執行緒獲取),則返回false,也就說這個方法無論如何都會立即返回。在拿不到鎖時不會一直在那等待。

tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在於這個方法在拿不到鎖時會等待一定的時間,在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。

lockInterruptibly()方法,當通過這個方法去獲取鎖時,如果執行緒正在等待獲取鎖,則這個執行緒能夠響應中斷,即中斷執行緒的等待狀態。也就使說,當兩個執行緒同時通過lock.lockInterruptibly()想獲取某個鎖時,假若此時執行緒A獲取到了鎖,而執行緒B只有在等待,那麼對執行緒B呼叫threadB.interrupt()方法能夠中斷執行緒B的等待過程。

unLock()方法是用來釋放鎖的,這沒什麼特別需要講的。

Condition newCondition() 是用於獲取與lock繫結的等待通知元件,當前執行緒必須獲得了鎖才能進行等待,進行等待時會先釋放鎖,當再次獲取鎖時才能從等待中返回。

Lock接口裡面的方法我們已經知道,接下來實現Lock的類ReentrantLock開始學起,發現ReentrantLock並沒有多少程式碼,另外有一個很明顯的特點是:基本上所有的方法的實現實際上都是呼叫了其靜態記憶體類Sync中的方法,而Sync類繼承了AbstractQueuedSynchronizer(AQS)。

我們先學AQS相關的知識

二、AQS

AQS(以下簡稱同步器)是用來構建鎖和其他同步元件的基礎框架,它的實現主要依賴一個int成員變數來表示同步狀態,通過內建的FIFO佇列來完成排隊工作。

子類通過繼承並實現它的抽象方法來管理同步狀態,通過使用getState,setState以及compareAndSetState這三個方法對同步狀態進行更改。子類推薦被定義為自定義同步元件的靜態內部類,同步器自身沒有實現任何同步介面,它僅僅是定義了若干同步狀態的獲取和釋放方法來供自定義同步元件的使用,同步器既支援獨佔式獲取同步狀態,也可以支援共享式獲取同步狀態,這樣就可以方便的實現不同型別的同步元件。

同步器是實現鎖的關鍵,要實現鎖功能,子類繼承Lock,它定義了使用者與鎖互動的介面,就像上面那幾個介面,但是實現卻是通過同步器,同步器簡化了鎖的實現方式,實現了底層操作,如同步狀態管理,執行緒的排隊,等待和喚醒,而外面使用者去不用關心這些細節。

2.1 同步器的介面

同步器的設計模式是基於模板方法,也就是說,使用者要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步器組合定義在自定義同步元件的實現中,並呼叫同步器提供的模板方法,而這些模板方法將會呼叫使用者重寫的方法。總結就是同步器將一些方法開放給子類進行重寫,而同步器給同步元件所提供模板方法又會重新呼叫被子類所重寫的方法

如在AQS中有此方法:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

而ReentrantLock中重寫了方法:

那在AQS中的acquire呼叫了這個方法,這就相當於在父類定義了一套模板,這些模板會呼叫一些可重寫的方法,這些可重寫的方法具體的實現放在了子類。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這就是模板方法方法的設計思路,如還有疑惑,可以去學習這種設計模式。

下面就是一些可以被重寫的方法:

 

方法名稱描述
protected boolean tryAcquire(int arg) 獨佔式獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然後再進行CAS設定同步狀態
protected boolean tryRelease(int arg) 獨佔式釋放同步狀態,等待獲取同步狀態的執行緒將有機會獲取同步狀態
protected int tryAcquireShared(int arg) 共享式獲取同步狀態,返回大於等於0的值,表示獲取成功,反之,獲取失敗
protected boolean tryReleaseShared(int arg) 共享式釋放同步狀態
protected boolean isHeldExclusively() 當前同步器是否在獨佔模式下被執行緒佔用,一般該方法表示是否被當前執行緒獨佔

實現自定義同步元件時,將會呼叫同步器提供的模板方法,這些(部分)模板方法與描述

方法名稱描述
void acquire(int arg) 獨佔式獲取同步狀態,如果當前執行緒獲取同步狀態成功,則由該方法返回,否則,將會進入同步佇列等待,該方法將會呼叫重寫的tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 與acquire(int arg)相同,但是該方法響應中斷,當前執行緒未獲取到同步狀態而進入同步佇列中,如果當前執行緒被中斷,則該方法會丟擲InterruptedException並返回
boolean tryAcquireNanos(int arg, long nanosTimeout) 在void acquireInterruptibly(int arg)的基礎上增加了超時限制,如果當前執行緒在超時時間內沒有獲取到同步狀態,那麼將會返回false,如果獲取到了返回true
void acquireShared(int arg) 共享式的獲取同步狀態,如果當前執行緒未獲取到同步狀態,將會進入同步佇列等待,與獨佔式獲取的主要區別是在同一時刻可以有多個執行緒獲取到同步狀態
void acquireSharedInterruptibly(int arg) 與acquireShared(int arg)相同,該方法響應中斷
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 在acquireSharedInterruptibly(int arg)基礎上增加了超時限制
boolean release(int arg) 獨佔式的釋放同步狀態,該方法會在釋放同步狀態之後,將同步佇列中第一個節點包含的執行緒喚醒
boolean releaseShared(int arg) 共享式的釋放同步狀態
Collection<Thread> getQueuedThreads() 獲取等待在同步佇列上的執行緒集合

同步器提供的模板方法基本上分為3類:

  1. 獨佔式獲取與釋放同步狀態

  2. 共享式獲取與釋放同步狀態

  3. 查詢同步佇列中的等待執行緒情況。

下面看一個例子:

public class Mutex implements Lock {
 private static class Sync extends AbstractQueuedSynchronizer {
    // Reports whether in locked state
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }

    // Acquires the lock if state is zero
    public boolean tryAcquire(int acquires) {
        assert acquires == 1; // Otherwise unused
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // Releases the lock by setting state to zero
    protected boolean tryRelease(int releases) {
        assert releases == 1; // Otherwise unused
        if (getState() == 0) throw new IllegalMonitorStateException();
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // Provides a Condition
    Condition newCondition() {
        return new ConditionObject();
    }

    // Deserializes properly
    private void readObject(ObjectInputStream s)
            throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}

private final Sync sync = new Sync();

@Override
public void lock() {
    sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
    return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
    sync.release(1);
}

@Override
public Condition newCondition() {
    return sync.newCondition();
}
}

這個例子中,獨佔鎖Mutex是一個自定義同步元件,它在同一時刻只允許一個執行緒佔有鎖。Mutex中定義了一個靜態內部類,該內部類繼承了同步器並實現了獨佔式獲取和釋放同步狀態。在tryAcquire(int acquires)方法中,如果經過CAS設定成功(同步狀態設定為1),則代表獲取了同步狀態,而在tryRelease(int releases)方法中只是將同步狀態重置為0。使用者使用Mutex時並不會直接和內部同步器的實現打交道,而是呼叫Mutex提供的方法,在Mutex的實現中,以獲取鎖的lock()方法為例,只需要在方法實現中呼叫同步器的模板方法acquire(int args)即可,當前執行緒呼叫該方法獲取同步狀態失敗後會被加入到同步佇列中等待,這樣就大大降低了實現一個可靠自定義同步元件的門檻。

2.2 同步佇列

同步器依賴內部的同步佇列(一個FIFO雙向佇列)來完成同步狀態的管理,當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態等資訊構造成為一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,會把首節點中的執行緒喚醒,使其再次嘗試獲取同步狀態。

同步佇列中的節點(Node)用來儲存獲取同步狀態失敗的執行緒引用、等待狀態以及前驅和後繼節點。

volatile int waitStatus //節點狀態
volatile Node prev //當前節點/執行緒的前驅節點
volatile Node next; //當前節點/執行緒的後繼節點
volatile Thread thread;//加入同步佇列的執行緒引用
Node nextWaiter;//等待佇列中的下一個節點

看到節點的資料結構,知道這是一個雙向佇列,而在AQS中還存在兩個成員變數:

private transient volatile Node head;
private transient volatile Node tail;

AQS實際上通過頭尾指標來管理同步佇列,同時實現包括獲取鎖失敗的執行緒進行入隊,釋放鎖時對同步佇列中的執行緒進行通知等核心方法。其示意圖如下:

通過對原始碼的理解以及做實驗的方式,現在我們可以清楚的知道這樣幾點:

  1. 節點的資料結構,即AQS的靜態內部類Node,節點的等待狀態等資訊;

  2. 同步佇列是一個雙向佇列,AQS通過持有頭尾指標管理同步佇列;

三、 ReentrantLock

重入鎖ReentrantLock,顧名思義,就是支援重進入的鎖,它表示該鎖能夠支援一個執行緒對資源的重複加鎖。除此之外,該鎖的還支援獲取鎖時的公平和非公平性選擇。如果一個鎖不支援可重入,那當一個執行緒呼叫它的lock()方法獲取鎖之後,如果再次呼叫lock()方法,則該執行緒將會被自己所阻塞。

synchronized關鍵字隱式的支援重進入,比如一個synchronized修飾的遞迴方法,在方法執行時,執行執行緒在獲取了鎖之後仍能連續多次地獲得該鎖。ReentrantLock雖然沒能像synchronized關鍵字一樣支援隱式的重進入,但是在呼叫lock()方法時,已經獲取到鎖的執行緒,能夠再次呼叫lock()方法獲取鎖而不被阻塞。

3.1 實現可重入性

重進入是指任意執行緒在獲取到鎖之後能夠再次獲取該鎖而不會被鎖所阻塞,該特性的實現需要解決以下兩個問題。

  1. 執行緒再次獲取鎖。鎖需要去識別獲取鎖的執行緒是否為當前佔據鎖的執行緒,如果是,則再次成功獲取。

  2. 鎖的最終釋放。執行緒重複n次獲取了鎖,隨後在第n次釋放該鎖後,其他執行緒能夠獲取到該鎖。鎖的最終釋放要求鎖對於獲取進行計數自增,計數表示當前鎖被重複獲取的次數,而鎖被釋放時,計數自減,當計數等於0時表示鎖已經成功釋放。

ReentrantLock是通過組合自定義同步器來實現鎖的獲取與釋放,以非公平性(預設的)實現為例

核心方法為nonfairTryAcquire:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //1. 如果該鎖未被任何執行緒佔有,該鎖能被當前執行緒獲取
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //2.若被佔有,檢查佔有執行緒是否是當前執行緒
    else if (current == getExclusiveOwnerThread()) {
        // 3. 再次獲取,計數加一
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

該方法增加了再次獲取同步狀態的處理邏輯:通過判斷當前執行緒是否為獲取鎖的執行緒來決定獲取操作是否成功,如果是獲取鎖的執行緒再次請求,則將同步狀態值進行增加並返回true,表示獲取同步狀態成功。成功獲取鎖的執行緒再次獲取鎖,只是增加了同步狀態值,這也就要求ReentrantLock在釋放同步狀態時減少同步狀態值。

protected final boolean tryRelease(int releases) {
    //1. 同步狀態減1
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        //2. 只有當同步狀態為0時,鎖成功被釋放,返回true
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 3. 鎖未被完全釋放,返回false
    setState(c);
    return free;
}

如果該鎖被獲取了n次,那麼前(n-1)次tryRelease(int releases)方法必須返回false,而只有同步狀態完全釋放了,才能返回true。可以看到,該方法將同步狀態是否為0作為最終釋放的條件,當同步狀態為0時,將佔有執行緒設定為null,並返回true,表示釋放成功。

3.2 公平與非公平獲取鎖的區別

公平鎖和非公平鎖。何謂公平性,是針對獲取鎖而言的,如果一個鎖是公平的,那麼鎖的獲取順序就應該符合請求上的絕對時間順序,滿足FIFO,ReentrantLock的構造方法無參時是構造非公平鎖

public ReentrantLock() {
    sync = new NonfairSync();
}

另外還提供了另外一種方式,可傳入一個boolean值,true時為公平鎖,false時為非公平鎖

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

在上面非公平鎖獲取時(nonfairTryAcquire方法)只是簡單的獲取了一下當前狀態做了一些邏輯處理,並沒有考慮到當前同步佇列中執行緒等待的情況。我們來看看公平鎖的處理邏輯是怎樣的,核心方法為:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
  }
}

這段程式碼的邏輯與nonfairTryAcquire基本上一直,唯一的不同在於增加了hasQueuedPredecessors的邏輯判斷,方法名就可知道該方法用來判斷當前節點在同步佇列中是否有前驅節點的判斷,如果有前驅節點說明有執行緒比當前執行緒更早的請求資源,根據公平性,當前執行緒請求資源失敗。如果當前節點沒有前驅節點的話,再才有做後面的邏輯判斷的必要性。公平鎖每次都是從同步佇列中的第一個節點獲取到鎖,而非公平性鎖則不一定,有可能剛釋放鎖的執行緒能再次獲取到鎖。

公平鎖 VS 非公平鎖

  1. 公平鎖每次獲取到鎖為同步佇列中的第一個節點,保證請求資源時間上的絕對順序,而非公平鎖有可能剛釋放鎖的執行緒下次繼續獲取該鎖,則有可能導致其他執行緒永遠無法獲取到鎖,造成“飢餓”現象。

  2. 公平鎖為了保證時間上的絕對順序,需要頻繁的上下文切換,而非公平鎖會降低一定的上下文切換,降低效能開銷。因此,ReentrantLock預設選擇的是非公平鎖,則是為了減少一部分上下文切換,保證了系統更大的吞吐量。

四、 ReentrantReadWriteLock

之前學到的鎖都是獨佔鎖,這些鎖在同一時刻只允許一個執行緒進行訪問,而讀寫鎖在同一時刻可以允許多個讀執行緒訪問,但是在寫執行緒訪問時,所有的讀執行緒和其他寫執行緒均被阻塞。讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得併發性相比一般的排他鎖有了很大提升。

除了保證寫操作對讀操作的可見性以及併發性的提升之外,讀寫鎖能夠簡化讀寫互動場景的程式設計方式。假設在程式中定義一個共享的用作快取資料結構,它大部分時間提供讀服務(例如查詢和搜尋),而寫操作佔有的時間很少,但是寫操作完成之後的更新需要對後續的讀服務可見。

一般情況下,讀寫鎖的效能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的併發性和吞吐量。Java併發包提供讀寫鎖的實現是ReentrantReadWriteLock。

讀寫鎖主要有以下三個特性:

  1. 公平性選擇:支援非公平性(預設)和公平的鎖獲取方式,吞吐量還是非公平優於公平;

  2. 重入性:支援重入,讀鎖獲取後能再次獲取,寫鎖獲取之後能夠再次獲取寫鎖,同時也能夠獲取讀鎖;

  3. 鎖降級:遵循獲取寫鎖,獲取讀鎖再釋放寫鎖的次序,寫鎖能夠降級成為讀鎖

4.1 讀寫鎖的使用

ReadWriteLock僅定義了獲取讀鎖和寫鎖的兩個方法,即readLock()方法和writeLock()方法,而其實現——ReentrantReadWriteLock,除了介面方法之外,還提供了一些便於外界監控其內部工作狀態的方法,主要有:

int getReadLockCount()//返回當前讀鎖被獲取的次數。該次數不等於獲取讀鎖的執行緒數,如果一個執行緒連續獲取n次,那麼返回的就是n
int getReadHoldCount()//返回當前執行緒獲取讀鎖的次數
boolean isWriteLocked()//判斷寫鎖是否被獲取
int getWriteHoldCount()//返回當前寫鎖被獲取的次數

讀寫鎖使用:

public class Cache {
    static Map<String, Object> map = new HashMap<>();
    static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    static Lock r = reentrantReadWriteLock.readLock();
    static Lock w = reentrantReadWriteLock.writeLock();
    // 獲取一個key對應的value
    public static final Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }
    // 設定key對應的value,並返回舊的value
    public static final Object put(String key, Object value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }
    // 清空所有的內容
    public static final void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}

Cache組合一個非執行緒安全的HashMap作為快取的實現,同時使用讀寫鎖的讀鎖和寫鎖來保證Cache是執行緒安全的。在讀操作get(String key)方法中,需要獲取讀鎖,這使得併發訪問該方法時不會被阻塞。寫操作put(String key,Object value)方法和clear()方法,在更新HashMap時必須提前獲取寫鎖,當獲取寫鎖後,其他執行緒對於讀鎖和寫鎖的獲取均被阻塞,而只有寫鎖被釋放之後,其他讀寫操作才能繼續。Cache使用讀寫鎖提升讀操作的併發性,也保證每次寫操作對所有的讀寫操作的可見性,同時簡化了程式設計方式。

4.2 實現原理

再分析下讀寫鎖的實現原理,主要的內容包括:讀寫狀態的設計,寫鎖的獲取與釋放,讀鎖的獲取與釋放以及鎖降級。

讀寫狀態的設計

讀寫鎖同樣依賴自定義同步器來實現同步功能,而讀寫狀態就是其同步器的同步狀態。回想ReentrantLock中自定義同步器的實現,同步狀態表示鎖被一個執行緒重複獲取的次數,而讀寫鎖的自定義同步器需要在同步狀態(一個整型變數)上維護多個讀執行緒和一個寫執行緒的狀態,使得該狀態的設計成為讀寫鎖實現的關鍵。

如果在一個整型變數上維護多種狀態,就一定需要“按位切割使用”這個變數,讀寫鎖將變數切分成了兩個部分,高16位表示讀,低16位表示寫,如圖:

寫鎖的獲取與釋放

寫鎖是一個支援重進入的排它鎖。如果當前執行緒已經獲取了寫鎖,則增加寫狀態。如果當前執行緒在獲取寫鎖時,讀鎖已經被獲取(讀狀態不為0)或者該執行緒不是已經獲取寫鎖的執行緒,則當前執行緒進入等待狀態:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 1. 獲取寫鎖當前的同步狀態
    int c = getState();
    // 2. 獲取寫鎖獲取的次數
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 3.1 當讀鎖已被讀執行緒獲取或者當前執行緒不是已經獲取寫鎖的執行緒的話
        // 當前執行緒獲取寫鎖失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 3.2 當前執行緒獲取寫鎖,支援可重複加鎖
        setState(c + acquires);
        return true;
    }
    // 3.3 寫鎖未被任何執行緒獲取,當前執行緒可獲取寫鎖
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

寫鎖的釋放與ReentrantLock的釋放過程基本類似,每次釋放均減少寫狀態,當寫狀態為0時表示寫鎖已被釋放,從而等待的讀寫執行緒能夠繼續訪問讀寫鎖,同時前次寫執行緒的修改對後續讀寫執行緒可見。

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //1. 同步狀態減去寫狀態
    int nextc = getState() - releases;
    //2. 當前寫狀態是否為0,為0則釋放寫鎖
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    //3. 不為0則更新同步狀態
    setState(nextc);
    return free;
}

讀鎖的獲取與釋放

讀鎖是一個支援重進入的共享鎖,它能夠被多個執行緒同時獲取,在沒有其他寫執行緒訪問(或者寫狀態為0)時,讀鎖總會被成功地獲取,而所做的也只是(執行緒安全的)增加讀狀態。如果當前執行緒已經獲取了讀鎖,則增加讀狀態。如果當前執行緒在獲取讀鎖時,寫鎖已被其他執行緒獲取,則進入等待狀態。另外由於要增加一些外部功能,比如getReadHoldCount()方法,作用是返回當前執行緒獲取讀鎖的次數。讀狀態是所有執行緒獲取讀鎖次數的總和,而每個執行緒各自獲取讀鎖的次數只能選擇儲存在ThreadLocal中,由執行緒自身維護,這使獲取讀鎖的實現變得複雜。

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //1. 如果寫鎖已經被獲取並且獲取寫鎖的執行緒不是當前執行緒的話,當前
    // 執行緒獲取讀鎖失敗返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        //2. 當前執行緒獲取讀鎖
        compareAndSetState(c, c + SHARED_UNIT)) {
        //3. 下面的程式碼主要是新增的一些功能,比如getReadHoldCount()方法
        //返回當前獲取讀鎖的次數
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //4. 處理在第二步中CAS操作失敗的自旋已經實現重入性
    return fullTryAcquireShared(current);
}

讀鎖的每次釋放(執行緒安全的,可能有多個讀執行緒同時釋放讀鎖)均減少讀狀態,減少的 值是(1<<16)。

鎖降級

鎖降級指的是寫鎖降級成為讀鎖。如果當前執行緒擁有寫鎖,然後將其釋放,最後再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨後釋放(先前擁有的)寫鎖的過程。接下來看一個鎖降級的示例。因為資料不常變化,所以多個執行緒可以併發地進行資料處理,當資料變更後,如果當前執行緒感知到資料變化,則進行資料的準備工作,同時其他處理執行緒被阻塞,直到當前執行緒完成資料的準備工作:

public void processData() {
readLock.lock();
if (!update) {
// 必須先釋放讀鎖
readLock.unlock();
// 鎖降級從寫鎖獲取到開始
writeLock.lock();
try {
if (!update) {
// 準備資料的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 鎖降級完成,寫鎖降級為讀鎖
}
try {
// 使用資料的流程(略)
} finally {
readLock.unlock();
}
}

當資料發生變更後,update變數(布林型別且volatile修飾)被設定為false,此時所有訪問processData()方法的執行緒都能夠感知到變化,但只有一個執行緒能夠獲取到寫鎖,其他執行緒會被阻塞在讀鎖和寫鎖的lock()方法上。當前執行緒獲取寫鎖完成資料準備之後,再獲取讀鎖,隨後釋放寫鎖,完成鎖降