1. 程式人生 > >並發編程(十):AQS

並發編程(十):AQS

rri 線程同步 不知道 ota void 兩個類 aqs 自增 執行c

AQS全稱為AbstractQueuedSynchronizer,是並發容器中的同步器,AQS是J.U.C的核心,它是抽象的隊列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類都依賴它,如ReentrantLock、Semaphore、CyclicBarrier、ReentrantLock、Condition、FutureTask等。

AQS的特點:

a、使用Node實現FIFO隊列,可以用於構建鎖或者其他同步裝置的基礎框架

b、利用一個int類型表示狀態

c、使用方法是繼承

d、子類通過繼承並通過實現它的方法管理其狀態

e、可以同時實現排它鎖和共享鎖模式(獨占、共享)

AQS實現原理:

AQS維護了一個volatile int state和一個FIFO線程等待隊列

state的訪問方式有三種:getState(),setState(),compareAndSetState()

AQS定義兩種資源共享方式,Exclusive(獨占,只有一個線程能執行),Share(共享,多個線程可同時執行)

自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,具體線程等待隊列的維護,如獲取資源失敗入隊,喚醒出隊等,AQS在頂層實現好了。自定義同步器實現時主要實現以下幾種方法;

tryAcquire(int):獨占方式,嘗試獲取資源,成功返回true,失敗返回false

tryRelease(int): 獨占方法,嘗試釋放資源,成功則返回true,失敗則返回false

tryAcquireShared(int):共享方式,嘗試獲取資源,負數表示失敗,0表示成功,但沒有剩余可用資源,正數表示成功,且有剩余資源

tryReleaseShared(int):共享方式,嘗試釋放資源,如果釋放後允許喚醒後續等待節點返回true,否則返回false

acquire(int)

a、tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;

b、addWaiter() 將該線程加入等待隊列的尾部,並標記為獨占模式

c、acquireQueueed() 使線程在等待隊列中休息,有機會時會去嘗試獲取資源,獲取到資源後才返回,如果在正在等待過程中被中斷過,則返回true,否則返回false

d、如果線程在等待過程中被中斷過,它是不響應的,只是獲取資源後才進行自我中斷selfInterrupt(),將中斷補上

release(int)

此方法是獨占模式下線程釋放共享資源的頂層入口,它會釋放指定量的資源,如果徹底釋放了(state=0),它會喚醒等待隊列裏的其他線程來獲取資源

acquireShared(int)

此方法是共享模式下線程獲取共享資源的頂層入口,它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止, 跟獨占模式比這裏只有線程是head.next時,才會去嘗試獲取資源,有剩余的話還會喚醒之後的隊友,假如老大用完後釋放了5個資源,而老二需要6個,老三需要1個,老四需要兩個,老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢還是不讓,答案是老二會繼續park(),等待其他線程釋放資源,也更不會去喚醒老三和老四了,獨占模式同一時刻只有一個線程去執行,但共享模式下,多個線程是可以同時執行的,因為老二的資源需求量大,而把後面量小的老三和老四也都卡住了,它跟acqure()流程大同小異,只不過多了個自己拿到資源後,還會去喚醒後繼隊友的操作

releaseShared()

此方式是共享模式下線程釋放共享資源的頂層入口,它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列裏的其他線程來獲取資源,也就是釋放掉資源後,喚醒後繼

下面我們介紹一下通過AQS實現的類的例子,CountDownLatch、Semaphore、CyclicBarrier、ReentrantLock等都是通過AQS實現的,其中CountDownLatch和Semaphore我們已經在前面的博客中說過了,我們著重來看剩下的兩個類

CyclicBarrier

它允許一組工作線程相互等待,直到到達某個工作屏障點,只有當每個線程都準備就緒後才能繼續執行後面的操作,它和CountDownLatch有相似的地方都是通過計數器實現的,但它在釋放等待線程後可以重用,是循環屏障,可以一直循環來使用(計數器可重置)。CountDownLatch是一個或多個線程等待一個線程的關系,CyclicBarrier主要是實現了多個線程之間相互等待,直到所有線程都滿足條件後才能繼續後續的操作,如有五個線程在等待,只有這5個線程都調用了await()方法後才能繼續執行

技術分享圖片

CyclicBarrier-demo1

@Slf4j
public class CyclicBarrierExample1 {

    //定義有多少線程同步等待
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args)throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(()->{

                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception",e);
                }

            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum)throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready",threadNum);
        barrier.await();
        log.info("{} continue",threadNum);

    }
}

輸出如下:

技術分享圖片

使用此類時需先給出需要相互等待的線程數,如demo中我給出的是5個,當調用await()方法的線程數達到5個後才能繼續執行,await()還可以設置等待的時候,如果超過等待時間則繼續往下執行

CyclicBarrier-demo2

@Slf4j
public class CyclicBarrierExample3 {

//    達到屏障之後,優先執行callback is running
    private static CyclicBarrier barrier = new CyclicBarrier(5,()->{
        log.info("callback is running");
    });

    public static void main(String[] args)throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(()->{

                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception",e);
                }

            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum)throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready",threadNum);
        barrier.await();
        log.info("{} continue",threadNum);

    }
}

輸出為:

技術分享圖片

此demo告訴我們當線程到達屏障的時候,可設置優先執行的代碼

ReentrantLock

我們首先說一下ReentrantLock和synchronized的區別

a、它倆都是可重入鎖,都是同一個線程進入一次鎖的計數器就自增1,所以等到鎖的計數器下降為0時才會釋放鎖

b、synchronized是JVM實現的,ReentrantLock是JDK實現的

c、synchronized引入偏向鎖、自旋鎖後性能已經和ReentrantLock差不多了,官方推薦使用synchronized

d、synchronized使用更加簡潔,是由編譯器保證鎖的加鎖和釋放的,ReentrantLock需要手工加鎖和釋放鎖,但在鎖的細粒度和靈活度方面ReentrantLock會優於synchronized

下面是ReentrantLock獨有的功能

a、可指定是公平鎖(先等待的線程先獲得鎖)還是非公平鎖

b、提供了一個Condition類,可以分組喚醒需要喚醒的線程,而不是像synchronized要麽隨機喚醒一個線程,要麽喚醒所有線程

c、提供能夠中斷等待鎖的機制

ReentrantLock-demo

@Slf4j
public class LockExample2 {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    public static int count = 0;

    private final static Lock lock = new ReentrantLock();

    private  static void add() {

        lock.lock();
        try {
            count++;
        }finally {
           lock.unlock();
        }

    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {

                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);

    }

}

運行結果為5000,上面就是簡單的ReentrantLock的使用

ReentrantReadWriteLock

@Slf4j
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        }finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {

        readLock.lock();
        try {
            return map.keySet();
        }finally {
            readLock.unlock();
        }
    }



    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key,value);
        }finally {
            writeLock.unlock();
        }
    }

    class Data{

    }

}

此demo為讀寫鎖demo,對讀和寫分別進行鎖定操作,在獲取寫入鎖的時候,不允許讀操作的鎖還在保持著,如果運用在讀特別多而寫特別少的時候,會導致寫操作的饑餓,寫會一直在等待,不知道什麽時候會獲取鎖

StampedLock

@Slf4j
@ThreadSafe
public class LockExample4 {

    //請求總數
    public static int clientTotal = 5000;
    //同時並發執行的線程數
    public static int threadTotal = 200;

    public static int count = 0;

    private final static StampedLock lock = new StampedLock();

    private  static void add() {

        long stamp = lock.writeLock();
        try {
            count++;
        }finally {
           lock.unlock(stamp);
        }

    }

    public static void main(String[] args)throws Exception {

        //定義線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義信號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器閉鎖
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);

    }

}

StampedLock控制鎖有三種模式,分別是寫、讀和樂觀讀,StampedLokc由版本和模式兩個部分組成,使用鎖時返回一個數字作為票據,用相應的鎖狀態來表示並控制相關的訪問,數字0表示沒有寫鎖被首先訪問,在讀鎖上分為悲觀鎖和樂觀鎖,樂觀讀就是在讀的操作很多而寫的操作很少的情況下,我們可以樂觀的認為寫入與讀取同時方法的概率很小,因此不悲觀的使用完全鎖定,程序可以查看讀取資料之後是否遭到寫入之前的變更再采取後續的措施,這個改進可以大幅度提高程序的吞吐量

Condition

@Slf4j
public class LockExample5 {

    public static void main(String[] args) {

        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        new Thread(()->{
            try {
                reentrantLock.lock();
                log.info("wait signal");
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal");
            reentrantLock.unlock();
        }).start();

        new Thread(()->{
            reentrantLock.lock();
            log.info("get lock");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signalAll();
            log.info("send signal");
            reentrantLock.unlock();
        }).start();
    }
}

輸出結果如下:

技術分享圖片

執行過程如下:首先定義ReentrantLock的實例,並從實例中取出Condition,線程一調用reentrantLock.lock()方法,線程加入到aqs的等待隊列中去,當調用condition.await()的時候,將線程從aqs隊列中移除,對應的操作是鎖的釋放,並加入到condition的等待隊列中去,此線程等待,因為線程一釋放鎖的關系,線程2被喚醒,線程2獲取鎖,也加入到aqs的等待隊列中,線程2在執行完後執行了condition.sigalAll(),此時線程1被從condition的等待隊列中取出放入aqs的等待隊列中去,但此時線程一並沒有被喚醒,直到線程二執行了reentrantLock.unlock,釋放鎖,此時aqs的等待隊列中只剩下線程一,按照從頭到尾的順序喚醒線程,線程一被喚醒,繼續執行,之後線程一釋放鎖,這個過程完畢

此外最後,向大家推薦一遍關於aqs實現原理的文章,這篇博客講的非常詳細,大家可以仔細研讀一下 https://www.cnblogs.com/waterystone/p/4920797.html

並發編程(十):AQS