1. 程式人生 > >J.U.C之AQS:阻塞和喚醒線程

J.U.C之AQS:阻塞和喚醒線程

smart -i back ont () 而不是 受限 clh blog

此篇博客所有源碼均來自JDK 1.8

在線程獲取同步狀態時如果獲取失敗,則加入CLH同步隊列,通過通過自旋的方式不斷獲取同步狀態,但是在自旋的過程中則需要判斷當前線程是否需要阻塞,其主要方法在acquireQueued():

if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

通過這段代碼我們可以看到,在獲取同步狀態失敗後,線程並不是立馬進行阻塞,需要檢查該線程的狀態,檢查狀態的方法為 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,該方法主要靠前驅節點判斷當前線程是否應該被阻塞,代碼如下:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驅節點
        int ws = pred.waitStatus;
        //狀態為signal,表示當前線程處於等待狀態,直接放回true
        if (ws == Node.SIGNAL)
            return true;
        //前驅節點狀態 > 0 ,則為Cancelled,表明該節點已經超時或者被中斷了,需要從同步隊列中取消
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } 
        //前驅節點狀態為Condition、propagate
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

這段代碼主要檢查當前線程是否需要被阻塞,具體規則如下:

  1. 如果當前線程的前驅節點狀態為SINNAL,則表明當前線程需要被阻塞,調用unpark()方法喚醒,直接返回true,當前線程阻塞
  2. 如果當前線程的前驅節點狀態為CANCELLED(ws > 0),則表明該線程的前驅節點已經等待超時或者被中斷了,則需要從CLH隊列中將該前驅節點刪除掉,直到回溯到前驅節點狀態 <= 0 ,返回false
  3. 如果前驅節點非SINNAL,非CANCELLED,則通過CAS的方式將其前驅節點設置為SINNAL,返回false

如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,則調用parkAndCheckInterrupt()方法阻塞當前線程:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

parkAndCheckInterrupt() 方法主要是把當前線程掛起,從而阻塞住線程的調用棧,同時返回當前線程的中斷狀態。其內部則是調用LockSupport工具類的park()方法來阻塞該方法。

當線程釋放同步狀態後,則需要喚醒該線程的後繼節點:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
				//喚醒後繼節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

調用unparkSuccessor(Node node)喚醒後繼節點:

    private void unparkSuccessor(Node node) {
        //當前節點狀態
        int ws = node.waitStatus;
        //當前狀態 < 0 則設置為 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //當前節點的後繼節點
        Node s = node.next;
        //後繼節點為null或者其狀態 > 0 (超時或者被中斷了)
        if (s == null || s.waitStatus > 0) {
            s = null;
            //從tail節點來找可用節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒後繼節點
        if (s != null)
            LockSupport.unpark(s.thread);
    }

可能會存在當前線程的後繼節點為null,超時、被中斷的情況,如果遇到這種情況了,則需要跳過該節點,但是為何是從tail尾節點開始,而不是從node.next開始呢?原因在於node.next仍然可能會存在null或者取消了,所以采用tail回溯辦法找第一個可用的線程。最後調用LockSupport的unpark(Thread thread)方法喚醒該線程。

LockSupport

從上面我可以看到,當需要阻塞或者喚醒一個線程的時候,AQS都是使用LockSupport這個工具類來完成的。

LockSupport是用來創建鎖和其他同步類的基本線程阻塞原語

每個使用LockSupport的線程都會與一個許可關聯,如果該許可可用,並且可在進程中使用,則調用park()將會立即返回,否則可能阻塞。如果許可尚不可用,則可以調用 unpark 使其可用。但是註意許可不可重入,也就是說只能調用一次park()方法,否則會一直阻塞。

LockSupport定義了一系列以park開頭的方法來阻塞當前線程,unpark(Thread thread)方法來喚醒一個被阻塞的線程。如下:

技術分享圖片

park(Object blocker)方法的blocker參數,主要是用來標識當前線程在等待的對象,該對象主要用於問題排查和系統監控。

park方法和unpark(Thread thread)都是成對出現的,同時unpark必須要在park執行之後執行,當然並不是說沒有不調用unpark線程就會一直阻塞,park有一個方法,它帶了時間戳(parkNanos(long nanos):為了線程調度禁用當前線程,最多等待指定的等待時間,除非許可可用)。

park()方法的源碼如下:

    public static void park() {
        UNSAFE.park(false, 0L);
    }

unpark(Thread thread)方法源碼如下:

    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

從上面可以看出,其內部的實現都是通過UNSAFE(sun.misc.Unsafe UNSAFE)來實現的,其定義如下:

public native void park(boolean var1, long var2);
public native void unpark(Object var1);

兩個都是native本地方法。Unsafe 是一個比較危險的類,主要是用於執行低級別、不安全的方法集合。盡管這個類和所有的方法都是公開的(public),但是這個類的使用仍然受限,你無法在自己的java程序中直接使用該類,因為只有授信的代碼才能獲得該類的實例。

參考資料

  1. 方騰飛:《Java並發編程的藝術》
  2. LockSupport的park和unpark的基本使用,以及對線程中斷的響應性

J.U.C之AQS:阻塞和喚醒線程