1. 程式人生 > >併發系列(八)-----AQS詳解獨佔模式資源的獲取與釋放

併發系列(八)-----AQS詳解獨佔模式資源的獲取與釋放

一 簡介

上一篇總結了AQS的整體架構,以及它的組成,和它們之間的關係。AQS主要的三部分分別是volatile修飾的變數、同步佇列和等待佇列其中,同步佇列在上篇總結中已經介紹過了,不知道的話可以點這裡AQS的框架組成以及同步佇列原始碼解析。這一片文章主要總結獨佔模式下資源的獲取。

二 資源的獲取原始碼解析

在上一篇總結中,最後過原始碼的時候看到addWater(),同時我們也提出兩個猜想獲取資源的兩種方式

猜想一:執行緒上來就直接獲取,如果獲取成功的話那就執行了,獲取失敗的話被封裝成節點新增到同步佇列中

猜想二:執行緒一上來看看佇列中有沒有要同步的節點,如果有的話那就不獲取資源了直接新增到同步佇列中,等待上一個節點喚醒。

現在看一下操作stste的方法有哪些


    /**
     * 返回當前同步狀態
     */
    protected final int getState() {
        return state;
    }

    /**
     * 設定當前的同步狀態
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * 使用CAS來更新state的值
     *
     * @param except 期望值
     * @param update 更新值
     * @return 更新是否成功
     */
    protected final boolean compareAndSetState(int except, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, except, update);
    }

AQS下面這個方法就是來獲取資源state的

    /**
     * 以獨佔模式獲取,忽略中斷。實現 至少呼叫一次{@link #tryAcquire},
     * 成功迴歸。 否則,執行緒可能會排隊      
     *
     * @param arg 資源請求
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            selfInterrupt();
        }
    }

看到上面的程式碼我們看到了比較熟悉的方法就是addWaier(),這個方法返回一個封裝好執行緒的節點,被當作引數傳遞到acquireQueued()這個方法中。但是acquireQueued執行不執行取決與前面的tryAcquire()這個方法。當tryAcquire()返回false的時候才會去執行acquireQueued()這個方法。再看一下tryAcquire()這個方法。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

盡然丟擲一個異常,在第一遍的文章說過AQS是基於模板方法的框架,既然使用的是模板方法那就需要子類去實現了,在ReentrantLock內部類Sync中是AQS的實現。下面是原始碼

  @Override
        protected final boolean tryAcquire(int acquire) {
            return nonfairTryAcquire(acquire);
        }

 final boolean nonfairTryAcquire(int acquires) {
            //獲取但當前執行緒
            final Thread current = Thread.currentThread();
            //獲取資源的狀態
            int c = getState();
            if (c == 0) {
                //如果資源的狀態為0的話說明state是沒有執行緒持有當前資源的
                if (compareAndSetState(0, acquires)) {
                    //使用CAS替換,如果成功那就將獨佔執行緒設為當前執行緒,也就意味著當前執行緒
                    //擁有執行時間了
                    setExclusiveOwnerThread(current);
                     //獲取的資源返回true
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                //如果state不為0的話獨佔執行緒是當前執行緒的話那麼給state加一,這裡是
                //重入鎖的實現
                int nextc = c + acquires;
                if (nextc < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextc);
                return true;
            }
            //如果沒獲取到資源的話返回false
            return false;
        }

好了tryAcquire()方法幹什麼的已經知道了,如果獲取到資源的話返回true,沒有獲取到資源的話就返回false,現在再看acquire()這個方法,當執行緒獲取到資源的時候返回的是true ,!true也就是false,那就不必在在執行&&面的語句了,如果沒有獲取到資源,就要執行後面的語句了,首先將當前執行緒包裝成一個節點新增到對列尾部並返回這個節點。既然包裝了,那就要處理將這個節點了。acquireQueued方法就是幹這個的,下面是原始碼

 /**
     * 將競爭節點設定為頭節點,同時當前節點不是頭節點的話
     *
     * @param node 要獲取頭節點的節點
     * @param arg  state狀態引數
     * @return 返回true表示執行緒發生了中斷
     */
    final boolean acquireQueued(final Node node, int arg) {
        //這個變數來看是否要取消節點的競爭
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                //獲取node的前驅節點
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//當前的前驅節點是頭節點,那麼當前節點就獲取資源
                    //將node設定為頭節點
                    setHead(node);
                    //消除引用有利於垃圾回收
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                //這裡是for迴圈的移動條件跳過前驅接節點未取消狀態的節點,
                //當前執行緒中斷的話只能返回去等待了
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt()) {
                    interrupted = true;
                }
            }
        } finally {
            //當執行緒中斷直接取消當前節點競爭
            if (failed) {
                cancelAcquire(node);
            }
        }

    }

對於上面一段原始碼來說,獲取資源是不難理解的,但是沒有獲取到資源時候執行了一個if語句,看一下if語句中兩個方法中分別做了什麼。下面是原始碼


    /**
     * 檢查是否可以在節點後新增競爭節點,同時檢查node前驅節點是否取消,如果取消了就要將這個節點
     * 移除掉,如果在前驅節點等待中返回true
     *
     * @param pred 前驅節點
     * @param node 當前節點
     * @return 返回boolean
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //獲取前驅節點的狀態
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) {
            return true;
        }
        if (ws > 0) {
            do {
                //如果前驅節點的的狀態為取消狀態那麼跳過直到找到可以
                //獲取競爭的node
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

當node中的前驅節點是等待中的時候就會執行下一個方法,下一個方法的原始碼如下

    /**
     * 將執行緒至於waiting中 並返回執行緒是否中斷
     *
     * @return 中斷執行緒的boolean
     */
    private final boolean parkAndCheckInterrupt() {
        //將執行緒至於waiting中
        LockSupport.park(this);
        return Thread.interrupted();
    }

看到這裡基本上已經清楚了,acquireQueued()方法就是如果可以獲取到資源的時候直接獲取,不能獲取到資源時檢查父節點是否在等待狀態中,如果在等待中,就呼叫LockSupport.park(),將當前執行緒至於等待狀態,等待中斷或著喚醒。AQS獲取資源也就完了。

AQS獲取資源的總結:

1.首先先使用CAS獲取支援state。如果獲取成功的話就不在新增節點了,如果獲取失敗的話將當前執行緒封裝到node中。

2.在新增節點的時候判斷前驅節點是否是頭節點,如果前驅節點是頭節點的話,繼續for迴圈獲取資源。如果不是頭節點的話檢查當前的前驅節點是否為空,將當前節點的所有前面的節點為取消狀態的全都去掉。去掉之後當前節點還不是頭節點時,將執行緒至於waiting狀態等待中斷或喚醒。

上面的總結也就驗證了的猜想一,其實就是非公平鎖的實現,猜想二是公平的鎖的實現。可以在ReentrantLock中的內部類FairSync看到公平鎖的實現。

三 資源的釋放

關於支援的釋放我認為是比較簡單的,可以大體的猜想一下,找到持有資源的node節點,將state的值設定為0,再將當前node節點從同步佇列中移除掉。然後喚醒下一個節點的執行緒。下面是原始碼

    /**
     * 釋放資源,並喚醒下一個節點
     *
     * @param arg 狀態
     * @return 釋放成功
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0) {
                //喚醒下一個節點
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

上面原始碼幹什麼已經在註釋中說的很清楚了,但是沒有看到資源是如何操作的。其中資源的操作就在tryRelease()方法中下面是原始碼。

    /**
     * 執行緒呼叫釋放鎖
     *
     * @param arg 狀態
     * @return 是否釋放成功
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

這個方法和tryAcquire()方法一樣都是拋了一個異常我們看其中子類的實現。下面是原始碼。

    /**
         * 釋放資源
         *
         * @param release 釋放鎖的數字
         * @return 是否釋放成功
         */
        @Override
        protected final boolean tryRelease(int release) {
            //獲取到資源並減去資源
            int c = getState() - release;
            
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            boolean free = false;
            if (c == 0) {
                free = true;
                //當state為0的時候表示資源釋放完成想獨佔的執行緒設定為null
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

四 總結

 從上面的原始碼中我們可以看出,鎖的實現無非就是資源的獲取,與佇列的操作,執行緒狀態的轉化。

獲取資源時:先操作state,操作成功就直接獲取到了,操作不成功新增到同步佇列中,呼叫LockSuppport.park(),將執行緒至於waiting狀態等待中斷或喚醒。

資源釋放:先操作state,操作成功的話將佇列中的當前節點移除,喚醒下一個節點。