多執行緒-day-11AbstractQueuedSynchronizer深入分析
AbstractQueuedSynchronizer深入分析
AQS理解起來不難,繁複的概念卻讓人望而生怯,這裡將花幾天時間對AQS進行一個詳細剖析。
什麼是AQS?
AQS(AbstractQueuedSynchronizer
),AQS是JDK下提供的一套用於實現基於FIFO等待佇列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計為作為一些可用原子int值來表示狀態的同步器的基類。如果你有看過類似 CountDownLatch
類的原始碼實現,會發現其內部有一個繼承了 AbstractQueuedSynchronizer
Sync
。可見 CountDownLatch
是基於AQS框架來實現的一個同步器.類似的同步器在JUC下還有不少。(eg. Semaphore
)
AQS的核心思想是基於volatile int state這樣的volatile變數,配合Unsafe工具對其原子性的操作來實現對當前鎖狀態進行修改。同步器內部依賴一個FIFO的雙向佇列來完成資源獲取執行緒的排隊工作。
下面先看看AQS資料結構和加入節點,移除節點的流程圖
AQS中的資料結構 -- 節點和同步佇列
節點加入到同步佇列
首節點變化
上面三個圖介紹了AQS的資料結構,節點加入,首節點變化(當首節點完成後,後續節點競爭或同步設定為首節點)的流程圖介紹。
AQS資料結構介紹
由上圖可以看出,AQS的資料結構是一個雙向連結串列結構。在同步佇列中,head指向的是同步佇列中的首節點(頭結點),tail指向的是同步佇列中的尾節點。佇列中的執行緒都以連結串列形式連線。
當競爭失敗的執行緒會被打包成Node放到同步佇列中,Node可能的狀態有:
CANCELLED:執行緒等待超時或者被中斷了,需要從佇列中移走
SIGNAL:後續的節點等待狀態,當前節點,通知後面的節點去執行
CONDITION:當前節點處於等待佇列
PROPAGATE:共享,表示狀態要往後面的節點傳播
0:表示初始狀態
節點在同步佇列中增加和移出。當同步佇列中的執行緒獲取到鎖之後,執行對應的業務操作,完成業務操作後,釋放鎖,並在同步佇列總移出。當有新的執行緒要加入到同步佇列中,則會在同步佇列的節點末尾增加新的執行緒。
首先,來看一下獨佔鎖同步狀態獲取與釋放:
流程講解:
1、當執行緒獲取同步狀態state時,如果獲取成功,則操作完業務後退出並返回。
2、當執行緒獲取同步狀態state時,如果獲取失敗:
①、則會生成一個同步佇列的節點,並加入到同步佇列的尾部
②、會通過CAS自旋設定:
A、如果判定前驅為頭節點,則去獲取同步狀態state
I、如果獲取到了同步狀態,則當前節點被設定為頭結點,並執行邏輯業務退出返回。
II、如果獲取同步狀態失敗,則繼續進入等待狀態,等待執行緒被中斷或者前驅節點被釋放後,繼續通過CAS自旋操作獲取同步狀態,直到成功。
B、如果判定前驅不為頭結點,則進入等待狀態,等待執行緒被中斷或者前驅節點被釋放後,繼續通過CAS自旋操作獲取同步狀態,直到成功。
這裡就不講共享鎖的流程圖了。畢竟,共享鎖的話,可以想象ReetrantLock提供的讀寫鎖中的讀鎖,讀鎖之間的執行緒共享,不互斥,可以同時進入訪問。
我們先來看看AQS(AbstractQueuedSynchronizer)的JDK的結構
瞭解AQS的使用方式和其中的設計模式。
AQS的使用方式:
1、AQS(AbstractQueuedSynchronizer)是一個抽象類,繼承AbstractOwnableSynchronizer抽象類。
2、因此,只有通過繼承(extends)方式來使用AQS
AQS的設計模式:
AQS採用的設計模式為:模板方法設計模式。
模板方法設計模式的概念:定義一個操作中演算法的骨架,而將一些步驟延遲到子類中,模板方法使得子類可以不改變演算法的結構即可重定義該演算法的某些特定步驟。通俗點的理解就是:完成一件事情,有固定的數個步驟,但是每個步驟根據物件的不同,而實現細節不同;就可以在父類中定義一個完成該事情的總方法,按照完成事件需要的步驟去呼叫其每個步驟的實現方法。每個步驟的具體實現,由子類完成。
模板方法設計模式應用舉例
傳送訊息/郵件場景:我們所知道,當公司裡面制定了一套郵件或者訊息機制,那麼這套郵件或者訊息機制都是有一個通用模板的,只是裡面的內容或者方式可能需要再去個性化的制定。
定義一個郵件/訊息的父類(抽象類)SendToCustomer,裡面的構成程式碼如下:
import java.util.Date;
/**
*
* @FileName SendCustom.java
* @Author xiaogaoxiang
* @At 2018年11月14日 下午11:32:17
* @Desc 訊息模板方法
*/
public abstract class SendCustom {
// 訊息/郵件要傳送給哪個使用者
public abstract void to();
// 訊息/郵件來由哪個使用者傳送
public abstract void from();
// 訊息/郵件內容
public abstract void content();
// 日期,因為是統一的取當前日期,因此不需要子類再去實現
public void date() {
System.out.println(new Date());
}
// 傳送
public abstract void send();
// 框架方法-模板方法
public void sendMessage() {
to();
from();
content();
date();
send();
}
}
下面建立一個模板方法的派生類SendSms訊息傳送類,繼承了SendToCustomer抽象類
/**
*
* @FileName SendSms.java
* @Author xiaogaoxiang
* @At 2018年11月14日 下午11:33:07
* @Desc 模板方法的派生類
*/
public class SendSms extends SendCustom {
@Override
public void to() {
// 自定義輸出
System.out.println("zhangsan");
}
@Override
public void from() {
// 自定義輸出
System.out.println("xiaogaoxiang");
}
@Override
public void content() {
// 自定義輸出
System.out.println("Hello world");
}
@Override
public void send() {
// 自定義輸出
System.out.println("Send sms");
}
public static void main(String[] args) {
SendCustom sendC = new SendSms();
sendC.sendMessage();
}
}
模板方法設計模式很簡單,通過定義統一行為的抽象類,再通過其派生類去做個性化的制定實現某種業務即可。
通過以上了解了AQS採用的是模板方法設計模式的思想之後,我們來了解一下,AQS裡面制定了那些模板方法:
獨享式獲取:accquire、acquireInterruptibly、 tryAcquireNanos
共享式獲取:acquireShared、acquireSharedInterruptibly、tryAcquireSharedNanos
獨佔式釋放鎖:release
共享式釋放鎖:releaseShared
需要子類覆蓋的流程方法:
獨佔式獲取:tryAcquire
獨佔式釋放:tryRelease
共享式獲取:tryAcquireShared
共享式釋放:tryReleaseShared
同步器是否處於獨佔模式:isHeldExclusively
同步狀態state:
getState:獲取當前的同步狀態
setState:設定當前同步狀態
compareAndSetState:使用CAS設定狀態,保證狀態設定的原子性
整體對AQS(AbstractQueuedSynchronizer)抽象類裡面定義的內部類,方法進行理解。
1、AQS實現了自身的構造方法。裡面是預設的構造方法
2、定義了一個靜態的final Node類,該類裡面有:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
static final Node SHARED = new Node(); // 定義了共享節點的物件,用來標記哪些執行緒屬於共享節點
static final Node EXCLUSIVE = null; // 定義了獨享節點的物件,用來標記哪些執行緒屬於獨享節點
static final int CANCELLED = 1; // 表示執行緒已取消,用數字1進行標誌
static final int SIGNAL = -1; // 表示執行緒成功獲取到鎖,需要進行喚醒,用數字-1進行標誌
static final int CONDITION = -2; // 表示執行緒需要繼續等待,用數字-2進行標誌
static final int PROPAGATE = -3; // 表示執行緒無條件獲取到鎖,用數字-3進行標誌(這個需要再考證一下)
volatile int waitStatus; // 定義了一個volatile型別的整型數waitStatus,表示執行緒當前所處的狀態,有5個狀態,分別為:SIGNAL、CANCELLED、CONDITION、PROPAGATE、0;其中0表示沒有一種是處於上面4中狀態的情況。
volatile Node prev; // 定義了volatile型別的節點物件,表示前驅節點
volatile Node next; // 定義了volatile型別的節點物件,表示下一個節點
volatile Thread thread; // 定義了volatile型別的Thread類物件,對執行緒進行同步控制
Node nextWaiter; // 定義了Node物件nextWaiter,表示下一個等待節點
// 定義了一個返回boolean值的isShared方法,判斷節點是否是共享執行緒
final boolean isShared() {
return nextWaiter == SHARED;
}
// 定義了返回前驅節點的物件predecessor方法,並且對空指標進行了判斷,這裡空指標可以忽略不寫,但是加上可以減輕對虛擬機器VM的壓力。
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 初始化操作
Node() {}
// 增加新節點
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 增加等待節點
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
AQS中變數和方法的定義和理解:
// 等待佇列的頭結點,懶載入初始化。除了初始化,只能通過setHead進行修改。備註:如果頭結點存在,那麼它的狀態一定能確保不是CANCELLED狀態的。
private transient volatile Node head;
// 等待佇列的尾節點,懶載入初始化。只能通過enq(進入佇列)來增加一個新的節點進行修改。
private transient volatile Node tail;
// 同步執行緒的狀態
private volatile int state;
// 返回當前同步執行緒的狀態
protected final int getState() {
return state;
}
// 設定同步執行緒的狀態
protected final void setState(int newState) {
state = newState;
}
// 通過原子操作設定如果當前狀態,等於預期狀態,則將同步狀態的值設定為預期目標的值,這裡利用了CAS原子操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 利用CAS原子操作(自旋操作)的超時時間
static final long spinForTimeoutThreshold = 1000L;
// 往同步佇列插入節點,利用CAS原子操作(自旋)
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
今天將AQS(AbstractQueuedSynchronizer)裡面相關的構造方法,內部類,變數,方法等進行了註釋介紹。2018-11-18 23:22