1. 程式人生 > >【死磕Java併發】—–J.U.C之AQS(一篇就夠了)

【死磕Java併發】—–J.U.C之AQS(一篇就夠了)

作者:大明哥 
原文地址:http://cmsblogs.com

越是核心的東西越是要反覆看,本文篇幅較長,希望各位細細品讀,來回多讀幾遍理解下。

AQS簡介

java的內建鎖一直都是備受爭議的,在JDK 1.6之前,synchronized這個重量級鎖其效能一直都是較為低下,雖然在1.6後,進行大量的鎖優化策略,但是與Lock相比synchronized還是存在一些缺陷的:雖然synchronized提供了便捷性的隱式獲取鎖釋放鎖機制(基於JVM機制),但是它卻缺少了獲取鎖與釋放鎖的可操作性,可中斷、超時獲取鎖,且它為獨佔式在高併發場景下效能大打折扣。

在介紹Lock之前,我們需要先熟悉一個非常重要的元件,掌握了該元件JUC包下面很多問題都不在是問題了。該元件就是AQS。

AQS:AbstractQueuedSynchronizer,即佇列同步器。它是構建鎖或者其他同步元件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC併發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。它是JUC併發包中的核心基礎元件。

AQS解決了子類實現同步器時涉及當的大量細節問題,例如獲取同步狀態、FIFO同步佇列。基於AQS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。

在基於AQS構建的同步器中,只能在一個時刻發生阻塞,從而降低上下文切換的開銷,提高了吞吐量。同時在設計AQS時充分考慮了可伸縮行,因此J.U.C中所有基於AQS構建的同步器均可以獲得這個優勢。

AQS的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態。

AQS使用一個int型別的成員變數state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操作,當然AQS可以確保對state的操作是安全的。

AQS通過內建的FIFO同步佇列來完成資源獲取執行緒的排隊工作,如果當前執行緒獲取同步狀態失敗(鎖)時,AQS則會將當前執行緒以及等待狀態等資訊構造成一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,則會把節點中的執行緒喚醒,使其再次嘗試獲取同步狀態。

AQS主要提供瞭如下一些方法:

  • getState():返回同步狀態的當前值;

  • setState(int newState):設定當前同步狀態;

  • compareAndSetState(int expect, int update):使用CAS設定當前狀態,該方法能夠保證狀態設定的原子性;

  • tryAcquire(int arg):獨佔式獲取同步狀態,獲取同步狀態成功後,其他執行緒需要等待該執行緒釋放同步狀態才能獲取同步狀態

  • tryRelease(int arg):獨佔式釋放同步狀態;

  • tryAcquireShared(int arg):共享式獲取同步狀態,返回值大於等於0則表示獲取成功,否則獲取失敗;

  • tryReleaseShared(int arg):共享式釋放同步狀態;

  • isHeldExclusively():當前同步器是否在獨佔式模式下被執行緒佔用,一般該方法表示是否被當前執行緒所獨佔;

  • acquire(int arg):獨佔式獲取同步狀態,如果當前執行緒獲取同步狀態成功,則由該方法返回,否則,將會進入同步佇列等待,該方法將會呼叫可重寫的tryAcquire(int arg)方法;

  • acquireInterruptibly(int arg):與acquire(int arg)相同,但是該方法響應中斷,當前執行緒為獲取到同步狀態而進入到同步佇列中,如果當前執行緒被中斷,則該方法會丟擲InterruptedException異常並返回;

  • tryAcquireNanos(int arg,long nanos):超時獲取同步狀態,如果當前執行緒在nanos時間內沒有獲取到同步狀態,那麼將會返回false,已經獲取則返回true;

  • acquireShared(int arg):共享式獲取同步狀態,如果當前執行緒未獲取到同步狀態,將會進入同步佇列等待,與獨佔式的主要區別是在同一時刻可以有多個執行緒獲取到同步狀態;

  • acquireSharedInterruptibly(int arg):共享式獲取同步狀態,響應中斷;

  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式獲取同步狀態,增加超時限制;

  • release(int arg):獨佔式釋放同步狀態,該方法會在釋放同步狀態之後,將同步佇列中第一個節點包含的執行緒喚醒;

  • releaseShared(int arg):共享式釋放同步狀態;

CLH同步佇列

CLH同步佇列是一個FIFO雙向佇列,AQS依賴它來完成同步狀態的管理,當前執行緒如果獲取同步狀態失敗時,AQS則會將當前執行緒已經等待狀態等資訊構造成一個節點(Node)並將其加入到CLH同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。

在CLH同步佇列中,一個節點表示一個執行緒,它儲存著執行緒的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next),其定義如下:

static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();
    /** 獨佔 */
    static final Node EXCLUSIVE = null;
    /**
     * 因為超時或者中斷,節點會被設定為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態;
     */
    static final int CANCELLED =  1;
    /**
     * 後繼節點的執行緒處於等待狀態,而當前節點的執行緒如果釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的執行緒得以執行
     */
    static final int SIGNAL    = -1;
    /**
     * 節點在等待佇列中,節點執行緒等待在Condition上,當其他執行緒對Condition呼叫了signal()後,改節點將會從等待佇列中轉移到同步佇列中,加入到同步狀態的獲取中
     */
    static final int CONDITION = -2;
    /**
     * 表示下一次共享式同步狀態獲取將會無條件地傳播下去
     */
    static final int PROPAGATE = -3;
    /** 等待狀態 */
    volatile int waitStatus;
    /** 前驅節點 */
    volatile Node prev;
    /** 後繼節點 */
    volatile Node next;
    /** 獲取同步狀態的執行緒 */
    volatile Thread thread;
    Node nextWaiter;
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {
    }
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

CLH同步佇列結構圖如下:

入列

學了資料結構的我們,CLH佇列入列是再簡單不過了,無非就是tail指向新節點、新節點的prev指向當前最後的節點,當前最後一個節點的next指向當前節點。程式碼我們可以看看addWaiter(Node node)方法:

    private Node addWaiter(Node mode) {
        //新建Node
        Node node = new Node(Thread.currentThread(), mode);
        //快速嘗試新增尾節點
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS設定尾節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //多次嘗試
        enq(node);
        return node;
    }

addWaiter(Node node)先通過快速嘗試設定尾節點,如果失敗,則呼叫enq(Node node)方法設定尾節點

    private Node enq(final Node node) {
        //多次嘗試,直到成功為止
        for (;;) {
            Node t = tail;
            //tail不存在,設定為首節點
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //設定為尾節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在上面程式碼中,兩個方法都是通過一個CAS方法compareAndSetTail(Node expect, Node update)來設定尾節點,該方法可以確保節點是執行緒安全新增的。在enq(Node node)方法中,AQS通過“死迴圈”的方式來保證節點可以正確新增,只有成功新增後,當前執行緒才會從該方法返回,否則會一直執行下去。

過程圖如下:

出列

CLH同步佇列遵循FIFO,首節點的執行緒釋放同步狀態後,將會喚醒它的後繼節點(next),而後繼節點將會在獲取同步狀態成功時將自己設定為首節點,這個過程非常簡單,head執行該節點並斷開原首節點的next和當前節點的prev即可,注意在這個過程是不需要使用CAS來保證的,因為只有一個執行緒能夠成功獲取到同步狀態。

過程圖如下:

同步狀態的獲取與釋放

在前面提到過,AQS是構建Java同步元件的基礎,我們期待它能夠成為實現大部分同步需求的基礎。AQS的設計模式採用的模板方法模式,子類通過繼承的方式,實現它的抽象方法來管理同步狀態,對於子類而言它並沒有太多的活要做,AQS提供了大量的模板方法來實現同步,主要是分為三類:獨佔式獲取和釋放同步狀態、共享式獲取和釋放同步狀態、查詢同步佇列中的等待執行緒情況。自定義子類使用AQS提供的模板方法就可以實現自己的同步語義。

獨佔式

獨佔式,同一時刻僅有一個執行緒持有同步狀態。

獨佔式同步狀態獲取
   acquire(int arg)方法為AQS提供的模板方法,該方法為獨佔式獲取同步狀態,但是該方法對中斷不敏感,也就是說由於執行緒獲取同步狀態失敗加入到CLH同步佇列中,後續對執行緒進行中斷操作時,執行緒不會從同步佇列中移除。程式碼如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

各個方法定義如下:

  • tryAcquire:去嘗試獲取鎖,獲取成功則設定鎖狀態並返回true,否則返回false。該方法自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。

  • addWaiter:如果tryAcquire返回FALSE(獲取同步狀態失敗),則呼叫該方法將當前執行緒加入到CLH同步佇列尾部。

  • acquireQueued:當前執行緒會根據公平性原則來進行阻塞等待(自旋),直到獲取鎖為止;並且返回當前執行緒在等待過程中有沒有中斷過。

  • selfInterrupt:產生一箇中斷。

acquireQueued方法為一個自旋的過程,也就是說當前執行緒(Node)進入同步佇列後,就會進入一個自旋的過程,每個節點都會自省地觀察,當條件滿足,獲取到同步狀態後,就可以從這個自旋過程中退出,否則會一直執行下去。

如下:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //中斷標誌
            boolean interrupted = false;
            /*
             * 自旋過程,其實就是一個死迴圈而已
             */
            for (;;) {
                //當前執行緒的前驅節點
                final Node p = node.predecessor();
                //當前執行緒的前驅節點是頭結點,且同步狀態成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //獲取失敗,執行緒等待--具體後面介紹
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

從上面程式碼中可以看到,當前執行緒會一直嘗試獲取同步狀態,當然前提是隻有其前驅節點為頭結點才能夠嘗試獲取同步狀態,理由:

  • 保持FIFO同步佇列原則。

  • 頭節點釋放同步狀態後,將會喚醒其後繼節點,後繼節點被喚醒後需要檢查自己是否為頭節點。

acquire(int arg)方法流程圖如下:

獨佔式獲取響應中斷
   AQS提供了acquire(int arg)方法以供獨佔式獲取同步狀態,但是該方法對中斷不響應,對執行緒進行中斷操作後,該執行緒會依然位於CLH同步佇列中等待著獲取同步狀態。為了響應中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態時,如果當前執行緒被中斷了,會立刻響應中斷丟擲異常InterruptedException。

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

首先校驗該執行緒是否已經中斷了,如果是則丟擲InterruptedException,否則執行tryAcquire(int arg)方法獲取同步狀態,如果獲取成功,則直接返回,否則執行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定義如下:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireInterruptibly(int arg)方法與acquire(int arg)方法僅有兩個差別。

1.方法宣告丟擲InterruptedException異常。

2.在中斷方法處不再是使用interrupted標誌,而是直接丟擲InterruptedException異常。

獨佔式超時獲取
   AQS除了提供上面兩個方法外,還提供了一個增強版的方法:tryAcquireNanos(int arg,long nanos)。該方法為acquireInterruptibly方法的進一步增強,它除了響應中斷外,還有超時控制。即如果當前執行緒沒有在指定時間內獲取同步狀態,則會返回false,否則返回true。如下:

   public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

tryAcquireNanos(int arg, long nanosTimeout)方法超時獲取最終是在doAcquireNanos(int arg, long nanosTimeout)中實現的,如下:

  private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //nanosTimeout <= 0
        if (nanosTimeout <= 0L)
            return false;
        //超時時間
        final long deadline = System.nanoTime() + nanosTimeout;
        //新增Node節點
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            //自旋
            for (;;) {
                final Node p = node.predecessor();
                //獲取同步狀態成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                /*
                 * 獲取失敗,做超時、中斷判斷
                 */
                //重新計算需要休眠的時間
                nanosTimeout = deadline - System.nanoTime();
                //已經超時,返回false
                if (nanosTimeout <= 0L)
                    return false;
                //如果沒有超時,則等待nanosTimeout納秒
                //注:該執行緒會直接從LockSupport.parkNanos中返回,
                //LockSupport為JUC提供的一個阻塞和喚醒的工具類,後面做詳細介紹
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //執行緒是否已經中斷了
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

針對超時控制,程式首先記錄喚醒時間deadline ,deadline = System.nanoTime() + nanosTimeout(時間間隔)。如果獲取同步狀態失敗,則需要計算出需要休眠的時間間隔nanosTimeout(= deadline - System.nanoTime()),如果nanosTimeout <= 0 表示已經超時了,返回false,如果大於spinForTimeoutThreshold(1000L)則需要休眠nanosTimeout ,如果nanosTimeout <= spinForTimeoutThreshold ,就不需要休眠了,直接進入快速自旋的過程。原因在於 spinForTimeoutThreshold 已經非常小了,非常短的時間等待無法做到十分精確,如果這時再次進行超時等待,相反會讓nanosTimeout 的超時從整體上面表現得不是那麼精確,所以在超時非常短的場景中,AQS會進行無條件的快速自旋。

整個流程如下:

獨佔式同步狀態釋放
   當執行緒獲取同步狀態後,執行完相應邏輯後就需要釋放同步狀態。AQS提供了release(int arg)方法釋放同步狀態:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

該方法同樣是先呼叫自定義同步器自定義的tryRelease(int arg)方法來釋放同步狀態,釋放成功後,會呼叫unparkSuccessor(Node node)方法喚醒後繼節點(如何喚醒LZ後面介紹)。

這裡稍微總結下:

在AQS中維護著一個FIFO的同步佇列,當執行緒獲取同步狀態失敗後,則會加入到這個CLH同步佇列的對尾並一直保持著自旋。在CLH同步佇列中的執行緒在自旋時會判斷其前驅節點是否為首節點,如果為首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步佇列。當執行緒執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。

共享式

共享式與獨佔式的最主要區別在於同一時刻獨佔式只能有一個執行緒獲取同步狀態,而共享式在同一時刻可以有多個執行緒獲取同步狀態。例如讀操作可以有多個執行緒同時進行,而寫操作同一時刻只能有一個執行緒進行寫操作,其他操作都會被阻塞。

共享式同步狀態獲取
   AQS提供acquireShared(int arg)方法共享式獲取同步狀態:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            //獲取失敗,自旋獲取同步狀態
            doAcquireShared(arg);
    }

從上面程式可以看出,方法首先是呼叫tryAcquireShared(int arg)方法嘗試獲取同步狀態,如果獲取失敗則呼叫doAcquireShared(int arg)自旋方式獲取同步狀態,共享式獲取同步狀態的標誌是返回 >= 0 的值表示獲取成功。自選式獲取同步狀態如下:

    private void doAcquireShared(int arg) {
        /共享式節點
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //前驅節點
                final Node p = node.predecessor();
                //如果其前驅節點,獲取同步狀態
                if (p == head) {
                    //嘗試獲取同步
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireShared(int arg)方法嘗試獲取同步狀態,返回值為int,當其 >= 0 時,表示能夠獲取到同步狀態,這個時候就可以從自旋過程中退出。

acquireShared(int arg)方法不響應中斷,與獨佔式相似,AQS也提供了響應中斷、超時的方法,分別是:acquireSharedInterruptibly(int arg)、tryAcquireSharedNanos(int arg,long nanos),這裡就不做解釋了。

共享式同步狀態釋放
   獲取同步狀態後,需要呼叫release(int arg)方法釋放同步狀態,方法如下:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

因為可能會存在多個執行緒同時進行釋放同步狀態資源,所以需要確保同步狀態安全地成功釋放,一般都是通過CAS和迴圈來完成的。

阻塞和喚醒執行緒

線上程獲取同步狀態時如果獲取失敗,則加入CLH同步佇列,通過通過自旋的方式不斷獲取同步狀態,但是在自旋的過程中則需要判斷當前執行緒是否需要阻塞,其主要方法在acquireQueued():

if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

通過這段程式碼我們可以看到,在獲取同步狀態失敗後,執行緒並不是立馬進行阻塞,需要檢查該執行緒的狀態,檢查狀態的方法為 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,該方法主要靠前驅節點判斷當前執行緒是否應該被阻塞,程式碼如下:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驅節點
        int ws = pred.waitStatus;
        //狀態為signal,表示當前執行緒處於等待狀態,直接放回true
        if (ws == Node.SIGNAL)
            return true;
        //前驅節點狀態 > 0 ,則為Cancelled,表明該節點已經超時或者被中斷了,需要從同步佇列中取消
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } 
        //前驅節點狀態為Condition、propagate
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

這段程式碼主要檢查當前執行緒是否需要被阻塞,具體規則如下:

  1. 如果當前執行緒的前驅節點狀態為SINNAL,則表明當前執行緒需要被阻塞,呼叫unpark()方法喚醒,直接返回true,當前執行緒阻塞

  2. 如果當前執行緒的前驅節點狀態為CANCELLED(ws > 0),則表明該執行緒的前驅節點已經等待超時或者被中斷了,則需要從CLH佇列中將該前驅節點刪除掉,直到回溯到前驅節點狀態 <= 0 ,返回false

  3. 如果前驅節點非SINNAL,非CANCELLED,則通過CAS的方式將其前驅節點設定為SINNAL,返回false

如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,則呼叫parkAndCheckInterrupt()方法阻塞當前執行緒:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

parkAndCheckInterrupt() 方法主要是把當前執行緒掛起,從而阻塞住執行緒的呼叫棧,同時返回當前執行緒的中斷狀態。其內部則是呼叫LockSupport工具類的park()方法來阻塞該方法。

當執行緒釋放同步狀態後,則需要喚醒該執行緒的後繼節點:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //喚醒後繼節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

呼叫unparkSuccessor(Node node)喚醒後繼節點:

    private void unparkSuccessor(Node node) {
        //當前節點狀態
        int ws = node.waitStatus;
        //當前狀態 < 0 則設定為 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //當前節點的後繼節點
        Node s = node.next;
        //後繼節點為null或者其狀態 > 0 (超時或者被中斷了)
        if (s == null || s.waitStatus > 0) {
            s = null;
            //從tail節點來找可用節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒後繼節點
        if (s != null)
            LockSupport.unpark(s.thread);
    }

可能會存在當前執行緒的後繼節點為null,超時、被中斷的情況,如果遇到這種情況了,則需要跳過該節點,但是為何是從tail尾節點開始,而不是從node.next開始呢?原因在於node.next仍然可能會存在null或者取消了,所以採用tail回溯辦法找第一個可用的執行緒。最後呼叫LockSupport的unpark(Thread thread)方法喚醒該執行緒。

LockSupport

從上面我可以看到,當需要阻塞或者喚醒一個執行緒的時候,AQS都是使用LockSupport這個工具類來完成的。

LockSupport是用來建立鎖和其他同步類的基本執行緒阻塞原語

每個使用LockSupport的執行緒都會與一個許可關聯,如果該許可可用,並且可在程序中使用,則呼叫park()將會立即返回,否則可能阻塞。如果許可尚不可用,則可以呼叫 unpark 使其可用。但是注意許可不可重入,也就是說只能呼叫一次park()方法,否則會一直阻塞。

LockSupport定義了一系列以park開頭的方法來阻塞當前執行緒,unpark(Thread thread)方法來喚醒一個被阻塞的執行緒。如下:

park(Object blocker)方法的blocker引數,主要是用來標識當前執行緒在等待的物件,該物件主要用於問題排查和系統監控。

park方法和unpark(Thread thread)都是成對出現的,同時unpark必須要在park執行之後執行,當然並不是說沒有不呼叫unpark執行緒就會一直阻塞,park有一個方法,它帶了時間戳(parkNanos(long nanos):為了執行緒排程禁用當前執行緒,最多等待指定的等待時間,除非許可可用)。

park()方法的原始碼如下:

    public static void park() {
        UNSAFE.park(false, 0L);
    }

unpark(Thread thread)方法原始碼如下:

    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

從上面可以看出,其內部的實現都是通過UNSAFE(sun.misc.Unsafe UNSAFE)來實現的,其定義如下:

public native void park(boolean var1, long var2);
public native void unpark(Object var1);

兩個都是native本地方法。Unsafe 是一個比較危險的類,主要是用於執行低級別、不安全的方法集合。儘管這個類和所有的方法都是公開的(public),但是這個類的使用仍然受限,你無法在自己的java程式中直接使用該類,因為只有授信的程式碼才能獲得該類的例項。

參考資料

Doug Lea:《Java併發程式設計實戰》
方騰飛:《Java併發程式設計的藝術》