1. 程式人生 > >JAVA學習筆記(併發程式設計 - 陸)- J.U.C之AQS及其相關元件詳解

JAVA學習筆記(併發程式設計 - 陸)- J.U.C之AQS及其相關元件詳解

文章目錄

J.U.C之AQS-介紹

Java併發包(JUC)中提供了很多併發工具,這其中有很多耳熟能詳的併發工具,譬如ReentrangLock、Semaphore,而它們的實現都用到了一個共同的基類–AbstractQueuedSynchronizer(抽象佇列同步器),簡稱AQS。

AQS是JDK提供的一套用於實現基於FIFO等待佇列的阻塞鎖和相關的同步器的一個同步框架,它使用一個int型別的volatile變數(命名為state)來維護同步狀態,通過內建的FIFO佇列來完成資源獲取執行緒的排隊工作。

AbstractQueuedSynchronizer中對state的操作是原子的,且不能被繼承。所有的同步機制的實現均依賴於對改變數的原子操作。為了實現不同的同步機制,需要建立一個非共有的(non-public internal)擴充套件了AQS類的內部輔助類來實現相應的同步邏輯。

AbstractQueuedSynchronizer並不實現任何同步介面,它提供了一些可以被具體實現類直接呼叫的一些原子操作方法來重寫相應的同步邏輯。AQS同時提供了獨佔模式(exclusive)和共享模式(shared)兩種不同的同步邏輯。一般情況下,子類只需要根據需求實現其中一種模式,當然也有同時實現兩種模式的同步類,如ReadWriteLock。

使用AQS能簡單且高效地構造出應用廣泛的大量的同步器,比如提到的ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基於AQS的。

當然,也能利用AQS非常輕鬆容易地構造出符合私人需求的同步器,由此可知AQS是Java併發包中最為核心的一個基類。

AbstractQueuedSynchronizer底層資料結構是一個雙向連結串列,屬於佇列的一種實現:
在這裡插入圖片描述

  • sync queue:同步佇列,其中head節點主要負責後面的排程
  • Condition queue:單向連結串列,不是必須的,只有程式中使用到Condition的時候才會存在,可能會有多個Condition queue

關於AQS裡的state狀態:

AbstractQueuedSynchronizer維護了一個volatile int型別的變數,命名為state,用於表示當前同步狀態。volatile雖然不能保證操作的原子性,但是保證了當前變數state的可見性。state的訪問方式有三種:

getState()
setState()
compareAndSetState()

這三種操作均是原子操作,其中compareAndSetState的實現依賴於Unsafe的compareAndSwapInt()方法。

關於自定義資源共享方式:

AQS支援兩種資源共享方式:Exclusive(獨佔,只有一個執行緒能執行,如ReentrantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch)。這樣方便使用者實現不同型別的同步元件,獨佔式如ReentrantLock,共享式如Semaphore,CountDownLatch,組合式的如ReentrantReadWriteLock。總之,AQS為使用提供了底層支撐,如何組裝實現,使用者可以自由發揮。

關於同步器設計:

同步器的設計是基於模板方法模式的,一般的使用方式是這樣:

  • 使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。(這些重寫方法很簡單,無非是對於共享資源state的獲取和釋放)
  • 將AQS組合在自定義同步元件的實現中,並呼叫其模板方法,而這些模板方法會呼叫使用者重寫的方法。這其實是模板方法模式的一個很經典的應用。

不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在底層實現好了。自定義同步器實現時主要實現以下幾種方法:

protected boolean isHeldExclusively() // 該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。
protected boolean tryAcquire(int) // 獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
protected boolean tryRelease(int) // 獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
protected int tryAcquireShared(int) // 共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
protected boolean tryReleaseShared(int) // 共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。

如何使用:

首先,需要去繼承AbstractQueuedSynchronizer這個類,然後根據需求去重寫相應的方法,比如要實現一個獨佔鎖,那就去重寫tryAcquire,tryRelease方法,要實現共享鎖,就去重寫tryAcquireShared,tryReleaseShared;最後,在元件中呼叫AQS中的模板方法就可以了,而這些模板方法是會呼叫到之前重寫的那些方法的。也就是說,只需要很小的工作量就可以實現自己的同步元件,重寫的那些方法,僅僅是一些簡單的對於共享資源state的獲取和釋放操作,至於像是獲取資源失敗,執行緒需要阻塞之類的操作,自然是AQS來幫忙完成了。

具體實現的思路:

  1. 首先AQS內部維護了一個CLH佇列,來管理鎖
  2. 執行緒嘗試獲取鎖,如果獲取失敗,則將等待資訊等包裝成一個Node結點,加入到同步佇列Sync queue裡
  3. 不斷重新嘗試獲取鎖(當前結點為head的直接後繼才會嘗試),如果獲取失敗,則會阻塞自己,直到被喚醒
  4. 當持有鎖的執行緒釋放鎖的時候,會喚醒佇列中的後繼執行緒

設計思想:

對於使用者來講,無需關心獲取資源失敗,執行緒排隊,執行緒阻塞/喚醒等一系列複雜的實現,這些都在AQS中為使用者處理好了。使用者只需要負責好自己的那個環節就好,也就是獲取/釋放共享資源state的姿勢。很經典的模板方法設計模式的應用,AQS為使用者定義好頂級邏輯的骨架,並提取出公用的執行緒入佇列/出佇列,阻塞/喚醒等一系列複雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現即可。

基於AQS的同步元件:

  • CountDownLatch
  • Semaphore
  • CyclicBarrier
  • ReentrantLock
  • Condition
  • FutureTask

AQS小結:

  • 使用Node實現FIFO佇列,可以用於構建鎖或者其他同步裝置的基礎框架
  • 利用了一個int型別表示狀態,有一個state的成員變數,表示獲取鎖的執行緒數(0沒有執行緒獲取鎖,1有執行緒獲取鎖,大於1表示重入鎖的數量),和一個同步元件ReentrantLock。狀態資訊通過procted級別的getState,setState,compareAndSetState進行操作
  • 使用方法是繼承,然後複寫AQS中的方法,基於模板方法模式
  • 子類通過繼承並通過實現它的方法管理其狀態{acquire和release}的方法操作狀態
  • 可以同時實現排它鎖和共享鎖的模式(獨佔、共享)

CountDownLatch

CountDownLatch是一個同步工具類,它允許一個或多個執行緒一直等待,直到其他執行緒執行完後再執行。例如,應用程式的主執行緒希望在負責啟動框架服務的執行緒已經啟動所有框架服務之後執行。

CountDownLatch是通過一個計數器來實現的,計數器的初始化值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就相應得減1。當計數器到達0時,表示所有的執行緒都已完成任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。
在這裡插入圖片描述

CountDownLatch的建構函式原始碼如下:

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

計數器count是閉鎖需要等待的執行緒數量,只能被設定一次,且CountDownLatch沒有提供任何機制去重新設定計數器count。

與CountDownLatch的第一次互動是主執行緒等待其他執行緒。主執行緒必須在啟動其他執行緒後立即呼叫CountDownLatch.await()方法。這樣主執行緒的操作就會在這個方法上阻塞,直到其他執行緒完成各自的任務。

其他N個執行緒必須引用CountDownLatch閉鎖物件,因為它們需要通知CountDownLatch物件,它們各自完成了任務;這種通知機制是通過CountDownLatch.countDown()方法來完成的;每呼叫一次,count的值就減1,因此當N個執行緒都呼叫這個方法,count的值就等於0,然後主執行緒就可以通過await()方法,恢復執行自己的任務。

注:該計數器的操作是原子性的

CountDownLatch使用場景:

實現最大的並行性:有時使用者想同時啟動多個執行緒,實現最大程度的並行性。例如,使用者想測試一個單例類。如果使用者建立一個初始計數器為1的CountDownLatch,並讓其他所有執行緒都在這個鎖上等待,只需要呼叫一次countDown()方法就可以讓其他所有等待的執行緒同時恢復執行。
開始執行前等待N個執行緒完成各自任務:例如應用程式啟動類要確保在處理使用者請求前,所有N個外部系統都已經啟動和運行了。
死鎖檢測:一個非常方便的使用場景是你用N個執行緒去訪問共享資源,在每個測試階段執行緒數量不同,並嘗試產生死鎖。
使用示例
1.基本用法:

package com.mmall.concurrency.example.aqs;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@ThreadSafe
public class CountDownLatchExample1 {
    private static int threadCount=200;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec= Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i <threadCount ; i++) {
            final int threadNum=i;
            exec.execute(()->{
                try {
                    test(threadNum);
                } catch (InterruptedException e) {
                    log.error("Exception",e);
                }finally {
                    countDownLatch.countDown();//-1
                }
            });
        }
        countDownLatch.await();//確保上面執行完後執行下面的程式碼
        log.info("finish");
        exec.shutdown();
    }
    private static void test(int threadCount) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}",threadCount);
        Thread.sleep(100);
    }
}

2.比如有多個執行緒完成一個任務,但是這個任務只想給它一個指定的時間,超過這個任務就不繼續等待了,完成多少算多少:

package com.mmall.concurrency.example.aqs;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
@ThreadSafe
public class CountDownLatchExample2 {
    private static int threadCount=200;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec= Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i <threadCount ; i++) {
            final int threadNum=i;
            exec.execute(()->{
                try {
                    test(threadNum);
                } catch (InterruptedException e) {
                    log.error("Exception",e);
                }finally {
                    countDownLatch.countDown();//-1
                }
            });
        }
        //10毫秒後無論上方程式碼執行否都會繼續執行下方程式碼;上方程式碼是100毫秒執行時間;
        countDownLatch.await(10,TimeUnit.MILLISECONDS);//確保上面執行完後執行下面的程式碼
        log.info("finish");
        exec.shutdown();//並不是馬上銷燬執行緒池;而是保證上方程式碼執行結果完成才銷燬
    }
    private static void test(int threadCount) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}",threadCount);
    }
}

Semaphore

Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。很多年以來,我都覺得從字面上很難理解Semaphore所表達的含義,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量,只允許同時有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會看到綠燈,可以開進這條馬路,後面的車會看到紅燈,不能駛入XX馬路,但是如果前一百輛中有五輛車已經離開了XX馬路,那麼後面就允許有5輛車駛入馬路,這個例子裡說的車就是執行緒,駛入馬路就表示執行緒在執行,離開馬路就表示執行緒執行完成,看見紅燈就表示執行緒被阻塞,不能執行。
在這裡插入圖片描述
所以簡單來說,Semaphore主要作用就是可以控制同一時間併發執行的執行緒數。Semaphore有兩個建構函式,引數permits表示許可數,它最後傳遞給了AQS的state值。執行緒在執行時首先獲取許可,如果成功,許可數就減1,執行緒執行,當執行緒執行結束就釋放許可,許可數就加1。如果許可數為0,則獲取失敗,執行緒位於AQS的等待佇列中,它會被其它釋放許可的執行緒喚醒。在建立Semaphore物件的時候還可以指定它的公平性。一般常用非公平的訊號量,非公平訊號量是指在獲取許可時先嚐試獲取許可,而不必關心是否已有需要獲取許可的執行緒位於等待佇列中,如果獲取失敗,才會入列。而公平的訊號量在獲取許可時首先要檢視等待佇列中是否已有執行緒,如果有則入列。
在這裡插入圖片描述

使用場景:

Semaphore可以用於做流量控制,特別公用資源有限的應用場景,比如資料庫連線。假如有一個需求,要讀取幾萬個檔案的資料,因為都是IO密集型任務,可以啟動幾十個執行緒併發的讀取,但是如果讀到記憶體後,還需要儲存到資料庫中,而資料庫的連線數只有10個,這時必須控制只有十個執行緒同時獲取資料庫連線儲存資料,否則會報錯無法獲取資料庫連線。這個時候,就可以使用Semaphore來做流控。

使用示例

1.每次獲取1個許可示例:

package com.mmall.concurrency.example.aqs;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
@ThreadSafe
public class SemaphoreExample1 {
    private static int threadCount=200;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec= Executors.newCachedThreadPool();

        final Semaphore semaphore=new Semaphore(20);//設定併發量

        for (int i = 0; i <threadCount ; i++) {
            final int threadNum=i;
            exec.execute(()->{
                try {
                    semaphore.acquire();//獲取一個許可
                    test(threadNum);
                    semaphore.release();//釋放一個許可
                } catch (InterruptedException e) {
                    log.error("Exception",e);
                }
            });
        }
        log.info("finish");
        exec.shutdown();
    }
    private static void test(int threadCount) throws InterruptedException {
        log.info("{}",threadCount);
        Thread.sleep(1000);
    }
}

在程式碼中,雖然有200個執行緒在執行,但是隻允許10個併發的執行。Semaphore的構造方法Semaphore(int permits) 接收一個整型的數字,表示可用的許可證數量。所以Semaphore(10)表示允許10個執行緒獲取許可證,也就是最大併發數是10。Semaphore的用法也很簡單,首先執行緒使用Semaphore的acquire()獲取一個許可證,使用完之後呼叫release()歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。

2.如何希望每次獲取多個許可的話,只需要在acquire()方法的引數中進行指定即可,如下示例:

package com.mmall.concurrency.example.aqs;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
@ThreadSafe
public class SemaphoreExample2 {
    private static int threadCount=20;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec= Executors.newCachedThreadPool();

        final Semaphore semaphore=new Semaphore(3);//設定併發量

        for (int i = 0; i <threadCount ; i++) {
            final int threadNum=i;
            exec.execute(()->{
                try {
                    semaphore.acquire(3);//獲取多個許可
                    test(threadNum);
                    semaphore.release(3);//獲取多個許可
                } catch (InterruptedException e) {
                    log.error("Exception",e);
                }
            });
        }
        log.info("finish");
        exec.shutdown();
    }
    private static void test(int threadCount) throws InterruptedException {
        log.info("{}",threadCount);
        Thread.sleep(1000);
    }
}

3.當併發很高,想要超過允許的併發數之後,就丟棄不處理的話,可以使用Semaphore裡的tryAcquire()方法嘗試獲取許可,該方法返回boolean型別的值,可以通過判斷這個值來拋棄超過併發數的請求。如下示例:

package com.mmall.concurrency.example.aqs;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
@ThreadSafe
public class SemaphoreExample3 {
    private static int threadCount=20;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec= Executors.newCachedThreadPool();

        final Semaphore semaphore=new Semaphore(3);//設定併發量

        for (int i = 0; i <threadCount ; i++) {
            final int threadNum=i;
            exec.execute(()->{
                try {
                    if(semaphore.tryAcquire()) {//嘗試獲取一個許可
                        test(threadNum);
                        semaphore.release();//釋放一個許可
                    }
                } catch (InterruptedException e) {
                    log.error("Exception",e);
                    //會有三個執行緒執行其他都會被丟棄
                }
            });
        }
        log.info("finish");
        exec.shutdown();
    }
    private static void test(int threadCount) throws InterruptedException {
        log.info("{}",threadCount);
        Thread.sleep(1000);
    }
}

Semaphore中嘗試獲取許可的相關方法:
在這裡插入圖片描述
可以指定嘗試獲取許可的超時時間,例如設定超時時間為1秒:

// 嘗試獲取一個許可,直到超過一秒
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
    System.out.println(threadNum);
    // 釋放一個許可
    semaphore.release();
}

除此之外,還可以嘗試獲取多個許可,並且指定超時時間:

// 嘗試獲取多個許可,直到超過一秒
if (semaphore.tryAcquire(3, 1, TimeUnit.SECONDS)) {
    System.out.println(threadNum);
    // 釋放多個許可
    semaphore.release(3);
}

Semaphore中其他一些常用的方法:

int availablePermits()             // 返回此訊號量中當前可用的許可證數。
int getQueueLength()               // 返回正在等待獲取許可證的執行緒數。
boolean hasQueuedThreads()         // 是否有執行緒正在等待獲取許可證。
void reducePermits(int reduction)  // 減少reduction個許可證。是個protected方法。
Collection getQueuedThreads()      // 返回所有等待獲取許可證的執行緒集合。是個protected方法。

CyclicBarrier

CyclicBarrier 的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。當某個執行緒呼叫了await方法之後,就會進入等待狀態,並將計數器-1,直到所有執行緒呼叫await方法使計數器為0,才可以繼續執行,由於計數器可以重複使用,所以又可以叫他迴圈屏障。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。
在這裡插入圖片描述

CyclicBarrier的應用場景:

CyclicBarrier可以用於多執行緒計算資料,最後合併計算結果的應用場景。比如用一個Excel儲存了使用者所有銀行流水,每個Sheet儲存一個帳戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多執行緒處理每個sheet裡的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些執行緒的計算結果,計算出整個Excel的日均銀行流水。

CyclicBarrier和CountDownLatch的區別:

  • CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為複雜的業務場景,比如如果計算髮生錯誤,可以重置計數器,並讓執行緒們重新執行一次。
  • CountDownLatch主要用於實現一個或n個執行緒需要等待其他執行緒完成某項操作之後,才能繼續往下執行,描述的是一個或n個執行緒等待其他執行緒的關係,而CyclicBarrier是多個執行緒相互等待,知道滿足條件以後再一起往下執行。描述的是多個執行緒相互等待的場景
  • CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的執行緒數量。isBroken方法用來知道阻塞的執行緒是否被中斷。

CyclicBarrier方法列表:
在這裡插入圖片描述

使用示例

1.基本使用:

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
impor