1. 程式人生 > >JUC之AQS框架

JUC之AQS框架

一、簡介

1. AQS

AQS是AbstractQueuedSynchronizer的簡寫,中文名應該叫抽象佇列同步器(我給的名字,哈哈),出生於Java 1.5

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int

value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization. ——Java docs

翻開原始碼,首先是一大段的註釋,然後可以看開AQS繼承自AbstractOwnableSynchronizer。當你翻開AbstractOwnableSynchronizer時,還是先看到小段的說註釋。

A synchronizer that may be exclusively owned by a thread. This class provides a basis for creating locks and related synchronizers that may entail a notion of ownership. The AbstractOwnableSynchronizer class itself does not manage or use this information. However, subclasses and tools may use appropriately maintained values to help control and monitor access and provide diagnostics. ——Java docs

2. AOS

她生自Java 1.6

可以由執行緒以獨佔方式擁有的同步器。此類為建立鎖和相關同步器(伴隨著所有權的概念)提供了基礎。AbstractOwnableSynchronizer 類本身不管理或使用此資訊。但是,子類和工具可以使用適當維護的值幫助控制和監視訪問以及提供診斷。

簡言之,AOS(容我先如此縮寫吧)輸出的是一種概念,是一種建立獨佔式同步器所有權的概念。她自身並沒有規範所有權的管理方式,更沒有用到這些資訊(所有權的資訊)。不過呢,可以幫其子類維護所有者資訊。
可以你並不能理解上面這一小段話,不過沒關係接著看原始碼就是了。
AOS提供兩個方法一個欄位,外帶一個空構造方法(可無視之)。

/**
 * The current owner of exclusive mode synchronization.
 */
private transient Thread exclusiveOwnerThread; // transient表示該欄位不需要序列化,因為AOS實現了Serializable

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
}

這下明白了吧,就是AOS啥都沒有就只維護一個東西——當前誰持有獨佔式同步器。因此咱們可以當它不存在吧,繼續回到重點AQS。

You may also find the inherited methods from {@link AbstractOwnableSynchronizer} useful to keep track of the thread owning an exclusive synchronizer. You are encouraged to use them – this enables monitoring and diagnostic tools to assist users in determining which threads hold locks. – Java Docs

二、作用

Java Docs的註釋可以知道,AQS是一個框架,一個提供鎖或同步器依賴於FIFO等待佇列所必要的“基礎設施”的框架。Doug Lea之所以寫個抽象類的目的是為了簡化我們實現同步器的工作。

換句話說,

提供一個基於FIFO等待佇列,可以用於構建鎖或者其他同步裝置的基礎框架。意在能夠成為實現大部分同步需求的基礎

AQS預設提供了獨佔式共享式兩種模式,JDK對應的實現有ReentrantLockReentrantReadWriteLock。即除了提供acquire方法之外,還提供了acquireShare。帶shared都是共享模式相關操作,預設則是獨佔模式。

AQS到底提供了哪些便利呢:
1. 管理狀態
2. 實現了執行緒安全的CLH佇列
3. 實現了同步器公共方法
4. ConditionObject

三、原理

AQS自身就是一個Wait QueueCLH lock queue的變種,CLH佇列通常用於自旋鎖

The wait queue is a variant of a “CLH” (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node.

我們知道Queue的底層實現是一個連結串列結構,我們也知道它可以是單鏈表,也可以是雙鏈表。AQS實現的是一個雙鏈表結構。即每個節點都儲存了前驅和後繼兩個指標,而且Head節點是dummy節點。入隊則tail前驅一個節點,再把新節點接入,然後接入tail節點。

AQS完全通過CAS解決多執行緒安全問題,即通過Unsafe完成的。具體可以自行閱讀AQS原始碼,在最後幾個方法,非常簡單。

CAS : compareAndSet,Unsafe的CAS方法的語義是先與期望值進行比較,若不相等返回false;若相等的話,則會把原值修改新值,再返回true。這個跟java.util.concurrent.atomic的原子類的CAS方法是一樣一樣的。

AQS的實現依賴LockSupport完成阻塞和喚醒,也正是LockSupport的引入使得AQS原始碼閱讀起來邏輯上有點跳躍。不過沒關係,後面我們梳理Java Lock的時候,會把LockSupport重新拿出來看的。

什麼是自旋鎖,如下示例實現的即是自旋鎖,即是迴圈等待訊號量變成期望值後完成相關操作。

// 來自 AQS#doAcquireInterruptibly
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) { // 自旋鎖
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 相當於阻塞到訊號量變成期望值
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

除此之外,AQS提供一些與同步器業務相關的一些方法,如:acquire和release。另外還有一些與Queue監控相關的方法。

AQS提供的所有方法的分類列表:
1. Queuing utilities
2. Utilities for various versions of acquire
cancelAcquire
3. Main exported methods
tryAcquire
releaseShared
4. Queue inspection methods
hasQueuedThreads
5. Instrumentation and monitoring methods
getQueueLength
6. Interenal support methdos for Conditions
isOnSyncQueue
7. Instrumentation methods for conditions
owns
8. Internal methods
addConditionWaiter
9. public methods
10. for interruptible waits, we need to track whether to throw InterruptedException,….
11. Support for instrumentation
isOwnedBy
12. CAS : unsafe

四、用法

AQS的用法,可以直接參考AQS的註釋,另外也可以參考ReentrantLock的寫法。
通過比外提供兩個方法acquirerelease的原始碼可知,它們是通過呼叫對應protected的try方法完成的。

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

註釋指出了,繼承AQS需要實現下面五個方法,

tryAcquire(int arg)
tryRelease(int arg)
tryAcquireShared(int arg)
tryReleaseShared(int arg)
isHeldExclusively()

它們帶的int引數,用來表示狀態,如下面示例中,用了10表示鎖和釋放。
如你所知,前兩個是獨佔式,中間兩個是共享式。最後是測試當是同步器是否正被持有獨佔鎖。即是已經有執行緒持有這個排他鎖,且還未釋放。

下面是AQS註釋裡提供的實現。

class Mutex implements Lock, java.io.Serializable {

    // Our internal helper class
    private static class Sync extends AbstractQueuedSynchronizer {
        // Report whether in locked state
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Acquire the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Release the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provide a Condition
        Condition newCondition() {
            return new ConditionObject();
        }

        // Deserialize properly
        private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}