1. 程式人生 > >AQS詳解,併發程式設計的半壁江山

AQS詳解,併發程式設計的半壁江山

千呼萬喚始出來,終於寫到AQS這個一章了,其實為了寫這一章,前面也是做了很多的鋪墊,比如之前的

深度理解volatile關鍵字 執行緒之間的協作(等待通知模式) JUC 常用4大併發工具類 CAS 原子操作 顯示鎖 瞭解LockSupport工具類

這些文章其實都是為了讓大家理解AQS而寫的鋪墊,就像吃東西需要一口一口的吃一樣

AQS概述及其實現類:

  AQS,是AbstractQuenedSynchronizer的縮寫,中文名稱為抽象的佇列式同步器,是java併發程式設計這一塊的半壁江山,這個類存在於在java.util.concurrent.locks包,AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實現都依賴於它,比如之前寫的顯示鎖ReentrantLock,,讀寫鎖ReentrantReadWriteLock,JUC的四大併發工具類中的Semaphore,CountDownLatch,執行緒池暫時還沒寫之後再寫

 

 

 在JDK1.7之前,FutureTask,應該也是繼承了AQS來實現的,但是1.8之後就改變了

 

 

 但是實現思想應該沒有太大改變,,所以說AQS是併發程式設計的半壁江山

 

核心思想:

  如果被請求的共享資源空閒,則將當前請求資源的執行緒設定為有效的工作執行緒,並將共享資源設定為鎖定狀態,如果被請求的共享資源被佔用,那麼就需要一套執行緒阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH佇列鎖實現的,即將暫時獲取不到鎖的執行緒加入到佇列中。

CLH(Craig,Landin,and Hagersten)佇列是一個虛擬的雙向佇列,虛擬的雙向佇列即不存在佇列例項,僅存在節點之間的關聯關係。

AQS是將每一條請求共享資源的執行緒封裝成一個CLH鎖佇列的一個結點(Node),來實現鎖的分配。

其實在我理解來說,AQS就是基於CLH佇列,用volatile修飾共享變數state,來保證變數的可見性,執行緒通過CAS去改變狀態符,保證狀態的原子性,成功則獲取鎖成功,失敗則進入等待佇列,等待被喚醒。

注意:AQS是自旋鎖:在等待喚醒的時候,經常會使用自旋(while(!cas()))的方式,不停地嘗試獲取鎖,直到被其他執行緒獲取成功

 

框架:

 

 

通過這個圖得知,AQS維護了一個volatile int state和一個FIFO執行緒等待佇列,多執行緒爭用資源被阻塞的時候就會進入這個佇列。state就是共享資源,其訪問方式有如下三種:

getState();setState();compareAndSetState();

AQS 定義了兩種資源共享方式:
1.Exclusive:獨佔,只有一個執行緒能執行,如ReentrantLock
2.Share:共享,多個執行緒可以同時執行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier

不同的自定義的同步器爭用共享資源的方式也不同。

AQS底層使用了模板方法模式

同步器的設計是基於模板方法模式的,如果不瞭解的可以去看看模板方法設計模式,之前在寫設計模式的六大設計原則的時候也說了,看看設計模式有助於理解原始碼,如果需要自定義同步器一般的方式是這樣:

  1. 使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。
  2. 將AQS組合在自定義同步元件的實現中,並呼叫其模板方法,而這些模板方法會呼叫使用者重寫的方法,就類似於我定義了一個骨架,你填充東西一樣

自定義同步器在實現的時候只需要實現共享資源state的獲取和釋放方式即可,至於具體執行緒等待佇列的維護,AQS已經在頂層實現好了。自定義同步器實現的時候主要實現下面幾種方法:

  • isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。

  以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A執行緒lock()時,會呼叫tryAcquire()獨佔該鎖並將state+1。此後,其他執行緒再tryAcquire()時就會失敗,直到A執行緒unlock()到state=0(即釋放鎖)為止,其它執行緒才有機會獲取該鎖。當然,釋放鎖之前,A執行緒自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。

  再以CountDownLatch以例,任務分為N個子執行緒去執行,state也初始化為N(注意N要與執行緒個數一致)。這N個子執行緒是並行執行的,每個子執行緒執行完後countDown()一次,state會CAS減1。等到所有子執行緒都執行完後(即state=0),會unpark()主呼叫執行緒,然後主呼叫執行緒就會從await()函式返回,繼續後餘動作。

  一般來說,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支援自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。

  在acquire() acquireShared()兩種方式下,執行緒在等待佇列中都是忽略中斷的,acquireInterruptibly()/acquireSharedInterruptibly()是支援響應中斷的。

繼承AQS,手寫獨佔式可重入鎖:

  說了那麼多,但是說一千道一萬不如自己手寫試試,接下來看程式碼

package org.dance.day4.aqs;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 採用主類實現Lock介面,內部類繼承AQS,封裝細節
 * 自定義鎖
 * @author ZYGisComputer
 */
public class CustomerLock implements Lock {

    private final Sync sync = new Sync();

    /**
     * 採用內部類來繼承AQS,封裝細節
     *  實現獨佔鎖,通過控制state狀態開表示鎖的狀態
     *      state:1 代表鎖已被佔用
     *      state:0 代表鎖可以被佔用
     */
    private static class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0,1)){
                // 當前執行緒獲取到鎖
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }else{
                return false;
            }
        }

        @Override
        protected boolean tryRelease(int arg) {
            // 如果狀態為沒人佔用,還去釋放,就報錯
            if(getState()==0){
                throw new UnsupportedOperationException();
            }
            // 把鎖的佔用者制空
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /**
         * 判斷執行緒是否佔用資源
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState()==1;
        }

        /**
         * 獲取Condition介面
         * @return
         */
        public Condition getCondition(){
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.getCondition();
    }
}

工具類:

package org.dance.tools;

import java.util.concurrent.TimeUnit;

/**
 * 類說明:執行緒休眠輔助工具類
 */
public class SleepTools {

    /**
     * 按秒休眠
     * @param seconds 秒數
     */
    public static final void second(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
    }

    /**
     * 按毫秒數休眠
     * @param seconds 毫秒數
     */
    public static final void ms(int seconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
    }
}

測試類:

package org.dance.day4.aqs;


import org.dance.tools.SleepTools;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 *類說明:測試手寫鎖
 */
public class TestMyLock {
    public static void main(String[] args) {
        TestMyLock testMyLock = new TestMyLock();
        testMyLock.test();
    }
    public void test() {
        // 先使用ReentrantLock 然後替換為我們自己的Lock
        final Lock lock = new ReentrantLock();

        class Worker extends Thread {
            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepTools.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepTools.second(1);
                    } finally {
                        lock.unlock();
                    }
                    SleepTools.second(2);
                }
            }
        }
        // 啟動10個子執行緒
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }
        // 主執行緒每隔1秒換行
        for (int i = 0; i < 10; i++) {
            SleepTools.second(1);
            System.out.println();
        }
    }
}

執行結果:

Thread-0



Thread-1

Thread-2



Thread-3

Thread-4

通過結果可以看出來每次都是隻有一個執行緒在執行的,執行緒的鎖獲取沒有問題,接下來換我們自己的鎖

final Lock lock = new CustomerLock();

再次執行測試

執行結果:

Thread-0


Thread-1


Thread-2


Thread-3


Thread-4

由此可見,這個手寫的鎖,和ReentrantLock是一樣的效果,是不是感覺也挺簡單的,也沒有多少行程式碼

那麼獨佔鎖,被一個執行緒佔用著,其他執行緒去了哪裡?不要走開接下來進入AQS的原始碼看看

 

理論:

 

 

 

 在AQS中的資料結構是採用同步器+一個雙向迴圈連結串列的資料結構,來儲存等待的節點的,因為雙向連結串列是沒有頭的,但是為了保證喚醒的操作,同步器中的head標誌了連結串列中的一個節點為頭節點,也就是將要喚醒的,也標識了一個尾節點

 

 

結點狀態waitStatus,需要保證可見性,用volatile修飾

      這裡我們說下Node。Node結點是對每一個等待獲取資源的執行緒的封裝,其包含了需要同步的執行緒本身及其等待狀態,如是否被阻塞、是否等待喚醒、是否已經被取消等。變數waitStatus則表示當前Node結點的等待狀態,共有5種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

  • CANCELLED(1):表示當前結點已取消排程。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態後的結點將不會再變化。

  • SIGNAL(-1):表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新為SIGNAL。

  • CONDITION(-2):表示結點等待在Condition上,當其他執行緒呼叫了Condition的signal()方法後,CONDITION狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖。

  • PROPAGATE(-3):共享模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。

  • 0:新結點入隊時的預設狀態。

注意,負值表示結點處於有效等待狀態,而正值表示結點已被取消。所以原始碼中很多地方用>0、<0來判斷結點的狀態是否正常。

同步佇列中節點的增加和移除

 

 

 通過圖可以看出來,在增加 尾節點的時候需要通過CAS設定,因為可能是多個執行緒同時設定,但是移除首節點的時候是不需要的,因為這個操作是由同步器操作的,並且首節點只有一個

獨佔式同步狀態的獲取與釋放

 

 

 AQS的從執行緒的獲取同步狀態到,對同步佇列的維護,到釋放,的流程圖就是這樣的,有興趣看原始碼的自己去跟一下,就是主要實現的模板方法,

注意:其實在這個給大家提個醒,看原始碼的時候,找核心的看,找主要的看,不要一行一行的扣著看,沒有意義,還有就是呼叫過程複雜,體會核心流程就可以

 

之前寫了<<Lock介面之Condition介面>>這一章,然後在這裡寫一下Condition介面在AQS裡面的實現吧,因為不管自己寫鎖也好,預設鎖的實現也好,用的Condition都是AQS預設寫好的

Condition實現分析:

 

 

 

 一個鎖是可以有多個Condition的,每個Condition都包含一個自己的等待佇列,不同於Object屬於同一個物件等待,他存在一個單鏈表結構的等待佇列,清晰的知道要喚醒自己的等待佇列中的節點,所以採用signal方法而不是signalall

當然採用的類還是Node類當然單鏈表其實就是沒有上一個節點的引用而已

等待佇列和同步佇列採用的是相同的類,只不過是實現的資料機構確是不一樣的而已

最終一個鎖的例項化會成為上圖中第二個圖的這種形式,Demo也就是之前寫的<<Lock介面之Condition介面>>中的用的鎖最終形成的結構及時就是維持了一個同步佇列和兩個等待佇列,鎖用於控制併發,而兩個佇列用於控制地點變化和公里數變化的不同的等待通知模式

節點在佇列中的移動

 

 

 就是在當前執行緒await的時候從同步佇列移除後加入到等待佇列尾部,而喚醒就是從等待佇列移除後加入到同步佇列尾部,兩個佇列相互轉換的過程,之所以採用同一個類,就是為了方便的在不同佇列中相互轉化

當然這也是為什麼不推薦使用SignalAll方法的原因,因為如果一個等待佇列中有很多的執行緒在等待,全部喚醒後,最多且只能有一個執行緒獲取到同步狀態,其他執行緒全部要被加入到同步佇列的末尾,而且也可能當前的同步狀態被別人持有,一個執行緒也獲取不到,全部都要被加入同步佇列中,所以不推薦使用SignalAll,推薦是用Signal

其實也可以想象,比如wait和notify/notifyAll 在寫<<執行緒之間的協作(等待通知模式)>>這篇文章的時候的最後一個問題也可以大概想象一下,應該也是維持了一個同步佇列,但是等待佇列應該是隻有一個,所以,被喚醒的是第一個等待的節點,但是它沒有辦法保證要被喚醒的節點一定是在頭一個,只能喚醒全部的節點,來保證需要喚醒的執行緒一定被喚醒,大概也是這樣的一個節點的移動,根據網路文章的描述,應該八九不離十

根據猜測,結合上方的Condition介面分析,所以說,在wait,notify/notifyAll中推薦使用notifyAll,防止第一個節點不是需要喚醒的節點,造成喚醒錯誤,但是Condition是知道的,被喚醒的一定是需要喚醒的,不會喚醒錯誤,所以說,推薦使用signal

能看到這裡的證明你真的很愛這個行業,你是最棒的!加油

回顧Lock的實現

ReentrantLock

其實在上面手寫的鎖,是有一些缺陷的,因為判斷的是不是等於1,所以他是一個不支援可重入的,一旦重入,就會造成死鎖,自己鎖住自己,但是ReentrantLock就不會

他支援鎖的可重入,並且支援鎖的公平和非公平

 

 通過原始碼可以看到,他是通過狀態的累加完成的鎖的可重入,當然前提是已經拿到鎖的執行緒,會有這樣一個判斷

 

 所以可想而知,釋放的時候,每次釋放就遞減,最終等於0的時候完成鎖的釋放

 

 在實現公平鎖的時候,就是判斷當前節點是否有前期節點,是不是第一個,如果有,不是第一個,抱歉你不能搶鎖

 

 可想而知在非公平鎖中就是不判斷而已

因為不需要判斷,並且是誰搶到鎖,鎖就是誰的,所以說非公平鎖比公平鎖效率高

 

ReentrantReadWriteLock

在讀寫鎖中,一個狀態如何 儲存兩個狀態呢?採用位數分割

 

 應該有知道 int是32位的,他把32位一分為二,採用低位儲存寫的狀態,高位儲存讀的狀態

寫鎖,應該都知道,只能同時被一個執行緒持有,所以重入的話,也比較好儲存

但是讀鎖不一樣,可以被多個執行緒同時持有,是共享鎖,並且重入的次數是不一樣的,那麼該則麼儲存呢?採用高位只儲存被多少執行緒持有

 

 採用每個持有鎖的執行緒中的一個HoldCounter物件儲存,使用ThreadLocalHoldCounter繼承ThreadLocal來儲存執行緒變數,區別不同執行緒

讀寫鎖的升級和降級

讀寫鎖支援寫鎖降級為讀鎖,但是不支援讀鎖升級為寫鎖,為了保證執行緒安全和資料可見性,因為在寫鎖執行期間,讀鎖是被阻塞的,所以說寫鎖降級為讀鎖是沒有問題的,但是如果是讀鎖升級為寫鎖,在其他執行緒使用完寫鎖的時候,讀鎖是看不見的,為了保證執行緒安全,所以不支援讀鎖升級成寫鎖

 

到此AQS就寫完了,因為AQS涉及的知識太多,能看到現在的也都是大神了,恭喜你們,掌握了併發程式設計的半壁江上,為了自己的夢想更近了一步,加油,因為知識點多,所以大家多看幾遍,不理解的可以百度,也可以評論區提問

 

作者:彼岸舞

時間:2020\11\18

內容關於:併發程式設計

本文來源於網路,只做技術分享,一概不負任何責任

&n