1. 程式人生 > >Java 多執行緒與併發(六):AQS

Java 多執行緒與併發(六):AQS

我們前面幾張提到過,JUC 這個包裡面的工具類的底層就是使用 CAS 和 volatile 來保證執行緒安全的,整個 JUC 包裡面的類都是基於它們構建的。今天我們介紹一個非常重要的同步器,這個類是 JDK 在 CAS 和 volatile 的基礎上為我們提供的一個同步工具類。

背景

AbstractQueuedSynchronizer,JDK 1.5 引入了 JUC 包,這個包提供了一些列支援併發的元件,這些元件是一些列同步器,他們主要完成以下功能:

  • 內部狀態的管理和更新,比如表示一個鎖的狀態是獲取還是釋放。
  • 執行緒同步狀態阻塞。
  • 執行緒同步狀態釋放。

AQS 是一個小框架,基於這個框架我們可以實現很多的同步器,ReentrantLock,CountDownLatch,Semaphore 等都是基於 AQS 實現的。

功能

  • 獨佔鎖:每次只有一個執行緒能夠持有鎖,比如前面給大家演示的 ReentrantLock 就是以獨佔方式實現的互斥鎖。
  • 共享鎖:允許多個執行緒同時獲取鎖,併發訪問共享資源,比如 ReentrantReadWriteLock。

設計思想

同步器的核心方法是 acquire 和 release 操作。

acquire

while(當前同步器的狀態不允許獲取操作){

​ 如果當前執行緒不再佇列中,將其加入佇列

​ 阻塞當前執行緒

}

執行緒如果位於佇列中,將其移出佇列

release

更新同步器的狀態

if(新的狀態允許某個被阻塞的執行緒獲取成功)

​ 解除佇列中一個或多個執行緒的阻塞狀態。

從上面的操作思想中我們可以提出三大關鍵操作:同步器狀態變更,執行緒阻塞和釋放,插入和移出佇列。由此可以引申出三個基本元件:

  • 同步器狀態的原子性管理
  • 執行緒阻塞與解除阻塞
  • 佇列的管理

同步狀態
AQS 類使用 int 值來儲存同步狀態,並且暴露出 getState,setState 和 compareAndSet 操作來讀取和更新這個同步狀態。執行緒通過修改(加/減指定的數量)碼是否成功來決定當前執行緒是否成功獲取到同步狀態。

State 被宣告成了 volatile,保證了可見性和有序性。又通過 CAS 指令來實現 compareAndSet ,使得當且僅當同步狀態擁有一個一致的期望值的時候,才會被原子地設定成新值,這樣就保證了同步狀態的原子性。

阻塞
直到 JSR166,阻塞執行緒和解除執行緒阻塞都是基於 Java 的內建管程。

JUC 包使用 LockSupport 類來解決這個問題。LockSupport.park 阻塞當前執行緒直到有 LockSupport.unpark 方法被呼叫。

佇列
整個框架的核心就是如何管理執行緒阻塞佇列,該佇列是嚴格的 FIFO 佇列,因此不支援執行緒優先順序的同步。同步佇列的最佳選擇是自身沒有使用底層鎖來構造的非阻塞資料結構。這裡採用了 CLH 鎖。

CLH佇列實際並不那麼像佇列,它的入隊和出隊與實際的業務密切相關。它是一個連結串列佇列。用過 AQS 的兩個欄位 head(頭節點) 和 tail(尾節點)來存取,這兩個欄位初始化的時候都指向了一個空節點。
入隊操作:

CLH 佇列是 FIFO 佇列,所以新的節點來到的時候,是要插入到當前佇列的尾節點之後。當一個執行緒獲取到同步狀態之後,其他執行緒無法獲取,轉而被構造成節點加入到同步佇列中,而且這個加入佇列的過程必須要保證執行緒安全,因此使用了 CAS方法,它需要傳遞當前執行緒認為的尾節點和當前節點,只有設定成功後,當前節點才正式與之前的尾節點建立關聯。

出隊操作:

因為是 FIFO 佇列,所以能成功獲取到 AQS 同步狀態的必定是首節點,首節點的執行緒在釋放同步狀態時,會喚醒後續節點,而後續節點會在獲取 AQS 同步狀態成功的時候將自己設定為首屆點。設定首節點是由獲取同步成功的執行緒來完成的,所以不需要像入隊這樣的 CAS 操作。

條件佇列

上一節是 AQS 的同步佇列,這一節是條件佇列。AQS 只有一個同步佇列,但是可以有多個條件佇列。AQS 框架提供了一個 ConditionObject 類,給維護獨佔同步的類以及實現 Lock 介面的類使用。

ConditionObject 類 和 AQS 共用了內部節點,有自己單獨的條件佇列。Singal 操作是通過將節點從條件佇列轉移到同步佇列來實現的。
singal:

await:

方法結構

元件 資料結構
同步狀態 volatile int state
阻塞 LockSupport類
佇列 Node節點
條件佇列 ConditionObject

原始碼

我們通過獨佔式同步狀態的釋放和獲取,以及共享式同步狀態的釋放和獲取來看看 AQS 是如何實現的。

獨佔式

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述程式碼主要完成了同步狀態的獲取,節點構造,加入同步佇列以及在同步佇列中自旋等待等相關工作。

  1. 呼叫子類實現的 tryAcquire 方法,該方法保證執行緒安全同時獲取同步狀態。
  2. 獲取同步狀態失敗,則構造獨佔式同步節點。
  3. 通過 addWriter 將該節點加入到同步佇列的尾部。
  4. 最後通過 acquireQueued 方法,使得該節點以自選的方式獲取同步狀態。

來看看節點構造和加入佇列的實現:

private Node addWaiter(Node mode) {
        // 當前執行緒構造成Node節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 嘗試快速在尾節點後新增節點 提升演算法效率 先將尾節點指向pred
        Node pred = tail;
        if (pred != null) {
            //尾節點不為空  當前執行緒節點的前驅節點指向尾節點
            node.prev = pred;
            //併發處理 尾節點有可能已經不是之前的節點 所以需要CAS更新
            if (compareAndSetTail(pred, node)) {
                //CAS更新成功 當前執行緒為尾節點 原先尾節點的後續節點就是當前節點
                pred.next = node;
                return node;
            }
        }
        //第一個入隊的節點或者是尾節點後續節點新增失敗時進入enq
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //尾節點為空  第一次入隊  設定頭尾節點一致 同步佇列的初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //所有的執行緒節點在構造完成第一個節點後 依次加入到同步佇列中
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

節點進入同步佇列後,就進入了一個自旋的過程,每個執行緒節點都在自旋地觀察,當條件滿足,獲取到了同步狀態,就可以從自旋過程中退出,否則依舊自旋。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //獲取當前執行緒節點的前驅節點
            final Node p = node.predecessor();
            //前驅節點為頭節點且成功獲取同步狀態
            if (p == head && tryAcquire(arg)) {
                //設定當前節點為頭節點
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //是否阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 阻塞執行緒的過程。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驅節點的狀態決定後續節點的行為
     int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*前驅節點為-1 後續節點可以被阻塞
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*前驅節點是初始或者共享狀態就設定為-1 使後續節點阻塞
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
private final boolean parkAndCheckInterrupt() {
        //阻塞執行緒
        LockSupport.park(this);
        return Thread.interrupted();
    }


當獲取同步狀態成功之後,對於鎖這種併發元件而言,就意味著當前執行緒獲取到了鎖。

再看 release 方法:

head節點表示獲取鎖成功的節點,當頭結點在釋放同步狀態時,會喚醒後繼節點,如果後繼節點獲得鎖成功,會把自己設定為頭結點,節點的變化過程如下。修改head節點指向下一個獲得鎖的節點,新的獲得鎖的節點,將prev的指標指向null。

public final boolean release(int arg) {
        if (tryRelease(arg)) {//同步狀態釋放成功
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //直接釋放頭節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*尋找符合條件的後續節點
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //喚醒後續節點
            LockSupport.unpark(s.thread);
    }

總結:在獲取同步狀態時,同步器維護一個同步佇列,獲取狀態失敗的執行緒都會被加入到佇列中進行自旋。移除的條件是前驅節點是頭節點並且成功獲取了同步狀態。釋放時,會喚醒頭節點的後繼節點。

應用

ReentrantLock:ReentrantLock 類使用 AQS 同步狀態來儲存鎖重複持有的次數。當鎖被一個執行緒獲取時,ReentrantLock 也會記錄下當前獲得鎖的執行緒表示,以便檢查是否重複獲取。

ReentrantReadWriteLock:ReentrantReadWriteLock 使用 AQS 同步狀態中的 16 為來儲存寫鎖的持有次數,剩下的 16 為來儲存讀鎖的持有次數。WriteLock 的構建方式和 ReentrantLock 一樣。ReadLock 則通過使用 acquireShared 方法來支援同時允許多個讀執行緒。

Semaphore:訊號量使用 AQS 同步狀態來儲存訊號量當前計數。它裡面定義的 acquireShared 方法會減少計數,當計數為非正值時阻塞執行緒。tryRelease 會增加技術,在計數為正值時還要解除執行緒的阻塞。

CountDownLatch:使用 AQS 同步狀態來表示計數。當該計數為 0 時,所有的 acquire 方法才能通過。

FutureTask:使用 AQS 的同步狀態來表示某個非同步計算任務的執行狀態(初始化,執行中,被取消和完成)。設定(FutureTask 的 set 方法)或取消(FutureTask 的 cancel 方法)一個 FutureTask 時會呼叫 AQS 的 release 操作。等待計算結果的執行緒阻塞解除是通過 AQS 的 acquire 實現的。

SynchronousQueues:SynchronousQueues類使用了內部的等待節點,這些節點可以用於協調生產者和消費者。同時,它使用AQS同步狀態來控制當某個消費者消費當前一項時,允許一個生產者繼續生產,反之亦然。

流程圖

  1. 多執行緒併發修改同步狀態,修改成功的執行緒標記為擁有同步狀態。

  2. 獲取失敗的執行緒,加入到同步佇列的隊尾;加入到佇列中後,如果當前節點的前驅節點為頭節點再次嘗試獲取同步狀態(下文程式碼:p == head && tryAcquire(arg))。

  3. 如果頭節點的下一個節點嘗試獲取同步狀態失敗後,會進入等待狀態;其他節點則繼續自旋。

  4. 當執行緒執行完相應邏輯後,需要釋放同步狀態,使後繼節點有機會同步狀態(讓出資源,讓排隊的執行緒使用)。這時就需要呼叫release(int arg)方法。呼叫該方法後,會喚醒後繼節點。

  5. 後繼節點獲取同步狀態成功,頭節點出隊。需要注意的事,出隊操作是間接的,有節點獲取到同步狀態時,會將當前節點設定為head,而原本的head設定為null。

  6. 當同步佇列中頭節點喚醒後繼節點時,此時可能有其他執行緒嘗試獲取同步狀態。

  7. 假設獲取成功,將會被設定為頭節點。

  8. 頭節點後續節點獲取同步狀態失敗。

  9. 共享模式和獨佔模式最主要的區別是在支援同一時刻有多個執行緒同時獲取同步狀態。為了避免帶來額外的負擔,在上文中提到的同步佇列中都是用獨佔模式進行講述,其實同步佇列中的節點應該是獨佔和共享節點並存的。

  10. 共享節點嘗試獲取同步狀態。

  11. 當一個同享節點獲取到同步狀態,並喚醒後面等待的共享狀態的結果如下圖所示:

  12. 最後,獲取到同步狀態的執行緒執行完畢,同步佇列中只有一個獨佔節點:

總結

  1. AQS通過一個int同步狀態碼,和一個(先進先出)佇列來控制多個執行緒訪問資源
  2. 支援獨佔和共享兩種模式獲取同步狀態碼
  3. 當執行緒獲取同步狀態失敗會被加入到同步佇列中
  4. 當執行緒釋放同步狀態,會喚醒後繼節點來獲取同步狀態
  5. 共享模式下的節點獲取到同步狀態或者釋放同步狀態時,不僅會喚醒後繼節點,還會向後傳播,喚醒所有同步節點
  6. 使用volatile關鍵字保證狀態碼線上程間的可見性,CAS操作保證修改狀態碼過程的原子性。