1. 程式人生 > >Java並發AQS原理分析(一)

Java並發AQS原理分析(一)

jpg 子類 ole success ces || pro 同步 無法

我們說的AQS就是AbstractQueuedSynchronizer,他在java.util.concurrent.locks包下,這個類是Java並發的一個核心類。第一次知道有這個類是在看可重入鎖ReentrantLock中,在ReentrantLock中有一個內部類Sync繼承於AbstractQueuedSynchronizer,是ReentrantLock的核心實現。在並發包中的鎖幾乎都是基於AQS來構建的,但是在看源碼的時候就會發現他們並沒有直接繼承AbstractQueuedSynchronizer,而是通過內部類Sync實現。

abstract static class Sync extends AbstractQueuedSynchronizer

這裏註意的是AbstractQueuedSynchronizer是一個抽象類,定義了基本的框架。AQS核心是用一個變量state來表示狀態.
AQS也就是AbstractQueuedSynchronizer這個類只是定義了一個隊列管理線程,對於線程的狀態是子類維護的,我們可以理解為師一個同步隊列,當有線程獲取鎖失敗時(多線程爭用資源被阻塞時會進入此隊列),線程會被添加到隊列的隊尾

總結:

  • AQS只是負責管理線程阻塞隊列。
  • 線程的阻塞和喚醒

同步器是實現鎖的關鍵(例如AQS隊列同步器),利用同步器實現鎖的定義。鎖匙面向用戶的,它定義了使用者和鎖交互的接口,但是隱藏了實現的細節。同步器則是鎖的實現,所以他是在鎖的背後默默做著貢獻,用戶不能直接的接觸到他,他簡化了鎖的實現方式,屏蔽了同步狀態管理、線程之間的排隊、等待、喚醒等操作。這樣設計很好的隔離了使用者和實現者關註的領域。

技術分享圖片

上面的表示了隊列的形態,head表示隊列的頭節點,tail表示隊列的尾節點。在源碼中他們的定義使用volatile定義的。使用volatile關鍵字保證了變量在內存中的可見性,詳見:volatile關鍵字解析。保證某個線程在出隊入隊時被其他線程看到。

private transient volatile Node head;//頭節點
private transient volatile Node tail;//尾節點

AbstractQueuedSynchronizer這個類中還有一個內部類Node,用於構建隊列元素的節點類。


在AQS中定義了兩種資源共享方式:

  • Exclusive:獨占式
  • Share:共享式

    當以獨占模式獲取時,嘗試通過其他線程獲取不能成功。 多線程獲取的共享模式可能(但不需要)成功。 當共享模式獲取成功時,下一個等待線程(如果存在)也必須確定它是否也可以獲取。 在不同模式下等待的線程共享相同的FIFO隊列。

在不同的實現類中為了實現不同的功能,會采用不同的共享方式,例如可重入鎖ReentrantLock采用的就是獨占鎖。
AQS的不同實現類,不需要關註線程等待隊列的維護和管理(線程阻塞入隊、喚醒出隊),在AQS中這些是已經定義好的,不同的同步器只需要對以下方法進行實現即可:

//獨占方式嘗試獲取資源
protected boolean tryAcquire(int arg)
//獨占方式嘗試釋放資源
protected boolean tryRelease(int arg)
//共享方式嘗試獲取資源,返回值0表示成功但是沒有剩余資源,負數表示失敗,正數表示成功且有剩余資源
protected int tryAcquireShared(int arg)
//共享方式嘗試釋放資源
protected boolean tryReleaseShared(int arg)

所有自定義的同步器只需要確定自己是那種資源貢獻方式即可:共享式、獨占式。也可以同時實現共享式和獨占式ReentrantReadWriteLock讀寫鎖,多個線程可以同時進行讀操作,但是只能有一個線程進行寫操作。


獨占模式同步狀態獲取:

首先先從代碼開始執行的地方看:

以獨占模式獲取資源,忽略中斷。(如果獲取到資源,直接返回結果,否則進入等待隊列,等待再次獲取資源。) 通過調用至少一次tryAcquire(int)實現,成功返回。 否則線程排隊,可能會重復阻塞和解除阻塞,直到成功才調用tryAcquire(int)

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

方法執行的順序:

  • 調用tryAcquire()方法嘗試去獲取資源,具體在子類中進行實現
  • 調用addWaiter()方法把當前線程標記為獨占式,並加入到隊列的尾部

這裏需要講一下addWaiter()方法中的第一個參數,線程等待隊列中的元素都是利用Node這個內部類存儲的,在Node中有兩個成員變量分別聲明了資源共享方式:

        static final Node SHARED = new Node();//共享式
        static final Node EXCLUSIVE = null;//獨占式
  • 調用acquireQueued()方法,讓線程在隊列中等待獲取資源,獲取資源後返回,如果在這個等待過程中線程被中斷過,返回true,否則返回false

在方法中首先調用tryAcquire(int)方法,該方法在AbstractQueuedSynchronizer並沒有實現,需要子類去實現:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

第二步調用addWaiter()方法:該方法是負責維護線程等待隊列的方法,所以在AbstractQueuedSynchronizer中實現了該方法:具體是創建了一個節點類,把節點放在隊尾,如果失敗調用enq(node)方法(隊尾節點為空)。

addWaiter()方法:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

上面的方法判斷,如果添加到隊尾失敗
enq()方法:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //如果隊列為空(隊尾元素為空)創建節點添加進去
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    //把tail指向head
                    tail = head;
            } else {
                //正常添加到隊尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在上面的代碼中添加節點都用到了比較和交換(CAS,可以說是一種在並發環境下的解決方法),compareAndSetTail()方法能夠確保節點能被安全的添加進隊列中,在多線程環境下無法保證一個元素被正確的添加到隊列的尾部。因為進入隊列的元素都是放在隊尾的,為了保證數據的正確性,所以在設置尾節點的時候使用CAS
第三步調用acquireQueued()方法,目的是為了在隊列中等待被喚醒使用資源,因為之前的操作失敗後,線程會被放入隊尾,隊列是先進先出的結構,所以在隊尾的線程必須等待被喚醒。方法中主要有一個死循環,我們稱他叫自旋,只有當條件滿足的時候,獲得同步狀態,退出自旋。
acquireQueued()方法:

final boolean acquireQueued(final Node node, int arg) {
        //設置成功標記
        boolean failed = true;
        try {
            //設置中斷標記
            boolean interrupted = false;
            for (;;) {
                //獲得node的前驅節點
                final Node p = node.predecessor();
                //判斷前驅結點是否是頭節點
                if (p == head && tryAcquire(arg)) {
                    //把node設置為頭結點
                    setHead(node);
                    //把p節點的前驅設置為null,見下面的解釋
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判斷是否繼續等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

把p節點的前驅設置為null,也就是之前的head節點,在上面源碼中後面的註釋標記為help GC功能,解釋一下:在調用上面的setHead()方法的時候,方法的內部已經將當前節點的前驅結點設置為null,在這裏再次設置一遍,為了保證當前節點的前驅結點順利被回收(當前節點設置為頭節點,那麽之前的頭節點就要被釋放,模擬一個正常的出隊過程)。自己畫圖更好理解。

setHead()方法:

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

這裏分析上面調用的acquireQueued()方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //獲取前驅節點的狀態
        int ws = pred.waitStatus;
        //如果當前節點狀態值為SIGNAL這個值,代表當前線程應該被掛起,等待被喚醒
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            //如果大於0代表將當前節點的前驅節點移除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //小於0時把前驅結點狀態值設置為SIGNAL,目的是為了前驅判斷後將當前節點掛起(通知自己一下)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

在這裏我們需要看一下Node這個類中定義的關於狀態值的定義:

        //表示線程已取消,作廢狀態
        static final int CANCELLED =  1;
        //表示後繼節點應該等待當前節點釋放資源後喚醒其後繼節點
        static final int SIGNAL    = -1;
        //表示當前正處於等待狀態
        static final int CONDITION = -2;
        //表示狀態需要向後傳播
        static final int PROPAGATE = -3;
  • CANCELLED 取消狀態
  • SIGNAL 等待觸發狀態
  • CONDITION 等待條件狀態
  • PROPAGATE 狀態需要向後傳播

等待隊列是FIFO先進先出,只有前一個節點的狀態為SIGNAL時,當前節點的線程才能被掛起。 所以在方法調用的時候把前驅結點設置為SIGNAL。
因為前一節點被置為SIGNAL說明後面有線程需要執行,但是還輪不到它後面的線程執行,後面線程一定要找一個前驅節點不為CANCEL的節點,然後把它設置為SIGNAL然後原地掛起,等待喚醒。 因為SIGNAL執行完了會喚醒緊接著的後面一個。


總結:
AQS中定義的acquire()模板方法,具體通過調用子類中的tryAcquire()方法嘗試去獲取資源,成功則返回,失敗調用addWaiter()將當前線程添加到阻塞隊列的隊尾,同時標記為獨占狀態。acquireQueued()方法通過自旋獲取同步狀態(該方法使線程在等待隊列中等待休息,當有機會時嘗試獲取資源),節點嘗試獲取資源的條件是當前節點的前驅節點是頭節點,嘗試獲取到資源後才返回,在整個等待過程中如果發生過中斷,不做響應,在獲取資源後調用selfInterrupt()方法設置中斷。
技術分享圖片


獨占模式下同步狀態的釋放:

上面根據源碼分析了獨占模式下獲得鎖的過程主要調用了模板方法acquire()方法向下分析,接著我們分析它的相反的方法,獨占模式下釋放鎖的過程,還是一個模板方法release()

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            //得到頭節點
            Node h = head;
            //判斷頭節點不為空,狀態值符合條件
            if (h != null && h.waitStatus != 0)
                //喚醒下一個等待線程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease()方法依然需要子類去自己實現

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

unparkSuccessor()方法:

private void unparkSuccessor(Node node) {
        //獲得當前線程的狀態值
        int ws = node.waitStatus;
        if (ws < 0)
            //小於0時置零
            compareAndSetWaitStatus(node, ws, 0);
        //獲得當前節點的後繼節點
        Node s = node.next;
        //判斷為空和狀態值是否大於0
        if (s == null || s.waitStatus > 0) {
            s = null;
            //從尾節點向前遍歷,需要喚醒的線程通常是存儲在下一個節點中的
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //喚醒線程
            LockSupport.unpark(s.thread);
    }

unpark()方法喚醒的是等待隊列中最前面的線程,之後會再次執行上面的過程。

總結:在獲取同步狀時,在使用者的角度看在使用鎖時,同步器會維護一個同步隊列,獲取狀態失敗的線程會被加入這個隊列並進行自旋;當該節點的前驅節點是頭節點的時候並且獲得了同步狀態時移出隊列。在釋放的時候,調用tryRelease()釋放並喚醒後繼節點。

Java並發AQS原理分析(一)