1. 程式人生 > >【搞定Java併發程式設計】第16篇:佇列同步器AQS原始碼分析之獨佔模式

【搞定Java併發程式設計】第16篇:佇列同步器AQS原始碼分析之獨佔模式

AQS系列文章:

1、佇列同步器AQS原始碼分析之概要分析

2、佇列同步器AQS原始碼分析之獨佔模式

3、佇列同步器AQS原始碼分析之共享模式

4、佇列同步器AQS原始碼分析之Condition介面、等待佇列


本文主要講解佇列同步器AQS的獨佔模式:主要分為獨佔式同步狀態獲取(不響應中斷)、獨佔式同步狀態釋放、獨佔式獲取同步狀態(響應中斷)、獨佔式超時獲取同步狀態

目  錄:

1、獨佔式同步狀態獲取:不響應中斷

2、獨佔式同步狀態的釋放

3、獨佔式獲取同步狀態:可響應中斷

4、獨佔式超時獲取同步狀態


1、獨佔式同步狀態獲取:不響應中斷

通過呼叫同步器的acquire(int  arg)方法可以獲取同步狀態,該方法對中斷不敏感,也是由於執行緒獲取同步狀態失敗後進入同步佇列中,後續對執行緒進行中斷操作時,執行緒不會從同步佇列中移除。

// 獨佔式同步狀態獲取與釋放(不響應中斷方式獲取)
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}

上訴程式碼主要完成了同步狀態獲取、節點構造、加入同步佇列以及在同步佇列中自旋等待的相關工作。

其主要邏輯是:首先呼叫自定義同步器實現的tryAcquire(int  arg)方法,該方法保證執行緒安全的獲取同步狀態,如果失敗則構造同步節點(獨佔式Node.EXCLUSIVE,同一時刻只能有一個執行緒成功獲取同步狀態)並通過addWriter(Node node)方法將該節點加入到同步佇列的尾部,最後呼叫acquireQueued(Node node, int arg)方法,使得該節點以“死迴圈”的方式獲取同步狀態。

如果獲取不到則阻塞節點中的執行緒,而被阻塞執行緒的喚醒主要依靠前驅節點的出隊或阻塞執行緒被中斷來實現。

  • 第一步:tryAcquire(int arg):嘗試去獲取同步狀態 / 鎖 
// 嘗試去獲取同步狀態(獨佔模式)
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

tryAcquire(int arg)方法需要子類去覆蓋,重寫裡面的判斷邏輯。如果獲取到了同步狀態則退出返回,否則生成節點加入同步佇列尾部。

  • 第二步:addWaiter(Node mode):將當前執行緒包裝成結點並新增到同步佇列尾部
// 將當前執行緒包裝成結點並新增到同步佇列尾部
private Node addWaiter(Node mode) {
    // 指定持有鎖的模式
    Node node = new Node(Thread.currentThread(), mode);
    // 獲取同步佇列尾結點引用
    Node pred = tail;
    // 如果尾結點不為空, 表明同步佇列已存在結點
    if (pred != null) {
        // 1.指向當前尾結點
        node.prev = pred;
        // 2.設定當前結點為尾結點
        if (compareAndSetTail(pred, node)) {
            // 3.將舊的尾結點的後繼指向新的尾結點
            pred.next = node;
            return node;
        }
    }
    // 否則表明同步佇列還沒有進行初始化
    enq(node);
    return node;
}

// 結點入隊操作
private Node enq(final Node node) {
    for (;;) {
        // 獲取同步佇列尾結點引用
        Node t = tail;
        // 如果尾結點為空說明同步佇列還沒有初始化
        if (t == null) {
            // 初始化同步佇列
            if (compareAndSetHead(new Node())) {
                tail = head;
            }
        } else {
            // 1.指向當前尾結點
            node.prev = t;
            // 2.設定當前結點為尾結點
            if (compareAndSetTail(t, node)) {
                // 3.將舊的尾結點的後繼指向新的尾結點
                t.next = node;
                return t;
            }
        }
    }
}

上訴程式碼通過使用compareAndSetTail(Node expect, Node update)方法來確保節點能夠被執行緒安全新增。

在enq(final  Node  node)方法中,同步器通過“死迴圈”來保證節點的正確新增,在“死迴圈”中只有通過CAS將節點設定成為尾結點之後,當前執行緒才能從該方法返回,否則當前執行緒不斷地嘗試設定。

節點進入同步佇列以後,就進入了一個自旋的過程,每個節點(或者說每個執行緒)都在自省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自旋過程中退出,否則依舊留在這個自旋過程中。

執行到這一步表示,獲取同步狀態失敗,進入同步佇列中排隊去了,需要說明是當前執行緒獲取同步狀態是獨佔模式還是共享模式,然後將這個執行緒掛起。

  • 第三步:acquireQueued(addWaiter(Node.EXCLUSIVE), arg):以獨佔式獲取同步狀態 / 鎖
// 獨佔式同步狀態獲取
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 獲取給定結點的前繼結點的引用
            final Node p = node.predecessor();
            // 如果當前結點是同步佇列的第一個結點, 就嘗試去獲取鎖
            if (p == head && tryAcquire(arg)) {
                // 將給定結點設定為head結點
                setHead(node);
                // 為了幫助垃圾收集, 將上一個head結點的後繼清空
                p.next = null;
                // 設定獲取成功狀態
                failed = false;
                // 返回中斷的狀態, 整個迴圈執行到這裡才是出口
                return interrupted;
            }
            // 否則說明鎖的狀態還是不可獲取, 這時判斷是否可以掛起當前執行緒
            // 如果判斷結果為真則掛起當前執行緒, 否則繼續迴圈, 在這期間執行緒不響應中斷
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                interrupted = true;
            }
        }
    } finally {
        // 在最後確保如果獲取失敗就取消獲取
        if (failed) {
            cancelAcquire(node);
        }
    }
}

// 判斷是否可以將當前結點掛起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前繼結點的等待狀態
    int ws = pred.waitStatus;
    // 如果前繼結點狀態為SIGNAL, 表明前繼結點會喚醒當前結點, 所以當前結點可以安心的掛起了
    if (ws == Node.SIGNAL) {
        return true;
    }

    if (ws > 0) {
        // 下面的操作是清理同步佇列中所有已取消的前繼結點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 到這裡表示前繼結點狀態不是SIGNAL, 很可能還是等於0, 這樣的話前繼結點就不會去喚醒當前結點了 
        // 所以當前結點必須要確保前繼結點的狀態為SIGNAL才能安心的掛起自己
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// 掛起當前執行緒
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

在acquireQueued(final  Node node, int arg)方法中,當前執行緒在“死迴圈”中嘗試獲取同步狀態,而只有前驅節點是頭節點才能夠嘗試獲取同步狀態。當頭節點的執行緒釋放了同步狀態後,將會喚醒其後繼節點,後繼節點的執行緒被喚醒後需要檢查自己的前驅節點是否是頭節點。

整個for迴圈就只有一個出口,那就是等執行緒成功的獲取到同步狀態 / 鎖之後才能出去,在沒有獲取到同步狀態 / 鎖之前就一直是掛在for迴圈的parkAndCheckInterrupt()方法裡。執行緒被喚醒後也是從這個地方繼續執行for迴圈。

節點自旋獲取同步狀態

從上圖中可以看出,節點與節點之間的迴圈檢查的過程中基本上不相互通訊,而是簡單地判斷自己的前驅是否為頭節點,這樣使得節點的釋放規則符合FIFO。

  • 第四步:selfInterrupt():中斷當前執行緒【可選項】
// 當前執行緒將自己中斷
private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

由於上面整個執行緒一直是掛在for迴圈的parkAndCheckInterrupt()方法裡,沒有成功獲取到鎖之前不會響應任何形式的執行緒中斷,只有當執行緒成功獲取到鎖並從for迴圈出來後,才會檢視在這期間是否有人要求中斷執行緒,如果是的話再去呼叫selfInterrupt()方法將自己掛起。

獨佔式同步狀態的獲取流程,也就是acquire(int  arg)方法的呼叫流程如下圖所示:

當前執行緒獲取同步狀態並執行了相應邏輯後,就需要釋放同步狀態,使得後繼節點能夠繼續獲取同步狀態。通過呼叫同步器的release(int  arg)方法可以釋放同步狀態,該方法在釋放了同步狀態之後,會喚醒其後繼節點。


2、獨佔式同步狀態的釋放

public final boolean release(int arg) {
    // 撥動密碼鎖, 看看是否能夠開鎖
    if (tryRelease(arg)) {
        // 獲取head結點
        Node h = head;
        // 如果head結點不為空並且等待狀態不等於0就去喚醒後繼結點
        if (h != null && h.waitStatus != 0) {
            // 喚醒後繼結點
            unparkSuccessor(h);
        }
        return true;
    }
    return false;
}

// 喚醒後繼結點
private void unparkSuccessor(Node node) {
    // 獲取給定結點的等待狀態
    int ws = node.waitStatus;
    // 將等待狀態更新為0
    if (ws < 0) {
        compareAndSetWaitStatus(node, ws, 0);
    }
    // 獲取給定結點的後繼結點
    Node s = node.next;
    // 後繼結點為空或者等待狀態為取消狀態
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從後向前遍歷佇列找到第一個不是取消狀態的結點
        for (Node t = tail; t != null && t != node; t = t.prev) {
            if (t.waitStatus <= 0) {
                s = t;
            }
        }
    }
    // 喚醒給定結點後面首個不是取消狀態的結點
    if (s != null) {
        LockSupport.unpark(s.thread);
    }
}

該方法執行時會喚醒頭節點的後繼節點執行緒,unparkSuccessor(Node  node)方法使用LockSupport來喚醒處於等待狀態的執行緒。

以上分析了獨佔式同步狀態的獲取和釋放過程,做個總結:

在獲取同步狀態時,同步器維護一個同步佇列,獲取狀態失敗的執行緒都會被加入到佇列中進行自旋;移除佇列(或停止自旋)的條件是前驅節點為頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器呼叫tryRelease(int  arg)方法釋放同步狀態,然後喚醒頭節點的後繼節點。


3、獨佔式獲取同步狀態:可響應中斷

上訴過程是獨佔式獲取同步狀態的過程,這個過程是不響應執行緒的中斷的。那怎樣響應執行緒中斷獲取同步狀態呢?

// 以可中斷模式獲取同步狀態 / 鎖(獨佔模式)
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    // 將當前執行緒包裝成結點新增到同步佇列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取當前結點的前繼結點
            final Node p = node.predecessor();
            // 如果p是head結點, 那麼當前執行緒就再次嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                //獲取鎖成功後返回
                return;
            }
            // 如果滿足條件就掛起當前執行緒, 此時響應中斷並丟擲異常
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                // 執行緒被喚醒後如果發現中斷請求就丟擲異常
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed) {
            cancelAcquire(node);
        }
    }
}

響應執行緒中斷方式和不響應執行緒中斷方式獲取鎖流程上大致上是相同的。唯一的一點區別就是執行緒從parkAndCheckInterrupt方法中醒來後會檢查執行緒是否中斷,如果是的話就丟擲InterruptedException異常,而不響應執行緒中斷獲取鎖是在收到中斷請求後只是設定一下中斷狀態,並不會立馬結束當前獲取鎖的方法,一直到結點成功獲取到鎖之後才會根據中斷狀態決定是否將自己掛起。


4、獨佔式超時獲取同步狀態

通過呼叫同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以超時獲取同步狀態,即在指定的時間段內獲取同步狀態,如果獲取到同步狀態則返回true,否則返回false。該方法提供了傳統Java同步操作(比如synchronized關鍵字)所不具備的特性。

針對超時獲取,主要需要計算出需要睡眠的時間間隔nanosTimeout,為了防止過早通知,nanosTimeout計算公式為:nanosTimeout -= now - lastTime,其中now為當前喚醒時間,lastTime為上次喚醒時間,如果nanosTime大於0則表示超時時間未到,需要繼續睡眠nanosTimeout納秒,反之,則已經超時。

// 以限定超時時間獲取同步狀態/ 鎖(獨佔模式)
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    // 獲取系統當前時間
    long lastTime = System.nanoTime();
    // 將當前執行緒包裝成結點新增到同步佇列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取當前結點的前繼結點
            final Node p = node.predecessor();
            //如果前繼是head結點, 那麼當前執行緒就再次嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                // 更新head結點
                setHead(node);
                p.next = null;
                failed = false;
                return true;
            }
            // 超時時間用完了就直接退出迴圈
            if (nanosTimeout <= 0) {
                return false;
            }
            // 如果超時時間大於自旋時間, 那麼等判斷可以掛起執行緒之後就會將執行緒掛起一段時間
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) {
                // 將當前執行緒掛起一段時間, 之後再自己醒來
                LockSupport.parkNanos(this, nanosTimeout);
            }
            // 獲取系統當前時間
            long now = System.nanoTime();
            //超時時間每次都減去獲取鎖的時間間隔
            nanosTimeout -= now - lastTime;
            // 再次更新lastTime
            lastTime = now;
            // 在獲取鎖的期間收到中斷請求就丟擲異常
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed) {
            cancelAcquire(node);
        }
    }
}

該方法在自旋過程中,當節點的前驅節點為頭節點時嘗試獲取同步狀態,如果獲取成功則從方法中返回,這個過程和獨佔式同步獲取的過程類似,但是在同步狀態獲取失敗的處理上不同。如果當前執行緒獲取同步狀態失敗,則判斷是否超時(nanosTimeout小於等於0,就代表已經超時了),如果沒有超時,重新計算nanosTimeout,然後使當前執行緒等待nanosTimeout納秒(當已到設定的超時時間,該執行緒會從LockSupport.parkNanos(Object blocker, long  nanos)方法返回)。

注意在以超時時間獲取鎖的過程中是可以響應執行緒中斷請求的。

獨佔式超時獲取同步狀態的流程

從上圖可以看出:獨佔式超時獲取同步狀態doAcquireNanos(int arg, long  nanosTimeout)和獨佔式獲取同步狀態acquire(int  args)在流程上非常相似,其主要區別在於未獲取到同步狀態時的處理邏輯。acquire(int  args)在未獲取同步狀態時,將會使當前執行緒一直處於等待狀態,而doAcquireNanos(int arg, long  nanosTimeout)會使當前執行緒等待nanosTimeout納秒,如果當前執行緒在nanosTimeout納秒沒有獲取到同步狀態,將會從等待邏輯中自動返回。


AQS系列文章:

1、佇列同步器AQS原始碼分析之概要分析

2、佇列同步器AQS原始碼分析之獨佔模式

3、佇列同步器AQS原始碼分析之共享模式

4、佇列同步器AQS原始碼分析之Condition介面、等待佇列

參考及推薦:

1、AbstractQueuedSynchronizer原始碼分析之概要分析

2、AbstractQueuedSynchronizer原始碼分析之獨佔模式

3、AbstractQueuedSynchronizer原始碼分析之共享模式

4、AbstractQueuedSynchronizer原始碼分析之條件佇列