1. 程式人生 > >J.U.C--locks--AQS分析

J.U.C--locks--AQS分析

特征 clu 類型 spa .text south 容器 指針 tex

看一下AbstractQueuedSynchronizer(下面簡稱AQS)的子類就行知道,J.U.C中宣傳的封裝良好的同步工具類Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock、FutureTask等盡管各自都有不同特征,可是其內部的實現都與AQS分不開。

所以分析AQS的實現原理對其余顯示鎖或則同步工具類的理解非常重要。

1.主要屬性和內部類

這一篇blog主要分析AQS的實現中的重要屬性和內部類。尤其是對於ReentrantLock和ReentrantReadWriteLock。其lock()方法和unlock()方法的實現終於都是由AQS同步器實現的。由此可見分析AQS類的重要性可見一斑。

在AQS中,我們先看屬性遠比看方法來的更加easy理解這個類的作用。首先看AQS類的主要屬性:

//等待隊列的頭指針
private transient volatile Node head;
//等待隊列的尾指針
private transient volatile Node tail;
//同步器的狀態位,註意這裏state是聲明了volatile。保證了可視性
private volatile int state;

凝視事實上已經告訴我們了。Node類型的 head 和 tail 是一個FIFO的wait queue。一個int類型的狀態位state。到這裏也能猜到AQS對外呈現(或者說聲明)的主要行為就是由一個狀態位和一個有序隊列來配合完畢。

state屬性

對於state狀態的管理,在AQS中僅僅通過三個方法來實現:

java.util.concurrent.locks.AbstractQueuedSynchronizer.getState();

java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int);

java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int);

前面兩個函數事實上就是get和set方法。

第三個函數事實上是通過Unsafe類實現CAS設置狀態值,CAS+volatile 保證了state變量的線程安全。

Node結點

前面還提到了同步器的實現還依賴於一個FIFO的隊列。隊列中的元素Node就是保存著線程引用和線程狀態的容器,每一個線程對同步器的訪問。都可以看做是隊列中的一個節點。

Node類的源代碼不多,我直接所有粘貼出來:

static final class Node {

    static final Node SHARED = new Node();

    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;

    static final int SIGNAL    = -1;

    static final int CONDITION = -2;

    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    /** 構造器 */
    Node() {    // Used to establish initial head or SHARED marker
    }
    /** 構造器 */
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    /** 構造器 */
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Node類主要有5個屬性:

volatile int waitStatus;//
volatile Node prev;//
volatile Node next;//
volatile Thread thread;//
Node nextWaiter;//

以上五個成員變量主要負責保存該節點的線程引用,同步等待隊列(下面簡稱sync隊列)的前驅和後繼節點。同一時候也包括了同步狀態。
對這5個變量的解釋例如以下:

屬性名稱 描寫敘述
int waitStatus 表示節點的狀態。當中包括的狀態有:
1.CANCELLED,值為1,表示當前的線程被取消。
2.SIGNAL,值為-1。表示當前節點的後繼節點包括的線程須要執行,也就是unpark;
3.CONDITION,值為-2。表示當前節點在等待condition,也就是在condition隊列中;
4.PROPAGATE,值為-3。表示當前場景下興許的acquireShared可以得以執行;
5.值為0,表示當前節點在sync隊列中。等待著獲取鎖。
Node prev 前驅節點。比方當前節點被取消,那就須要前驅節點和後繼節點來完畢連接。
Node next 後繼節點。

Node nextWaiter 存儲condition隊列中的後繼節點。
Thread thread 入隊列時的當前線程。



節點成為sync隊列和condition隊列構建的基礎,在同步器中就包括了sync隊列。

同步器擁有三個成員變量:sync隊列的頭結點head、sync隊列的尾節點tail和狀態state。對於鎖的獲取。請求形成節點,將其掛載在尾部。而鎖資源的轉移(釋放再獲取)是從頭部開始向後進行。對於同步器維護的狀態state,多個線程對其的獲取將會產生一個鏈式的結構。

技術分享

2.重要函數的源代碼解析

獲取鎖相關函數

acquire(int arg);//以獨占模式獲取對象,忽略中斷。
acquireInterruptibly(int arg);//以獨占模式獲取對象。假設被中斷則中止。
acquireShared(int arg);//以共享模式獲取對象,忽略中斷。
acquireSharedInterruptibly(int arg);//以共享模式獲取對象,假設被中斷則中止。

tryAcquire(int arg);//試圖在獨占模式下獲取對象狀態。
tryAcquireNanos(int arg, long nanosTimeout);//試圖以獨占模式獲取對象,假設被中斷則中止。假設到了給定超時時間,則會失敗。
tryAcquireShared(int arg);//試圖在共享模式下獲取對象狀態。
tryAcquireSharedNanos(int arg, long nanosTimeout);//試圖以共享模式獲取對象,假設被中斷則中止。假設到了給定超時時間。則會失敗。

釋放鎖相關函數

release(int arg);//以獨占模式釋放對象。
releaseShared(int arg);//以共享模式釋放對象

tryRelease(int arg);//試圖設置狀態來反映獨占模式下的一個釋放。
tryReleaseShared(int arg);//試圖設置狀態來反映共享模式下的一個釋放。

1)acquire(int arg)函數

首先看看Javadoc的定義:

以獨占模式獲取對象,忽略中斷。

通過至少調用一次 tryAcquire(int) 來實現此方法,並在成功時返回。否則在成功之前,一直調用 tryAcquire(int) 將線程加入隊列,線程可能反復被堵塞或不被堵塞。可以使用此方法來實現 Lock.lock() 方法。

可知該函數是以獨占模式獲取對象而且忽略中斷,完畢synchronized語義。

在AQS類中的源代碼例如以下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

該函數主要完畢的邏輯例如以下:
1)首先調用tryAcquire(arg)函數嘗試獲取;
嘗試更改狀態state的值,而且保證原子性。

在tryAcquire方法中使用了同步器提供的對state操作的方法。利用compareAndSet保證僅僅有一個線程可以對狀態進行成功改動,而沒有成功改動的線程將進入sync隊列排隊。

值得註意的是這個函數在AQS中並沒有實現,而是在其繼承子類中實現(比方在ReentrantLock類中的內部類中NonfairSync和FairSync中均實現了這種方法)。

當獲取成功時,就會返回true,這時源代碼中的if語句就會直接執行if(0),也就是不滿足執行條件。

2)假設獲取不到,將當前線程構造成節點Node並加入sync隊列。
進入隊列的每一個線程都是一個節點Node,從而形成了一個雙向隊列,相似CLH隊列,這樣做的目的是線程間的通信會被限制在較小規模(也就是兩個節點左右)。

3)再次嘗試獲取。假設沒有獲取到那麽將當前線程從線程調度器上摘下。進入等待狀態。


使用LockSupport將當前線程unpark,關於LockSupport興許會具體介紹。

看看addWaiter()方法的邏輯:

凝視解釋的是:通過給定的模式和當前線程創建同步隊列結點。

源代碼例如以下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 高速嘗試在尾部加入
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //
    enq(node);
    return node;
}

//Inserts node into queue, initializing if necessary. See picture above.
private Node enq(final Node node) {
    //死循環直至return
    for (;;) {
        Node t = tail;
        //必須初始化的步驟
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

上面的邏輯解釋:
1)使用當前線程構造Node;
對於一個新創建的節點須要做的是:將新節點前驅節點指向尾節點(current.prev = tail)。尾節點指向它(tail = current),原有的尾節點的後繼節點指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節點的設置來保證的,也就是compareAndSetTail來完畢的。

2)先行嘗試在隊尾加入;
假設尾節點已經有了。然後做例如以下操作:
(1)分配引用pred指向尾節點。
(2)調用compareAndSetTail(pred, node)將新節點更新為尾節點;
(3)直接return,返回新插入的結點。

3)假設隊尾加入失敗或者是第一個入隊的節點。
假設是第1個節點,也就是sync隊列沒有初始化,那麽會進入到enq這種方法,進入的線程可能有多個,或者說在addWaiter中沒有成功入隊的線程都將進入enq這種方法。

enq(node)函數的邏輯是確保進入的Node都會有機會順序的加入到sync隊列中,而加入的過程例如以下:
(1)假設尾節點為空,那麽原子化的分配一個頭節點。並將尾節點指向頭節點,這一步是初始化;
(2)然後是反復在addWaiter中做的工作,可是在一個for (;;)的循環中,直到當前節點入隊為止。

至此。addWaiter()方法的邏輯分析完畢。接下來就是分析(final Node node, int arg) 方法的邏輯。

進入sync隊列之後。接下來就是要進行鎖的獲取,或者說是訪問控制了。僅僅有一個線程可以在同一時刻繼續的執行。而其它的進入等待狀態。而每一個線程都是一個獨立的個體,它們自省的觀察。當條件滿足的時候(自己的前驅是頭結點而且原子性的獲取了狀態)。那麽這個線程可以繼續執行。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //獲取前驅結點
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上述邏輯主要包括:
1. 獲取當前節點的前驅節點;
須要獲取當前節點的前驅節點。而頭結點所相應的含義是當前站有鎖且正在執行。
2. 當前驅節點是頭結點而且可以獲取狀態,代表該當前節點占有鎖;
假設滿足上述條件,那麽代表可以占有鎖。根據節點對鎖占有的含義,設置頭結點為當前節點。
3. 否則進入等待狀態。
假設沒有輪到當前節點執行。那麽將當前線程從線程調度器上摘下。也就是進入等待狀態。

這裏針對acquire做一下總結:
1. 狀態的維護;
須要在鎖定時。須要維護一個狀態(int類型)。而對狀態的操作是原子和非堵塞的,通過同步器提供的對狀態訪問的方法對狀態進行操縱,而且利用compareAndSet來確保原子性的改動。
2. 狀態的獲取;
一旦成功的改動了狀態,當前線程或者說節點,就被設置為頭節點。
3. sync隊列的維護。


在獲取資源未果的過程中條件不符合的情況下(不該自己,前驅節點不是頭節點或者沒有獲取到資源)進入睡眠狀態,停止線程調度器對當前節點線程的調度。


這時引入的一個釋放的問題,也就是說使睡眠中的Node或者說線程獲得通知的關鍵,就是前驅節點的通知,而這一個過程就是釋放。釋放會通知它的後繼節點從睡眠中返回準備執行。
下面的流程圖基本描寫敘述了一次acquire所須要經歷的過程:
技術分享

如上圖所看到的,當中的判定退出隊列的條件,判定條件是否滿足和休眠當前線程就是完畢了自旋spin的過程。

2)release(int arg)

首先看看Javadoc的定義:

以獨占模式釋放對象。假設 tryRelease(int) 返回 true。則通過消除一個或多個線程的堵塞來實現此方法。可以使用此方法來實現 Lock.unlock() 方法
源代碼例如以下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

在unlock方法的實現中。使用了同步器的release方法。

相對於在之前的acquire方法中可以得出調用acquire,保證可以獲取到鎖(成功獲取狀態),而release則表示將狀態設置回去,也就是將資源釋放,或者說將鎖釋放。

上述邏輯主要包括:
1)嘗試釋放狀態。
tryRelease()函數可以保證原子化的將狀態設置回去。當然須要使用compareAndSet來保證。假設釋放狀態成功過之後。將會進入後繼節點的喚醒過程。
2. 喚醒當前節點的後繼節點所包括的線程。
通過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續acquire狀態。

private void unparkSuccessor(Node node) {
    // 將狀態設置為同步狀態
    int ws = node.waitStatus;
    if (ws < 0)         
        compareAndSetWaitStatus(node, ws, 0);   
    // 獲取當前節點的後繼節點,假設滿足狀態,那麽進行喚醒操作  
    // 假設沒有滿足狀態,從尾部開始找尋符合要求的節點並將其喚醒     
    Node s = node.next;     
    if (s == null || s.waitStatus &gt; 0) {
        s = null;
        for (Node t = tail; t != null &amp;&amp; t != node; t = t.prev)
            if (t.waitStatus &lt;= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

上述邏輯主要包括,該方法取出了當前節點的next引用,然後對其線程(Node)進行了喚醒。這時就僅僅有一個或合理個數的線程被喚醒。被喚醒的線程繼續進行對資源的獲取與爭奪。

回想整個資源的獲取和釋放過程:
1)在獲取時,維護了一個sync隊列。每一個節點都是一個線程在進行自旋,而根據就是自己是否是首節點的後繼而且可以獲取資源;
2)在釋放時。僅僅須要將資源還回去。然後通知一下後繼節點並將其喚醒。

這裏須要註意,隊列的維護(首節點的更換)是依靠消費者(獲取時)來完畢的,也就是說在滿足了自旋退出的條件時的一刻。這個節點就會被設置成為首節點。

至此AQS基本的兩個函數分析完畢。這兩個函數也是lock()函數和unlock()函數的核心。

J.U.C--locks--AQS分析