Java併發包原始碼學習之AQS框架(四)AbstractQueuedSynchronizer原始碼分析
經過前面幾篇文章的鋪墊,今天我們終於要看看AQS
的廬山真面目了,建議第一次看AbstractQueuedSynchronizer
類原始碼的朋友可以先看下我前面幾篇文章:
分析原始碼是非常枯燥乏味的一件事,其實程式碼本身其實就是最好的說明了,因此基本都是貼出一些程式碼加上一些註釋, 因為AbstractQueuedSynchronizer
上千行程式碼不可能也不需要每行都要分析,所以只撿一些關鍵的地方或 比較難理解的地方做說明,有一些地方可能我理解的有出入,歡迎大家指正。 詳細的註釋我都放在了GitHub上。
前面提到AQS
是基於CLH lock queue的,AbstractQueuedSynchronizer
Node
實現了一個變種。 前面基本說明了Node的主要內容,但這個類還有一些其他重要的欄位:
//標記當前結點是共享模式 static final Node SHARED = new Node(); //標記當前結點是獨佔模式 static final Node EXCLUSIVE = null; //結點的等待狀態。 volatile int waitStatus; //擁有當前結點的執行緒。 volatile Thread thread;
其中waitStatus
很重要,用來控制執行緒的阻塞/喚醒,以及避免不必要的呼叫LockSupport的park/unpark方法。 它主要有以下幾個取值:
//代表執行緒已經被取消 static final int CANCELLED = 1; //代表後續節點需要喚醒 static final int SIGNAL = -1; //代表執行緒在condition queue中,等待某一條件 static final int CONDITION = -2; //代表後續結點會傳播喚醒的操作,共享模式下起作用 static final int PROPAGATE = -3;
出隊操作
只要設定新的head
結點就可以了。
private void setHead(Node node) { head = node; node.thread= null; node.prev = null; }
入隊操作
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 這個if分支其實是一種優化:CAS操作失敗的話才進入enq中的迴圈。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
獨佔模式獲取
public final void acquire(int arg) { // tryAcquire 由子類實現本身不會阻塞執行緒,如果返回 true,則執行緒繼續, // 如果返回 false 那麼就 加入阻塞佇列阻塞執行緒,並等待前繼結點釋放鎖。 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // acquireQueued返回true,說明當前執行緒被中斷喚醒後獲取到鎖, // 重置其interrupt status為true。 selfInterrupt(); }
一旦tryAcquire成功則立即返回,否則執行緒會加入佇列 執行緒可能會反覆的被阻塞和喚醒直到tryAcquire成功,這是因為執行緒可能被中斷, 而acquireQueued方法中會保證忽視中斷,只有tryAcquire成功了才返回。中斷版本的獨佔獲取是acquireInterruptibly
這個方法, doAcquireInterruptibly
這個方法中如果執行緒被中斷則acquireInterruptibly
會丟擲InterruptedException
異常。
addWaiter
方法只是入隊操作,acquireQueued
方法是主要邏輯,需要重點理解。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 等待前繼結點釋放鎖 // 自旋re-check for (;;) { // 獲取前繼 final Node p = node.predecessor(); // 前繼是head,說明next就是node了,則嘗試獲取鎖。 if (p == head && tryAcquire(arg)) { // 前繼出隊,node成為head setHead(node); p.next = null; // help GC failed = false; return interrupted; } // p != head 或者 p == head但是tryAcquire失敗了,那麼 // 應該阻塞當前執行緒等待前繼喚醒。阻塞之前會再重試一次,還需要設定前繼的waitStaus為SIGNAL。 // 執行緒會阻塞在parkAndCheckInterrupt方法中。 // parkAndCheckInterrupt返回可能是前繼unpark或執行緒被中斷。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 說明當前執行緒是被中斷喚醒的。 // 注意:執行緒被中斷之後會繼續走到if處去判斷,也就是會忽視中斷。 // 除非碰巧執行緒中斷後acquire成功了,那麼根據Java的最佳實踐, // 需要重新設定執行緒的中斷狀態(acquire.selfInterrupt)。 interrupted = true; } } finally { // 出現異常 if (failed) cancelAcquire(node); } }
基本每行都有註釋,但得結合shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
這兩個方法來一起理解會更 容易些。shouldParkAfterFailedAcquire方法的作用是:
- 確定後繼是否需要park;
- 跳過被取消的結點;
- 設定前繼的waitStatus為SIGNAL.
int ws = pred.waitStatus; if (ws == Node.SIGNAL)// 前繼結點已經準備好unpark其後繼了,所以後繼可以安全的park /* * This node has already set status asking a release to signal it, * so it can safely park. */ return true; if (ws > 0) {// CANCELLED // 跳過被取消的結點。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {// 0 或 PROPAGATE (CONDITION用在ConditonObject,這裡不會是這個值) /** * waitStatus 等於0(初始化)或PROPAGATE。說明執行緒還沒有park,會先重試 * 確定無法acquire到再park。 */ // 更新pred結點waitStatus為SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;
parkAndCheckInterrupt就是用LockSupport來阻塞當前執行緒,很簡單:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
執行緒被喚醒只可能是:被unpark
,被中斷或偽喚醒。被中斷會設定interrupted
,acquire方法返回前會 selfInterrupt
重置下執行緒的中斷狀態,如果是偽喚醒的話會for迴圈re-check。
獨佔模式釋放
比較簡單隻要直接喚醒後續結點就可以了,後續結點會從parkAndCheckInterrupt
方法中返回。
public final boolean release(int arg) { // tryReease由子類實現,通過設定state值來達到同步的效果。 if (tryRelease(arg)) { Node h = head; // waitStatus為0說明是初始化的空佇列 if (h != null && h.waitStatus != 0) // 喚醒後續的結點 unparkSuccessor(h); return true; } return false; }
共享模式獲取
acquireShared
方法是用來共享模式獲取。
public final void acquireShared(int arg) { //如果沒有許可了則入隊等待 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { // 新增佇列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 等待前繼釋放並傳遞 for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);// 嘗試獲取 if (r >= 0) { // 獲取成功則前繼出隊,跟獨佔不同的是 // 會往後面結點傳播喚醒的操作,保證剩下等待的執行緒能夠儘快 獲取到剩下的許可。 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // p != head || r < 0 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
核心是這個doAcquireShared
方法,跟獨佔模式的acquireQueued
很像,主要區別在setHeadAndPropagate
方法中, 這個方法會將node設定為head。如果當前結點acquire到了之後發現還有許可可以被獲取,則繼續釋放自己的後繼, 後繼會將這個操作傳遞下去。這就是PROPAGATE
狀態的含義。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * 嘗試喚醒後繼的結點:<br /> * propagate > 0說明許可還有能夠繼續被執行緒acquire;<br /> * 或者 之前的head被設定為PROPAGATE(PROPAGATE可以被轉換為SIGNAL)說明需要往後傳遞;<br /> * 或者為null,我們還不確定什麼情況。 <br /> * 並且 後繼結點是共享模式或者為如上為null。 * <p> * 上面的檢查有點保守,在有多個執行緒競爭獲取/釋放的時候可能會導致不必要的喚醒。<br /> * */ if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; // 後繼結是共享模式或者s == null(不知道什麼情況) // 如果後繼是獨佔模式,那麼即使剩下的許可大於0也不會繼續往後傳遞喚醒操作 // 即使後面有結點是共享模式。 if (s == null || s.isShared()) // 喚醒後繼結點 doReleaseShared(); } } private void doReleaseShared() { for (;;) { Node h = head; // 佇列不為空且有後繼結點 if (h != null && h != tail) { int ws = h.waitStatus; // 不管是共享還是獨佔只有結點狀態為SIGNAL才嘗試喚醒後繼結點 if (ws == Node.SIGNAL) { // 將waitStatus設定為0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h);// 喚醒後繼結點 // 如果狀態為0則更新狀態為PROPAGATE,更新失敗則重試 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果過程中head被修改了則重試。 if (h == head) // loop if head changed break; } }
共享模式釋放
主要邏輯也就會doReleaseShared
。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
獨佔和共享模式除了對應的中斷版本,還有超時版本,整體程式碼相差不大,具體再贅述了。提前前面文章 中提到的自旋
,好像目前整個AQS中都沒用到這個功能,accquire中for迴圈主要作用不是為了自旋,那麼 它用在什麼地方呢?AQS中有一個變數:
static final long spinForTimeoutThreshold = 1000L;
這個變數用在doAcquireNanos
方法,也就是支援超時的獲取版本。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } if (nanosTimeout <= 0)// 超時 return false; // nanosTimeout > spinForTimeoutThreshold // 如果超時時間很短的話,自旋效率會更高。 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
AQS的的主要內容其實差不多看完了,但是上面的邏輯中waitStatus中有一個狀態還沒涉及到那就是CONDITION
, 下一篇部落格《Java併發包原始碼學習之AQS框架(五)ConditionObject原始碼分析》中會介紹它。
相關推薦
Java併發包原始碼學習之AQS框架(四)AbstractQueuedSynchronizer原始碼分析
經過前面幾篇文章的鋪墊,今天我們終於要看看AQS的廬山真面目了,建議第一次看AbstractQueuedSynchronizer 類原始碼的朋友可以先看下我前面幾篇文章: 分析原始碼是非常枯燥乏味的一件事,其實程式碼本身其實就是最好的說明了,因此基本都是貼出一些程式碼加上一些註釋, 因為Abstract
Java併發包原始碼學習之AQS框架(二)CLH lock queue和自旋鎖
上一篇文章提到AQS是基於CLH lock queue,那麼什麼是CLH lock queue,說複雜很複雜說簡單也簡單, 所謂大道至簡: CLH lock queue其實就是一個FIFO的佇列,佇列中的每個結點(執行緒)只要等待其前繼釋放鎖就可以了。 AbstractQueuedSynchronizer
Java併發包原始碼學習之AQS框架(一)概述
AQS其實就是java.util.concurrent.locks.AbstractQueuedSynchronizer這個類。 閱讀Java的併發包原始碼你會發現這個類是整個java.util.concurrent的核心之一,也可以說是閱讀整個併發包原始碼的一個突破口。 比如讀ReentrantLock的
JavaWeb學習之Hibernate框架(二)
utils xtend auto etl SQ dial begin 可選 oct hibernateAPI詳解 Configuration 創建 加載主配置 創建sessionFactory
Linux學習之程序通訊(四)
言之者無罪,聞之者足以戒。 ——《詩序》 IPC通訊 IPC通訊有三種:共享記憶體、訊息佇列、訊號燈 這個IPC物件,是存在於核心中的。而且使用者空間的檔案系統中沒有IPC檔案型別 IPC物件 IPC和檔案IO函式的比較: 檔案I/O
從零學習遊戲伺服器開發(四)LogServer原始碼探究
這是從零學習開源專案的第四篇,上一篇是《從零學習開源專案系列(三) CSBattleMgr服務原始碼研究》,這篇文章我們一起來學習LogServer,中文意思可能是“日誌伺服器”。那麼這個日誌伺服器到底做了哪些工作呢?我們在Visual Studio中將LogServer設定
Spring Boot學習之旅:(四)springboot 整合 fastjson
springboot 預設使用的 jackson 但是聽說某寶的fastjson 效能很好,而且平時用的習慣,所以來整合一下。 首先在pom 中匯入依賴 <dependency> <groupId>
Flask Web開發學習之爬坑(四)
關於第6章 電子郵件這一章把我坑了好久好久,真的是印象深刻。文章中的示例是通過 goolemail 傳送郵件,我把它換成了常用的qq郵箱。本節內容有點多,不過很多都是有用的。示例6-1需要修改app.config['MAIL_SERVER'] = 'smtp.qq.com'
Dubbo原始碼學習--RoundRobinLoadBalance負載均衡(四)
RoundRobin LoadBalance輪循,按公約後的權重設定輪循比率。存在慢的提供者累積請求的問題,比如:第二臺機器很慢,但沒掛,當請求調到第二臺時就卡在那,久而久之,所有請求都卡在調到第二臺上。1)獲取輪詢key 服務名+方法名獲取可供呼叫的invokers個數l
系統學習機器學習之神經網路(四) --SOM
轉自:http://blog.csdn.net/xbinworld/article/details/50818803,其實內容更多的是百度文庫裡叫《SOM自組織特徵對映神經網路》這篇文章上的,博主增加了一些理解。 本文詳細介紹一下自組織神經網路概念和原理,並重點介紹一下自組
Android之Fresco框架(四)--ImagePipeline的呼叫和使用
之前大致把ImagePipeline的配置和底層實現都講了一下,這一篇來重點講一下我們在傳送圖片請求的時候是怎麼把請求傳給ImagePipeline的,以及我們如何自己直接對ImagePipeline例項進行請求,記憶體管理等操作。SimpleDraweeView中Image
QT學習 之 對話方塊 (四) 字型對話方塊、訊息對話方塊、檔案對話方塊、程序對話方塊
QMessageBox類: 含有Question訊息框、Information訊息框、Warning訊息框和Critical訊息框等 通常有兩種方式可以來建立標準訊息對話方塊: 一種是採用“基於屬性”的API,一種是使用QMessageBox的靜態方法。 後者書寫容易,但缺
Golang原始碼學習:排程邏輯(四)系統呼叫
## Linux系統呼叫 概念:系統呼叫為使用者態程序提供了硬體的抽象介面。並且是使用者空間訪問核心的唯一手段,除異常和陷入外,它們是核心唯一的合法入口。保證系統的安全和穩定。 呼叫號:在Linux中,每個系統呼叫被賦予一個獨一無二的系統呼叫號。當用戶空間的程序執行一個系統呼叫時,會使用呼叫號指明系統呼叫
Shiro許可權管理框架(四):深入分析Shiro中的Session管理
其實關於Shiro的一些學習筆記很早就該寫了,因為懶癌和拖延症晚期一直沒有落實,直到今天公司的一個專案碰到了在叢集環境的單點登入頻繁掉線的問題,為了解決這個問題,Shiro相關的文件和教程沒少翻。最後問題解決了,但我覺得我也是時候來做一波Shiro學習筆記了。 本篇是Shiro系列第四篇,Shiro中的過濾器
Java併發包內容學習
1.Vector 和ArrayList區別。 vector是執行緒安全的,ArrayList不安全,但是ArrayList速度快。 2.hashtable 和hashmap 區別 hashtable是執行緒安全的,hashmap不安全,但是hashmap速度快。 3.co
Java併發系列(4)AbstractQueuedSynchronizer原始碼分析之條件佇列
通過前面三篇的分析,我們深入瞭解了AbstractQueuedSynchronizer的內部結構和一些設計理念,知道了AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步佇列和條件佇列。我們還是拿公共廁所做比喻,同步佇
jdk源碼閱讀筆記之java集合框架(四)(LinkedList)
ray private array public 源碼閱讀 jdk源碼閱讀 oid color 解釋 關於LinkedList的分析,會從且僅從其添加(add)方法入手。 因為上一篇已經分析過ArrayList,相似的地方就不再敘述,關註點在LinkedList的特點。 屬
28 Java學習之NIO Buffer(二)(待補充)
客戶 oca opened output write 系統方面 eba 了解 取出 一. Buffer介紹 Buffer,故名思意,緩沖區,實際上是一個容器,是一個連續數組。Channel提供從文件、網絡讀取數據的渠道,但是讀取或寫入的數據都必須經由Buffer。具體看下面
29 Java學習之NIO Selector(三)
Selector(選擇器)是Java NIO中能夠檢測一到多個NIO通道,並能夠知曉通道是否為諸如讀寫事件做好準備的元件。這樣,一個單獨的執行緒可以管理多個channel,從而管理多個網路連線。 一. 為什麼使用Selector? 僅用單個執行緒來處理多個Channels的好處是,只需要更少的執行緒來處理
28 Java學習之NIO Buffer(二)(待補充)
一. Buffer介紹 Buffer,故名思意,緩衝區,實際上是一個容器,是一個連續陣列。Channel提供從檔案、網路讀取資料的渠道,但是讀取或寫入的資料都必須經由Buffer。具體看下面這張圖就理解了: 上面的圖描述了從一個客戶端向服務端傳送資料,然後服務端接收資料的過程。客戶端傳送資料時,必