好了,我們來開始今天的內容,首先我們來看下AQS是什麼,全稱是

AbstractQueuedSynchronizer 翻譯過來就是【抽象佇列同步】對吧。通過名字我們也能看出這是個抽象類



而且裡面定義了很多的方法

  裡面這麼多方法,咱們當然不是一個個去翻。裡面還有很多的抽象方法,咱們還得找它的實現多麻煩對不對。所以我們換個方式來探索。

場景模擬

  我們先來看下這樣一個場景

  在這裡我們有一個能被多個執行緒共享操作的資源,在這個場景中應該能看出我們的資料是不安全的,因為我們並不能保證我們的操作是原子操作對吧。基於這個場景我們通過程式碼來看看效果

package com.example.demo;

public class AtomicDemo {

    // 共享變數
private static int count = 0; // 操作共享變數的方法
public static void incr(){
// 為了演示效果 休眠一下子
try {
Thread.sleep(1);
count ++;
} catch (InterruptedException e) {
e.printStackTrace();
}
} public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000 ; i++) {
new Thread(()->AtomicDemo.incr()).start();
} Thread.sleep(4000);
System.out.println("result:" + count);
} }

  通過執行發現,執行的結果是一個不確定的值,但總是會小於等於1000,至於原因,是因為incr() 方法不是一個原子操作。為什麼不是原子操作這個咱們今天就不深究此處了.

迎合今天的主題,我們通過Lock來解決

package com.example.demo;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; public class AtomicDemo { // 共享變數
private static int count = 0; private static Lock lock = new ReentrantLock(); // 操作共享變數的方法
public static void incr(){
// 為了演示效果 休眠一下子
try {
lock.lock();
Thread.sleep(1);
count ++;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
} public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000 ; i++) {
new Thread(()->AtomicDemo.incr()).start();
} Thread.sleep(4000);
System.out.println("result:" + count);
} }

  然後我們執行發現結果都是 1000了,這也就是1000個執行緒都去操作這個 count 變數,結果符合我們的預期了。那lock到底是怎麼實現的呢?

需求分析

  我們先來分析分析



這樣的圖片看著比較複雜,咱們簡化下。

  我們自己假設下,如果要你去設計這樣的方法,你應該要怎麼設計,他們需要實現哪些功能,

  首先是lock方法,它是不是要滿足這幾個功能。

需求清楚了,那我們怎麼設計呢?

第一個互斥怎麼做,也就是多個執行緒只有一個執行緒能搶佔到資源,這個時候我們可以這樣設定

// 給一個共享資源
Int state = 0 ; // 0表示資源沒有被佔用,可以搶佔
if(state == 0 ){
// 表示可以獲取鎖
}else{
// 表示鎖被搶佔 需要阻塞等待
}



然後就是沒有搶佔到鎖的執行緒的儲存,我們可以通過一個佇列,利用FIFO來實現儲存。

最後就是執行緒的阻塞和喚醒。大家說說有哪些阻塞執行緒的方式呀?

  1. wait/notify: 不合適,不能喚醒指定的執行緒
  2. Sleep:休眠,類似於定時器
  3. Condition:可以喚醒特定執行緒
  4. LockSupport:

    LockSupport.park():阻塞當前執行緒

    LockSupport.unpark(Thread t):喚醒特定執行緒

結合今天的主題,我們選擇LockSupport來實現阻塞和喚醒。

  好了,到這兒我們已經猜想到了Lock中的實現邏輯,但是在探究原始碼之前我們還有個概念需要先和大家講下,因為這個是我們原始碼中會接觸到的一個,先講了,看的時候就比較輕鬆了對吧。

什麼是重入鎖?

  我們先來看看重入鎖的場景程式碼

package com.example.demo;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; public class AtomicDemo { // 共享變數
private static int count = 0; private static Lock lock = new ReentrantLock(); // 操作共享變數的方法
public static void incr(){
// 為了演示效果 休眠一下子
try {
lock.lock();
Thread.sleep(1);
count ++;
// 呼叫了另外一個方法。
decr();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
} public static void decr(){
try {
// 重入鎖
lock.lock();
count--;
}catch(Exception e){ }finally {
lock.unlock();
}
} public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000 ; i++) {
new Thread(()->AtomicDemo.incr()).start();
} Thread.sleep(4000);
System.out.println("result:" + count);
} }

  首先大家考慮這段程式碼會死鎖嗎? 大家給我個回覆,我看看大家的理解的怎麼樣

好了,有說會死鎖的,有說不會,其實這兒是不會死鎖的,而且結果就是0.為什麼呢?

  這個其實是鎖的一個巢狀,因為這兩把鎖都是同一個 執行緒物件,我們講共享變數的設計是

  當state=0;執行緒可以搶佔到資源 state =1; 如果進去巢狀訪問 共享資源,這時 state = 2 如果有多個巢狀 state會一直累加,釋放資源的時候, state--,直到所有重入的鎖都釋放掉 state=0,那麼其他執行緒才能繼續搶佔資源,說白了重入鎖的設計目的就是為了防止 死鎖

AQS類圖

  通過類圖我們可以發現右車的業務應用其實內在都有相識的設計,這裡我們只需要搞清楚其中的一個,其他的你自己應該就可以看懂~,好了我們就具體結合前面的案例程式碼,以ReentrantLock為例來介紹AQS的程式碼實現。

原始碼分析

  在看原始碼之前先回顧下這個圖,帶著問題去看,會更輕鬆

Lock.lock()

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

這個方法邏輯比較簡單,if條件成立說明 搶佔鎖成功並設定 當前執行緒為獨佔鎖

else 表示搶佔失敗,acquire(1) 方法我們後面具體介紹

compareAndSetState(0, 1):用到了CAS 是一個原子操作方法,底層是UnSafe.作用就是設定 共享操作的 state 由0到1. 如果state的值是0就修改為1

setExclusiveOwnerThread:程式碼很簡單,進去看一眼即可

acquire方法

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  1. tryAcquire()嘗試直接去獲取資源,如果成功則直接返回(這裡體現了非公平鎖,每個執行緒獲取鎖時會嘗試直接搶佔加塞一次,而CLH佇列中可能還有別的執行緒在等待);
  2. addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
  3. acquireQueued()使執行緒阻塞在等待佇列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。如果執行緒在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

  當然這裡程式碼的作用我是提前研究過的,對於大家肯定不是很清楚,我們繼續裡面去看,最後大家可以回到這兒再論證。

tryAcquire(int)

  再次嘗試搶佔鎖

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//再次嘗試搶佔鎖
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入鎖的情況
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// false 表示搶佔失敗
return false;
}

addWaiter

  將阻塞的執行緒新增到雙向連結串列的結尾

private Node addWaiter(Node mode) {
//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode); //嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
} //上一步失敗則通過enq入隊。
enq(node);
return node;
}

enq(Node)

private Node enq(final Node node) {
//CAS"自旋",直到成功加入隊尾
for (;;) {
Node t = tail;
if (t == null) { // 佇列為空,建立一個空的標誌結點作為head結點,並將tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入隊尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

第一個if語句

else語句

執行緒3進來會執行如下程式碼

那麼效果圖

acquireQueued(Node, int)

  OK,通過tryAcquire()和addWaiter(),該執行緒獲取資源失敗,已經被放入等待佇列尾部了。聰明的你立刻應該能想到該執行緒下一部該幹什麼了吧:進入等待狀態休息,直到其他執行緒徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去幹自己想幹的事了。沒錯,就是這樣!是不是跟醫院排隊拿號有點相似~~acquireQueued()就是幹這件事:在等待佇列中排隊拿號(中間沒其它事幹可以休息),直到拿到號後再返回。這個函式非常關鍵,還是上原始碼吧:

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//標記是否成功拿到資源
try {
boolean interrupted = false;//標記等待過程中是否被中斷過 //又是一個“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驅
//如果前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到資源後,將head指向該結點。所以head所指的標杆結點,就是當前獲取到資源的那個結點或null。
p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結點。也就意味著之前拿完資源的結點出隊了!
failed = false; // 成功獲取資源
return interrupted;//返回等待過程中是否被中斷過
} //如果自己可以休息了,就通過park()進入waiting狀態,直到被unpark()。如果不可中斷的情況下被中斷了,那麼會從park()中醒過來,發現拿不到資源,從而繼續進入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待過程中被中斷過,哪怕只有那麼一次,就將interrupted標記為true
}
} finally {
if (failed) // 如果等待過程中沒有成功獲取資源(如timeout,或者可中斷的情況下被中斷了),那麼取消結點在佇列中的等待。
cancelAcquire(node);
}
}

  到這裡了,我們先不急著總結acquireQueued()的函式流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。

shouldParkAfterFailedAcquire(Node, Node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驅的狀態
if (ws == Node.SIGNAL)
//如果已經告訴前驅拿完號後通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。
* 注意:那些放棄的結點,由於被自己“加塞”到它們前邊,它們相當於形成一個無引用鏈,稍後就會被保安大叔趕走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驅正常,那就把前驅的狀態設定成SIGNAL,告訴它拿完號後通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

  整個流程中,如果前驅結點的狀態不是SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿號。

parkAndCheckInterrupt()

  如果執行緒找好安全休息點後,那就可以安心去休息了。此方法就是讓執行緒去休息,真正進入等待狀態。

 private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//呼叫park()使執行緒進入waiting狀態
return Thread.interrupted();//如果被喚醒,檢視自己是不是被中斷的。
}

好了,我們可以小結下了。

看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),現在讓我們再回到acquireQueued(),總結下該函式的具體流程:

  1. 結點進入隊尾後,檢查狀態,找到安全休息點;
  2. 呼叫park()進入waiting狀態,等待unpark()或interrupt()喚醒自己;
  3. 被喚醒後,看自己是不是有資格能拿到號。如果拿到,head指向當前結點,並返回從入隊到拿到號的整個過程中是否被中斷過;如果沒拿到,繼續流程1。

最後我們再回到前面的acquire方法來總結下

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

總結下它的流程吧

  1. 呼叫自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
  2. 沒成功,則addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
  3. acquireQueued()使執行緒在等待佇列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
  4. 如果執行緒在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

Lock.unlock()

  好了,lock方法看完後,我們再來看下unlock方法

release(int)

  它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待佇列裡的其他執行緒來獲取資源。這也正是unlock()的語義,當然不僅僅只限於unlock()

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到頭結點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//喚醒等待佇列裡的下一個執行緒
return true;
}
return false;
}

tryRelease(int)

  此方法嘗試去釋放指定量的資源。下面是tryRelease()的原始碼:

 public final boolean release(int arg) {
if (tryRelease(arg)) {//這裡是先嚐試釋放一下資源,一般都可以釋放成功,除了多次重入但只釋放一次的情況。
Node h = head;
//這裡判斷的是 阻塞佇列是否還存在和head節點是否是tail節點,因為之前說過,佇列的尾節點的waitStatus是為0的
if (h != null && h.waitStatus != 0)
//到這裡就說明head節點已經釋放成功啦,就先去叫醒後面的直接節點去搶資源吧
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//這裡,node一般為當前執行緒所在的結點。
int ws = node.waitStatus;
if (ws < 0)//置零當前執行緒所在的結點狀態,允許失敗。
compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//找到下一個需要喚醒的結點s
if (s == null || s.waitStatus > 0) {//如果為空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 從後向前找。
if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}

  這個函式並不複雜。一句話概括:用unpark()喚醒等待佇列中最前邊的那個未放棄執行緒,這裡我們也用s來表示吧。此時,再和acquireQueued()聯絡起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關係,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裡既然s已經是等待佇列中最前邊的那個未放棄執行緒了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),然後s把自己設定成head標杆結點,表示自己已經獲取到資源了,acquire()也返回了

  好了,到這我們就因為把原始碼看完了,再回頭來看下這張圖



  是不是就清楚了AQS到底是怎麼實現的我們上面的猜想的了吧。那麼對應的下課後讓你自己去看

歡迎一起交流學習哦【463257262】