1. 程式人生 > >【死磕Java並發】—–J.U.C之AQS(一篇就夠了)

【死磕Java並發】—–J.U.C之AQS(一篇就夠了)

ini tle 循環 針對 可能 width als 如果 boolean

[隱藏目錄]

  • 1 獨占式
    • 1.1 獨占式同步狀態獲取
    • 1.2 獨占式獲取響應中斷
    • 1.3 獨占式超時獲取
    • 1.4 獨占式同步狀態釋放
  • 2 共享式
    • 2.1 共享式同步狀態獲取
    • 2.2 共享式同步狀態釋放
  • 3 參考資料

此篇博客所有源碼均來自JDK 1.8

在前面提到過,AQS是構建Java同步組件的基礎,我們期待它能夠成為實現大部分同步需求的基礎。AQS的設計模式采用的模板方法模式,子類通過繼承的方式,實現它的抽象方法來管理同步狀態,對於子類而言它並沒有太多的活要做,AQS提供了大量的模板方法來實現同步,主要是分為三類:獨占式獲取和釋放同步狀態、共享式獲取和釋放同步狀態、查詢同步隊列中的等待線程情況。自定義子類使用AQS提供的模板方法就可以實現自己的同步語義。

獨占式

獨占式,同一時刻僅有一個線程持有同步狀態。

獨占式同步狀態獲取

acquire(int arg)方法為AQS提供的模板方法,該方法為獨占式獲取同步狀態,但是該方法對中斷不敏感,也就是說由於線程獲取同步狀態失敗加入到CLH同步隊列中,後續對線程進行中斷操作時,線程不會從同步隊列中移除。代碼如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt
(); }

各個方法定義如下:

  1. tryAcquire:去嘗試獲取鎖,獲取成功則設置鎖狀態並返回true,否則返回false。該方法自定義同步組件自己實現,該方法必須要保證線程安全的獲取同步狀態。
  2. addWaiter:如果tryAcquire返回FALSE(獲取同步狀態失敗),則調用該方法將當前線程加入到CLH同步隊列尾部。
  3. acquireQueued:當前線程會根據公平性原則來進行阻塞等待(自旋),直到獲取鎖為止;並且返回當前線程在等待過程中有沒有中斷過。
  4. selfInterrupt:產生一個中斷。

acquireQueued方法為一個自旋的過程,也就是說當前線程(Node)進入同步隊列後,就會進入一個自旋的過程,每個節點都會自省地觀察,當條件滿足,獲取到同步狀態後,就可以從這個自旋過程中退出,否則會一直執行下去。如下:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //中斷標誌
            boolean interrupted = false;
            /*
             * 自旋過程,其實就是一個死循環而已
             */
            for (;;) {
                //當前線程的前驅節點
                final Node p = node.predecessor();
                //當前線程的前驅節點是頭結點,且同步狀態成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //獲取失敗,線程等待--具體後面介紹
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

從上面代碼中可以看到,當前線程會一直嘗試獲取同步狀態,當然前提是只有其前驅節點為頭結點才能夠嘗試獲取同步狀態,理由:

  1. 保持FIFO同步隊列原則。
  2. 頭節點釋放同步狀態後,將會喚醒其後繼節點,後繼節點被喚醒後需要檢查自己是否為頭節點。

acquire(int arg)方法流程圖如下:

技術分享圖片

獨占式獲取響應中斷

AQS提供了acquire(int arg)方法以供獨占式獲取同步狀態,但是該方法對中斷不響應,對線程進行中斷操作後,該線程會依然位於CLH同步隊列中等待著獲取同步狀態。為了響應中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態時,如果當前線程被中斷了,會立刻響應中斷拋出異常InterruptedException。

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

首先校驗該線程是否已經中斷了,如果是則拋出InterruptedException,否則執行tryAcquire(int arg)方法獲取同步狀態,如果獲取成功,則直接返回,否則執行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定義如下:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireInterruptibly(int arg)方法與acquire(int arg)方法僅有兩個差別。1.方法聲明拋出InterruptedException異常,2.在中斷方法處不再是使用interrupted標誌,而是直接拋出InterruptedException異常。

獨占式超時獲取

AQS除了提供上面兩個方法外,還提供了一個增強版的方法:tryAcquireNanos(int arg,long nanos)。該方法為acquireInterruptibly方法的進一步增強,它除了響應中斷外,還有超時控制。即如果當前線程沒有在指定時間內獲取同步狀態,則會返回false,否則返回true。如下:

   public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

tryAcquireNanos(int arg, long nanosTimeout)方法超時獲取最終是在doAcquireNanos(int arg, long nanosTimeout)中實現的,如下:

  private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //nanosTimeout <= 0
        if (nanosTimeout <= 0L)
            return false;
        //超時時間
        final long deadline = System.nanoTime() + nanosTimeout;
        //新增Node節點
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            //自旋
            for (;;) {
                final Node p = node.predecessor();
                //獲取同步狀態成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                /*
                 * 獲取失敗,做超時、中斷判斷
                 */
                //重新計算需要休眠的時間
                nanosTimeout = deadline - System.nanoTime();
                //已經超時,返回false
                if (nanosTimeout <= 0L)
                    return false;
                //如果沒有超時,則等待nanosTimeout納秒
                //註:該線程會直接從LockSupport.parkNanos中返回,
                //LockSupport為JUC提供的一個阻塞和喚醒的工具類,後面做詳細介紹
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //線程是否已經中斷了
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

針對超時控制,程序首先記錄喚醒時間deadline ,deadline = System.nanoTime() + nanosTimeout(時間間隔)。如果獲取同步狀態失敗,則需要計算出需要休眠的時間間隔nanosTimeout(= deadline – System.nanoTime()),如果nanosTimeout <= 0 表示已經超時了,返回false,如果大於spinForTimeoutThreshold(1000L)則需要休眠nanosTimeout ,如果nanosTimeout <= spinForTimeoutThreshold ,就不需要休眠了,直接進入快速自旋的過程。原因在於 spinForTimeoutThreshold 已經非常小了,非常短的時間等待無法做到十分精確,如果這時再次進行超時等待,相反會讓nanosTimeout 的超時從整體上面表現得不是那麽精確,所以在超時非常短的場景中,AQS會進行無條件的快速自旋。

整個流程如下:

技術分享圖片

獨占式同步狀態釋放

當線程獲取同步狀態後,執行完相應邏輯後就需要釋放同步狀態。AQS提供了release(int arg)方法釋放同步狀態:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

該方法同樣是先調用自定義同步器自定義的tryRelease(int arg)方法來釋放同步狀態,釋放成功後,會調用unparkSuccessor(Node node)方法喚醒後繼節點(如何喚醒LZ後面介紹)。

這裏稍微總結下:

在AQS中維護著一個FIFO的同步隊列,當線程獲取同步狀態失敗後,則會加入到這個CLH同步隊列的對尾並一直保持著自旋。在CLH同步隊列中的線程在自旋時會判斷其前驅節點是否為首節點,如果為首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步隊列。當線程執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。

共享式

共享式與獨占式的最主要區別在於同一時刻獨占式只能有一個線程獲取同步狀態,而共享式在同一時刻可以有多個線程獲取同步狀態。例如讀操作可以有多個線程同時進行,而寫操作同一時刻只能有一個線程進行寫操作,其他操作都會被阻塞。

共享式同步狀態獲取

AQS提供acquireShared(int arg)方法共享式獲取同步狀態:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            //獲取失敗,自旋獲取同步狀態
            doAcquireShared(arg);
    }

從上面程序可以看出,方法首先是調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,如果獲取失敗則調用doAcquireShared(int arg)自旋方式獲取同步狀態,共享式獲取同步狀態的標誌是返回 >= 0 的值表示獲取成功。自選式獲取同步狀態如下:

    private void doAcquireShared(int arg) {
        /共享式節點
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //前驅節點
                final Node p = node.predecessor();
                //如果其前驅節點,獲取同步狀態
                if (p == head) {
                    //嘗試獲取同步
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireShared(int arg)方法嘗試獲取同步狀態,返回值為int,當其 >= 0 時,表示能夠獲取到同步狀態,這個時候就可以從自旋過程中退出。

acquireShared(int arg)方法不響應中斷,與獨占式相似,AQS也提供了響應中斷、超時的方法,分別是:acquireSharedInterruptibly(int arg)、tryAcquireSharedNanos(int arg,long nanos),這裏就不做解釋了。

共享式同步狀態釋放

獲取同步狀態後,需要調用release(int arg)方法釋放同步狀態,方法如下:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

因為可能會存在多個線程同時進行釋放同步狀態資源,所以需要確保同步狀態安全地成功釋放,一般都是通過CAS和循環來完成的。

參考資料

Doug Lea:《Java並發編程實戰》
方騰飛:《Java並發編程的藝術》

原文地址:http://cmsblogs.com/?p=2197

【死磕Java並發】—–J.U.C之AQS(一篇就夠了)