1. 程式人生 > >Java併發程式設計(2) AbstractQueuedSynchronizer的設計與實現

Java併發程式設計(2) AbstractQueuedSynchronizer的設計與實現

一 前言

  上一篇分析AQS的內部結構,其中有介紹AQS是什麼,以及它的內部結構的組成,那麼今天就來分析下前面說的內部結構在AQS中的具體作用(主要在具體實現中體現)。

二 AQS的介面和簡單示例

  上篇有說到AQS是抽象類,而它的設計是基於模板方法模式的,也就是說:使用者需要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步元件的實現中,並呼叫其提供的模板方法。其中需要子類重寫的方法與描述如下表:

方法名稱 描述
protected boolean tryAcquire(int arg)

嘗試以獨佔模式獲取。 此方法應查詢物件的狀態是否允許以獨佔模式獲取它,如果是,則獲取它。

實現該方法需要查詢當前狀態並判斷同步狀態是否預期,然後進行CAS設定同步狀態。

protected boolean tryRelease(int arg)

嘗試釋放獨佔式的同步狀態。

等待獲取同步狀態的執行緒將有機會獲取同步狀態。

protected int tryAcquireShared(int arg)

嘗試以共享模式獲取。 此方法應查詢物件的狀態是否允許在共享模式下獲取它,如果是,則獲取。

實現該方法需要查詢當前狀態並判斷同步狀態是否預期,然後進行CAS設定同步狀態。

protected boolean tryReleaseShared(int arg)

嘗試釋放共享式的同步狀態。

protected boolean isHeldExclusively() 表示當前同步器是否在獨佔模式下被執行緒佔用。

  在重寫上面這些方法時,可能需要下面這三個方法(注意其中state是使用volatile關鍵字修飾的)

方法名 描述
protected final int getState()  獲取當前的同步狀態
protected final void setState(int newState)  設定當前同步狀態
protected final boolean compareAndSetState(int expect, int update)
使用CAS設定當前狀態,該方法能保證狀態設定的原子性

  其實前面這些都不需要關心,因為這些一般都是在自定義同步元件中實現。自定義同步元件除了重寫第一個表格那些方法外,AQS還為其提供了一些公共方法(或者說模板方法),這些才是關鍵,也是重中之重。下面我先簡單列出以及其方法描述,後面一一分析:

方法名稱 描述
public final void acquire(int arg)

獨佔式獲取同步狀態,忽略中斷。

如果當前執行緒獲取同步狀態成功,則由該方法返回;否則將會進入同步佇列等待(

即上篇說的Node節點佇列)。

該方法將會呼叫重寫的tryAcquire(int args)方法。

public final void acquireInterruptibly(int arg)

與acquire(int args)方法一樣,但是該方法響應中斷(從方法名就大概知道意思了吧。)

當前執行緒未獲取到同步狀態而進入同步佇列中,如果當前執行緒被中斷,則該方法會丟擲InterruptedException異常

public final boolean release(int arg)

獨佔式的釋放同步狀態, 該方法會在釋放同步狀態後將同步佇列中第一個節點包含的執行緒喚醒。

該方法會呼叫tryRelease(int args)方法

public final void acquireShared(int arg)

共享式獲取同步狀態,忽略中斷。

如果當前執行緒獲取同步狀態成功,則由該方法返回;否則將會進入同步佇列等待

(即上篇說的Node節點佇列)。

與獨佔式獲取的主要區別是在同一時刻可以有多個執行緒獲取到同步狀態。

該方法將會呼叫重寫的tryAcquireShare(int args)方法。

public final void acquireSharedInterruptibly(int arg) 與acquireInterruptibly方法相同
public final boolean releaseShared(int arg)  共享式的釋放同步狀態

該方法會呼叫tryReleaseShared(int args)方法

  根據上面提供的方法,同步器主要提供兩種模式:獨佔式和共享式。顧名思義,獨佔式表示同一時刻只有一個執行緒才會獲取到同步狀態,而其他執行緒都得等待;而共享式就允許同一時刻可以多個執行緒獲取到同步狀態。至於示例的話,大家可以檢視原始碼類上註釋的Mutx類,表示一個自定義的獨佔鎖。下面我還是直接貼出示例程式碼。

class Mutex implements Lock, java.io.Serializable {
    // 內部類,自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 是否處於佔用狀態
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        // 當狀態為0的時候獲取鎖
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                // 將當前執行緒設定為獨佔執行緒
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 釋放鎖,將狀態設定為0
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 斷言
            if (getState() == 0) throw new IllegalMonitorStateException();
            // 將執行緒或狀態 重置為初始值
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // 返回一個Condition,每個condition都包含了一個condition佇列
        Condition newCondition() { return new ConditionObject(); }
    }
    // 僅需要將操作代理到Sync上即可
    private final Sync sync = new Sync();
    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}
View Code

  看了下自定義的獨佔鎖Metux(上面程式碼來自原始碼),寫個案例測試下它到底是否是獨佔鎖(大家應該知道怎麼測試吧)。

public class MutexTest {

    private Lock lock ;
    private MutexTest(Lock lock) {
        this.lock = lock;
    }

    public void runTask() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 執行任務中...");
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName() + " 任務執行完成。");
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Lock lock = new Mutex();
        final MutexTest test = new MutexTest(lock);
        for (int i = 0; i < 5; i ++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    test.runTask();
                }
            }, "執行緒" + i).start();
        }
    }
}
View Code

  執行該案例從列印結果中可以看出,同一時刻只有一個執行緒在執行(這就是獨佔鎖的特性)。

執行緒0 執行任務中...
執行緒0 任務執行完成。
執行緒2 執行任務中...
執行緒2 任務執行完成。
執行緒1 執行任務中...
執行緒1 任務執行完成。
執行緒3 執行任務中...
執行緒3 任務執行完成。
執行緒4 執行任務中...
執行緒4 任務執行完成。

三 AQS的核心函式分析

  關於獲取和釋放下面只分析acquire函式和release函式,因為其他都與這個函式類似。

1、acquire函式

    /**
     * 獨佔式獲取同步狀態,忽略中斷。
     */
    public final void acquire(int arg) {
        /**
         * 1 呼叫子類的tryAcquiref(arg)方法,如果獲取成功則直接返回,否則以獨佔模式建立節點加入等待佇列
         */
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  acquire函式主要功能都放在這三個方法中:

  • tryAcquire(arg) 子類提供實現
  • addWaiter(Node) 主要是將節點新增到等待佇列中。
  • acquireQueue(Node, int) 主要是提取等待佇列中能獲取同步狀態的節點(遵循FIFO)。

  1.2 下面先分析下addWaiter(Node)函式:

/**
 * 2 根據給定模式為當前執行緒建立並排隊節點。
 */
private Node addWaiter(Node mode) {
    // 2.1 根據跟定的模式和當前執行緒建立節點。(在這就用的上Node了)
    Node node = new Node(Thread.currentThread(), mode);
    // 2.2 嘗試下快速通道:判斷tail節點是否為空,如果不為空就直接新增到尾節點後面。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 2.2.1 進入到這個方法說明執行緒並沒有獲取鎖,所以需要CAS保證原子性
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 2.3 如果是第一個入隊的節點或者compareAndSetTail設定失敗,那麼就進入enq()方法
    enq(node);
    return node;
}
/**
 * 將節點插入佇列,必要時進行初始化。
 */
private Node enq(final Node node) {
    // 自旋,直至設定新增尾節點成功。
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 2.3.1 尾節點為空,則需要初始化佇列(同理採取CAS保證原子性)
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 2.3.2 尾節點不為空,則將節點設定成尾節點(同理採取CAS保證原子性)
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

  上述邏輯主要包括:使用當前執行緒建立節點,然後將當前節點新增到同步佇列中。其中設定節點都是利用CAS設定,保證原子性。

  具體流程:

  a 先行嘗試在隊尾新增(如果尾節點不為空)(另外這一步很重要,如果尾節點存在就可以以最短路徑O(1)的效果來完成執行緒入隊,是最大化減少開銷的一種方式):

    • 分配引用prev指向尾節點;
    • 將節點的前驅節點更新為尾節點(current.prev = tail);
    • 如果尾節點是prev,那麼將當尾節點設定為該節點(tail = current,原子更新);
    • prev的後繼節點指向當前節點(prev.next = current)。

  b 如果是第一個入隊的節點或者compareAndSetTail設定失敗:

    • 如果尾節點為空,則需要初始化佇列(同理採取CAS保證原子性),繼續自旋判斷;

    •  重複上面a步驟將節點嘗試新增至尾節點後,直接新增成功。      

  1.3 進入sync佇列之後,接下來就是要進行同步狀態的獲取,下面請看acquireQueue(Node, arg)函式: 

/**
 * 3 對於已經在佇列中的執行緒,以獨佔不間斷模式獲取。
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 同樣採取自旋直至條件滿足
        for (;;) {
            // 3.1 獲取當前節點的前驅節點
            final Node p = node.predecessor();
            // 3.2 判斷前驅節點是否為頭節點,並此時是否可以獲取到同步狀態
            if (p == head && tryAcquire(arg)) {
                // 3.2.1 若上面條件滿足,則將當前節點設定為頭節點。
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  上述邏輯主要包括:

    • 獲取當前節點的前驅節點;需要獲取當前節點的前驅節點,而頭結點所對應的含義是當前佔有鎖且正在執行。
    • 當前驅節點是頭結點並且能夠獲取狀態,代表該當前節點佔有鎖;

      如果滿足上述條件,那麼代表能夠佔有鎖,根據節點對鎖佔有的含義,設定頭結點為當前節點。

    • 否則進入等待狀態。

      如果沒有輪到當前節點執行,那麼將當前執行緒從執行緒排程器上摘下,也就是進入等待狀態。也就是呼叫shouldParkAfterFailedAcquire和parkAndCheckInterrupt函式

   下面看下它是怎麼將不滿足節點摘下來進入等待狀態的。

/**
 * 檢查並更新獲取失敗的節點的狀態。
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 狀態處於SIGNAL狀態(-1),表示後繼節點隨時可以upark
         */
        return true;
    if (ws > 0) {
        /*
         * ws > 0表示處於CANCELLED狀態,則需要跳過找到node節點前面不處於取消狀態的節點。
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 此時ws為PROPAGATE -3 或者是0 表示無狀態,(為CONDITION -2時,表示此節點在condition queue中)
         * 比較並設定前驅結點的狀態為SIGNAL
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    /**
     * 此時還不確定Node的前置節點是否處於SIGNAL狀態
     * 所以不支援park操作
     */
    return false;
}

/**
 * 進行park操作並且返回該執行緒是否被中斷
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

   上述主要邏輯包括:

    • 如果前置節點的狀態是Signal狀態,則返回true。
    • 如果前置節點處於取消狀態,則跳過這種取消節點,找到不是前面不是取消狀態的節點然後返回false;
    • 如果前置節點處於<0的狀態,則利用CAS將其狀態設定成Signal狀態,然後返回false.
    • 經過上面步驟後,如果返回true,則說明可以中斷執行緒進入等待。

   那麼acquire函式分析到這就結束了,估計看了一遍還是不太清晰流程那麼就多看幾遍。下面也對這個流程進行總結下:

2、release函式  

/**
 * 以獨佔模式釋放
 */
public final boolean release(int arg) {
    // tryRelease由子類實現
    if (tryRelease(arg)) {
        // 獲取頭結點
        Node h = head;
        // 頭結點不為空並且頭結點狀態不為0
        if (h != null && h.waitStatus != 0)
            // 釋放頭結點的後繼結點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

/**
 * 喚醒後繼節點
 */
private void unparkSuccessor(Node node) {
    // 獲取節點狀態
    int ws = node.waitStatus;
    // 如果節點狀態小於,則將其設定為初始狀態。
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 如果節點狀態是取消或節點為空,則從尾部向後移動以找到實際未取消的繼任者。
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

  上述邏輯主要包括:

    • 嘗試釋放狀態;

      tryRelease能夠保證原子化的將狀態設定回去,當然需要使用compareAndSet來保證。如果釋放狀態成功過之後,將會進入後繼節點的喚醒過程。

    •  喚醒當前節點的後繼節點所包含的執行緒。

      通過LockSupport的unpark方法將休眠中的執行緒喚醒,讓其繼續acquire狀態。

四 總結(獲取與釋放過程)

  1. 在獲取時,維護了一個sync佇列,每個節點都是一個執行緒在進行自旋,而依據就是自己是否是首節點的後繼並且能夠獲取資源;(重點,不清楚的可以看上面的流程圖)
  2. 在釋放時,僅僅需要將資源還回去,然後通知一下後繼節點並將其喚醒。
  3. 這裡需要注意,佇列的維護(首節點的更換)是依靠消費者(獲取時)來完成的,也就是說在滿足了自旋退出的條件時的一刻,這個節點就會被設定成為首節點。

  另外送大家一碗心靈雞湯:)

我從不相信什麼懶洋洋的自由,我向往的自由是通過勤奮和努力實現更廣闊的人生,那樣的自由才是珍貴的、有價值的。我相信一萬小時定律,我從來不相信天上掉餡餅的靈感和坐等的成就。做一個自由又自律的人,靠勢必實現的決心認真地活著。