一篇文章搞定——JDK8中新增的StampedLock
一、StampedLock類簡介
StampedLock類,在JDK1.8時引入,是對讀寫鎖ReentrantReadWriteLock的增強,該類提供了一些功能,優化了讀鎖、寫鎖的訪問,同時使讀寫鎖之間可以互相轉換,更細粒度控制併發。
首先明確下,該類的設計初衷是作為一個內部工具類,用於輔助開發其它執行緒安全元件,用得好,該類可以提升系統性能,用不好,容易產生死鎖和其它莫名其妙的問題。
1.1 StampedLock的引入
上一篇文章,講解了讀寫鎖——ReentrantReadWriteLock原理詳解 ,那麼為什麼有了ReentrantReadWriteLock,還要引入StampedLock?
ReentrantReadWriteLock使得多個讀執行緒同時持有讀鎖(只要寫鎖未被佔用),而寫鎖是獨佔的。
但是,讀寫鎖如果使用不當,很容易產生“飢餓”問題:
比如在讀執行緒非常多,寫執行緒很少的情況下,很容易導致寫執行緒“飢餓”,雖然使用“公平”策略可以一定程度上緩解這個問題,但是“公平”策略是以犧牲系統吞吐量為代價的。
1.2 StampedLock的特點
try系列獲取鎖的函式,當獲取鎖失敗後會返回為0的stamp值。當呼叫釋放鎖和轉換鎖的方法時候需要傳入獲取鎖時候返回的stamp值。
StampedLockd的內部實現是基於CLH鎖的,CLH鎖原理:鎖維護著一個等待執行緒佇列,所有申請鎖且失敗的執行緒都記錄在佇列。一個節點代表一個執行緒,
儲存著一個標記位locked,用以判斷當前執行緒是否已經釋放鎖。當一個執行緒試圖獲取鎖時,從佇列尾節點作為前序節點,迴圈判斷所有的前序節點是否已經成功釋放鎖。
二、StampedLock使用示例
先來看一個Oracle官方的例子:
class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { long stamp = sl.writeLock(); //涉及對共享資源的修改,使用寫鎖-獨佔操作 try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } /** * 使用樂觀讀鎖訪問共享資源 * 注意:樂觀讀鎖在保證資料一致性上需要拷貝一份要操作的變數到方法棧,並且在操作資料時候可能其他寫執行緒已經修改了資料, * 而我們操作的是方法棧裡面的資料,也就是一個快照,所以最多返回的不是最新的資料,但是一致性還是得到保障的。 * * @return */ double distanceFromOrigin() { long stamp = sl.tryOptimisticRead(); // 使用樂觀讀鎖 double currentX = x, currentY = y; // 拷貝共享資源到本地方法棧中 if (!sl.validate(stamp)) { // 如果有寫鎖被佔用,可能造成資料不一致,所以要切換到普通讀鎖模式 stamp = sl.readLock(); try { currentX = x; currentY = y; } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } void moveIfAtOrigin(double newX, double newY) { // upgrade // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { long ws = sl.tryConvertToWriteLock(stamp); //讀鎖轉換為寫鎖 if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { sl.unlockRead(stamp); stamp = sl.writeLock(); } } } finally { sl.unlock(stamp); } } }
可以看到,上述示例最特殊的其實是distanceFromOrigin方法,這個方法中使用了“Optimistic reading”樂觀讀鎖,使得讀寫可以併發執行,但是“Optimistic reading”的使用必須遵循以下模式:
long stamp = lock.tryOptimisticRead(); // 非阻塞獲取版本資訊
copyVaraibale2ThreadMemory(); // 拷貝變數到執行緒本地堆疊
if(!lock.validate(stamp)){ // 校驗
long stamp = lock.readLock(); // 獲取讀鎖
try {
copyVaraibale2ThreadMemory(); // 拷貝變數到執行緒本地堆疊
} finally {
lock.unlock(stamp); // 釋放悲觀鎖
}
}
useThreadMemoryVarables(); // 使用執行緒本地堆疊裡面的資料進行操作
三、StampedLock原理
3.1 StampedLock的內部常量
StampedLock雖然不像其它鎖一樣定義了內部類來實現AQS框架,但是StampedLock的基本實現思路還是利用CLH佇列進行執行緒的管理,通過同步狀態值來表示鎖的狀態和型別。
StampedLock內部定義了很多常量,定義這些常量的根本目的還是和ReentrantReadWriteLock一樣,對同步狀態值按位切分,以通過位運算對State進行操作:
對於StampedLock來說,寫鎖被佔用的標誌是第8位為1,讀鎖使用0-7位,正常情況下讀鎖數目為1-126,超過126時,使用一個名為
readerOverflow
的int整型儲存超出數。
部分常量的位元位表示如下:
另外,StampedLock相比ReentrantReadWriteLock,對多核CPU進行了優化,可以看到,當CPU核數超過1時,會有一些自旋操作:
3.2 示例分析
假設現在有多個執行緒:ThreadA、ThreadB、ThreadC、ThreadD、ThreadE。操作如下:
ThreadA呼叫writeLock————獲取寫鎖
ThreadB呼叫readLock————獲取讀鎖
ThreadC呼叫readLock————獲取讀鎖
ThreadD呼叫writeLock————獲取寫鎖
ThreadE呼叫readLock————獲取讀鎖
1. StampedLock物件的建立
StampedLock的構造器很簡單,構造時設定下同步狀態值:
/** * Creates a new lock, initially in unlocked state. */ public StampedLock() { state = ORIGIN; }
另外,StamedLock提供了三類檢視:
// views transient ReadLockView readLockView; transient WriteLockView writeLockView; transient ReadWriteLockView readWriteLockView;
這些檢視其實是對StamedLock方法的封裝,便於習慣了ReentrantReadWriteLock的使用者使用:
例如,ReadLockView其實相當於ReentrantReadWriteLock.readLock()
返回的讀鎖;
final class ReadLockView implements Lock { public void lock() { readLock(); } public void lockInterruptibly() throws InterruptedException { readLockInterruptibly(); } public boolean tryLock() { return tryReadLock() != 0L; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryReadLock(time, unit) != 0L; } public void unlock() { unstampedUnlockRead(); } public Condition newCondition() { throw new UnsupportedOperationException(); } }
2. ThreadA呼叫writeLock獲取寫鎖
來看下writeLock方法:
public long writeLock() { long s, next; // bypass acquireWrite in fully unlocked case only return ((((s = state) & ABITS) == 0L &&//(s=state)&ABITS==0L表示讀鎖和寫鎖都未被使用 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?//CAS操作:將第8位置為1,表示寫鎖被佔用 next : acquireWrite(false, 0L));//獲取失敗則呼叫acquireWrite,加入到等待佇列 }
說明:上述程式碼獲取寫鎖,如果獲取失敗,則進入阻塞,注意該方法不響應中斷,返回非0表示獲取成功。
StampedLock中大量運用了位運算,這裡(s = state) & ABITS == 0L
表示讀鎖和寫鎖都未被使用,這裡寫鎖可以立即獲取成功,然後CAS操作更新同步狀態值State。
操作完成後,等待佇列的結構如下:
注意:StampedLock中,等待佇列的結點要比AQS中簡單些,僅僅三種狀態。
0:初始狀態
-1:等待中
1:取消
另外,結點的定義中有個cowait
欄位,該欄位指向一個棧,用於儲存讀執行緒,這個後續會講到。
3. ThreadB呼叫readLock獲取讀鎖
來看下readLock方法:
由於ThreadA此時持有寫鎖,所以ThreadB獲取讀鎖失敗,將呼叫acquireRead方法,加入等待佇列:
public long readLock() { long s = state, next; // bypass acquireRead on common uncontended case return ((whead == wtail && (s & ABITS) < RFULL &&//表示寫鎖未被佔用,且讀鎖數量沒用超限 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L)); }
說明:上述程式碼獲取讀鎖,如果寫鎖被佔用,執行緒會阻塞,注意該方法不響應中斷,返回非0表示獲取成功。
acquireRead方法非常複雜,用到了大量自旋操作:
/**
* 嘗試自旋的獲取讀鎖, 獲取不到則加入等待佇列, 並阻塞執行緒
*
* @param interruptible true 表示檢測中斷, 如果執行緒被中斷過, 則最終返回INTERRUPTED
* @param deadline 如果非0, 則表示限時獲取
* @return 非0表示獲取成功, INTERRUPTED表示中途被中斷過
*/
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p; // node指向入隊結點, p指向入隊前的隊尾結點
/**
* 自旋入隊操作
* 如果寫鎖未被佔用, 則立即嘗試獲取讀鎖, 獲取成功則返回.
* 如果寫鎖被佔用, 則將當前讀執行緒包裝成結點, 並插入等待佇列(如果隊尾是寫結點,直接連結到隊尾;否則,連結到隊尾讀結點的棧中)
*/
for (int spins = -1; ; ) {
WNode h;
if ((h = whead) == (p = wtail)) { // 如果佇列為空或只有頭結點, 則會立即嘗試獲取讀鎖
for (long m, s, ns; ; ) {
if ((m = (s = state) & ABITS) < RFULL ? // 判斷寫鎖是否被佔用
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //寫鎖未佔用,且讀鎖數量未超限, 則更新同步狀態
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //寫鎖未佔用,但讀鎖數量超限, 超出部分放到readerOverflow欄位中
return ns; // 獲取成功後, 直接返回
else if (m >= WBIT) { // 寫鎖被佔用,以隨機方式探測是否要退出自旋
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} else {
if (spins == 0) {
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
spins = SPINS;
}
}
}
}
if (p == null) { // p == null表示佇列為空, 則初始化佇列(構造頭結點)
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
} else if (node == null) { // 將當前執行緒包裝成讀結點
node = new WNode(RMODE, p);
} else if (h == p || p.mode != RMODE) { // 如果佇列只有一個頭結點, 或隊尾結點不是讀結點, 則直接將結點連結到隊尾, 連結完成後退出自旋
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
// 佇列不為空, 且隊尾是讀結點, 則將添加當前結點連結到隊尾結點的cowait鏈中(實際上構成一個棧, p是棧頂指標 )
else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) { // CAS操作隊尾結點p的cowait欄位,實際上就是頭插法插入結點
node.cowait = null;
} else {
for (; ; ) {
WNode pp, c;
Thread w;
// 嘗試喚醒頭結點的cowait中的第一個元素, 假如是讀鎖會通過迴圈釋放cowait鏈
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT);
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {
// 寫鎖被佔用, 且當前結點不是隊首結點, 則阻塞當前執行緒
U.park(false, time);
}
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
for (int spins = -1; ; ) {
WNode h, np, pp;
int ps;
if ((h = whead) == p) { // 如果當前執行緒是隊首結點, 則嘗試獲取讀鎖
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins; ; ) { // spin at head
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ? // 判斷寫鎖是否被佔用
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //寫鎖未佔用,且讀鎖數量未超限, 則更新同步狀態
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { //寫鎖未佔用,但讀鎖數量超限, 超出部分放到readerOverflow欄位中
// 獲取讀鎖成功, 釋放cowait鏈中的所有讀結點
WNode c;
Thread w;
// 釋放頭結點, 當前隊首結點成為新的頭結點
whead = node;
node.prev = null;
// 從棧頂開始(node.cowait指向的結點), 依次喚醒所有讀結點, 最終node.cowait==null, node成為新的頭結點
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
U.unpark(w);
}
return ns;
} else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
} else if (h != null) { // 如果頭結點存在cowait鏈, 則喚醒鏈中所有讀執行緒
WNode c;
Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
} else if ((ps = p.status) == 0) // 將前驅結點的等待狀態置為WAITING, 表示之後將喚醒當前結點
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
} else { // 阻塞當前讀執行緒
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L) //限時等待超時, 取消等待
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) {
// 如果前驅的等待狀態為WAITING, 且寫鎖被佔用, 則阻塞當前呼叫執行緒
U.park(false, time);
}
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
我們來分析下這個方法。
該方法會首先自旋的嘗試獲取讀鎖,獲取成功後,就直接返回;否則,會將當前執行緒包裝成一個讀結點,插入到等待佇列。
由於,目前等待佇列還是空,所以ThreadB會初始化佇列,然後將自身包裝成一個讀結點,插入隊尾,然後在下面這個地方跳出自旋:
if (p == null) { // initialize queue,表示等待佇列為空,且當前執行緒未獲得讀鎖,則初始化佇列(構造頭結點) WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null)//將當前執行緒保證成共享節點 node = new WNode(RMODE, p); else if (h == p || p.mode != RMODE) {//如果等待佇列只有一個頭結點或當前入隊的是寫執行緒,則直接將節點連結到隊尾,連結完成後退出自旋 if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break;//這裡退出迴圈 } }
此時,等待佇列的結構如下:
跳出自旋後,ThreadB會繼續向下執行,進入下一個自旋,在下一個自旋中,依然會再次嘗試獲取讀鎖,如果這次再獲取不到,就會將前驅的等待狀態置為WAITING, 表示我(當前執行緒)要去睡了(阻塞),到時記得叫醒我:
if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0)//將前驅結點的等待狀態置為WAITING,表示之後將喚醒當前結點 U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } }
最終, ThreadB進入阻塞狀態:
else {//阻塞當前執行緒 long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L)//限時等待超時,取消等待 return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) U.park(false, time);//如果前驅的等待狀態為WAITINF,其寫鎖被佔用,則阻塞當前呼叫執行緒 node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } }
最終,等待佇列的結構如下:
4. ThreadC呼叫readLock獲取讀鎖
這個過程和ThreadB獲取讀鎖一樣,區別在於ThreadC被包裝成結點加入等待佇列後,是連結到ThreadB結點的棧指標中的。呼叫完下面這段程式碼後,ThreadC會連結到以Thread B為棧頂指標的棧中:
else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node))//CAS操作隊尾結點,p的cowait欄位,實際上就是頭插法插入節點 node.cowait = null;
說明:上述程式碼佇列不為空,且隊尾是讀結點,則將添加當前結點連結到隊尾結點的cowait鏈中(實際上構成一個棧,p是棧頂指標)
注意:讀結點的cowait欄位其實構成了一個棧,入棧的過程其實是個“頭插法”插入單鏈表的過程。比如,再來個ThreadX讀結點,則cowait連結串列結構為:
ThreadB - > ThreadX -> ThreadC
。最終喚醒讀結點時,將從棧頂開始。
然後會在下一次自旋中,阻塞當前讀執行緒:
if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) U.park(false, time);//寫鎖被佔用,且當前節點不是隊首節點,則阻塞當前執行緒 node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); }
最終,等待佇列的結構如下:
可以看到,此時ThreadC結點並沒有把它的前驅的等待狀態置為-1,因為ThreadC是連結到棧中的,當寫鎖釋放的時候,會從棧底元素開始,喚醒棧中所有讀結點。
5. ThreadD呼叫writeLock獲取寫鎖
ThreadD呼叫writeLock方法獲取寫鎖失敗後(ThreadA依然佔用著寫鎖),會呼叫acquireWrite方法,該方法整體邏輯和acquireRead差不多,首先自旋的嘗試獲取寫鎖,獲取成功後,就直接返回;否則,會將當前執行緒包裝成一個寫結點,插入到等待佇列。
public long writeLock() { long s, next; // bypass acquireWrite in fully unlocked case only return ((((s = state) & ABITS) == 0L &&//表示讀鎖和寫鎖都未被使用 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?//CAS操作:將第8位置位1,表示寫鎖被佔用 next : acquireWrite(false, 0L));//獲取失敗則呼叫acquireWrite,加入等待佇列 }
說明:上述程式碼獲取寫鎖,如果失敗,則進入阻塞,注意該方法不響應中斷,返回非0,表示獲取成功
acquireWrite原始碼:
/**
* 嘗試自旋的獲取寫鎖, 獲取不到則阻塞執行緒
*
* @param interruptible true 表示檢測中斷, 如果執行緒被中斷過, 則最終返回INTERRUPTED
* @param deadline 如果非0, 則表示限時獲取
* @return 非0表示獲取成功, INTERRUPTED表示中途被中斷過
*/
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
/**
* 自旋入隊操作
* 如果沒有任何鎖被佔用, 則立即嘗試獲取寫鎖, 獲取成功則返回.
* 如果存在鎖被使用, 則將當前執行緒包裝成獨佔結點, 並插入等待佇列尾部
*/
for (int spins = -1; ; ) {
long m, s, ns;
if ((m = (s = state) & ABITS) == 0L) { // 沒有任何鎖被佔用
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) // 嘗試立即獲取寫鎖
return ns; // 獲取成功直接返回
} else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} else if ((p = wtail) == null) { // 佇列為空, 則初始化佇列, 構造佇列的頭結點
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
} else if (node == null) // 將當前執行緒包裝成寫結點
node = new WNode(WMODE, p);
else if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) { // 連結結點至隊尾
p.next = node;
break;
}
}
for (int spins = -1; ; ) {
WNode h, np, pp;
int ps;
if ((h = whead) == p) { // 如果當前結點是隊首結點, 則立即嘗試獲取寫鎖
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins; ; ) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) { // 寫鎖未被佔用
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) { // CAS修改State: 佔用寫鎖
// 將隊首結點從佇列移除
whead = node;
node.prev = null;
return ns;
}
} else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
} else if (h != null) { // 喚醒頭結點的棧中的所有讀執行緒
WNode c;
Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
} else if ((ps = p.status) == 0) // 將當前結點的前驅置為WAITING, 表示當前結點會進入阻塞, 前驅將來需要喚醒我
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
} else { // 阻塞當前呼叫執行緒
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
acquireWrite中的下面這個自旋操作,用於將執行緒包裝成寫結點,插入隊尾:
for (int spins = -1;;) { // spin while enqueuing long m, s, ns; if ((m = (s = state) & ABITS) == 0L) {//沒用任何鎖被佔用 if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))//嘗試立即獲取寫鎖 return ns;//獲取成功直接返回 } else if (spins < 0) spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else if ((p = wtail) == null) { // initialize queue,佇列為空,則初始化佇列,構造佇列的頭結點 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) node = new WNode(WMODE, p);//將當前執行緒包裝成寫節點 else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//連結節點至隊尾 p.next = node; break; } }
說明:上述程式碼自旋入隊操作,如果沒用任何鎖被佔用,則立即嘗試獲取寫鎖,獲取成功則返回,如果存在鎖被使用,則將當前執行緒包裝成獨佔節點,並插入等待佇列尾部
插入完成後,佇列結構如下:
然後,進入下一個自旋,並在下一個自旋中阻塞ThreadD,最終佇列結構如下:
6. ThreadE呼叫readLock獲取讀鎖
同樣,由於寫鎖被ThreadA佔用著,所以最終會呼叫acquireRead方法,在該方法的第一個自旋中,會將ThreadE加入等待佇列:
注意,由於隊尾結點是寫結點,所以當前讀結點會直接連結到隊尾;如果隊尾是讀結點,則會連結到隊尾讀結點的cowait鏈中。
然後進入第二個自旋,阻塞ThreadE,最終佇列結構如下:
7. ThreadA呼叫unlockWrite釋放寫鎖
通過CAS操作,修改State成功後,會呼叫release方法喚醒等待佇列的隊首結點:
//釋放寫鎖 public void unlockWrite(long stamp) { WNode h; if (state != stamp || (stamp & WBIT) == 0L)//stamp不匹配,或者寫鎖未被佔用,丟擲異常 throw new IllegalMonitorStateException(); state = (stamp += WBIT) == 0L ? ORIGIN : stamp;//正常情況下,stamp+=WBIT後,第8位位0,表示寫鎖被釋放;但是溢位,則置為ORIGIN if ((h = whead) != null && h.status != 0) release(h);//喚醒等待佇列中的隊首節點 }
release方法非常簡單,先將頭結點的等待狀態置為0,表示即將喚醒後繼結點,然後立即喚醒隊首結點:
//喚醒等待佇列的隊首節點(即頭結點whead的後繼節點) private void release(WNode h) { if (h != null) { WNode q; Thread w; U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//將頭結點的等待狀態從-1置為0,表示將要喚醒後繼節點 if ((q = h.next) == null || q.status == CANCELLED) {//從隊尾開始查詢距離頭結點最近的WAITING節點 for (WNode t = wtail; t != null && t != h; t = t.prev) if (t.status <= 0) q = t; } if (q != null && (w = q.thread) != null) U.unpark(w);//喚醒隊首節點 } }
此時,等待佇列的結構如下:
8. ThreadB被喚醒後繼續向下執行
ThreadB被喚醒後,會從原阻塞處繼續向下執行,然後開始下一次自旋:
if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0)前驅結點的等待狀態設定為WAITING,表示之後喚醒當前結點 U.compareAndSwapInt(p, WSTATUS, 0, WAITING);//將 else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else {//阻塞當前讀執行緒 long time; // 0 argument to park means no timeout if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L)//限時等待超時,取消等待 return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) U.park(false, time); // emulate LockSupport.park,如果前驅的等待狀態為WAITING,且寫鎖被佔用,則阻塞當前呼叫執行緒,注意,ThreadB從此處被喚醒,並繼續向下執行 node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } }
第二次自旋時,ThreadB發現寫鎖未被佔用,則成功獲取到讀鎖,然後從棧頂(ThreadB的cowait指標指向的結點)開始喚醒棧中所有執行緒,
最後返回:
for (int k = spins;;) { // spin at head long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ?//判斷寫鎖是否被佔用 U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) ://寫鎖未被佔用,且讀鎖數量未超限制,則更新同步狀態 (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {//寫鎖未被佔用,但讀鎖數量限制,超出部分放到readerOverflow欄位中 WNode c; Thread w;//獲取讀鎖成功,釋放cowrite鏈中的所有讀結點 whead = node; node.prev = null;//釋放頭節點,當前隊首節點成為新的頭結點 //從棧頂開始(node.cowait指向的節點),依次喚醒所有讀結點,最終node.cowait==null,node成為新的頭結點 while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; }
最終,等待佇列的結構如下:
9. ThreadC被喚醒後繼續向下執行
ThreadC被喚醒後,繼續執行,並進入下一次自旋,下一次自旋時,會成功獲取到讀鎖。
for (;;) { WNode pp, c; Thread w; //嘗試喚醒頭節點whead的cowait中的第一個元素,假如是讀鎖會通過迴圈釋放cowait鏈 if ((h = whead) != null && (c = h.cowait) != null && U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) // help release U.unpark(w); if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do { if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; } while (m < WBIT); } if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) U.park(false, time);//寫鎖被釋放,且當前節點不是隊首節點,則阻塞當前執行緒 node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } }
注意,此時ThreadB和ThreadC已經拿到了讀鎖,ThreadD(寫執行緒)和ThreadE(讀執行緒)依然阻塞中,原來ThreadC對應的結點是個孤立結點,會被GC回收。
最終,等待佇列的結構如下:
10. ThreadB和ThreadC釋放讀鎖
ThreadB和ThreadC呼叫unlockRead方法釋放讀鎖,CAS操作State將讀鎖數量減1:
//釋放讀鎖 public void unlockRead(long stamp) { long s, m; WNode h; for (;;) { if (((s = state) & SBITS) != (stamp & SBITS) ||//stamp不匹配,或沒用任何鎖被佔用,都會丟擲異常 (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT) throw new IllegalMonitorStateException(); if (m < RFULL) {//讀鎖數量未超限 if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {//讀鎖數量-1 if (m == RUNIT && (h = whead) != null && h.status != 0)//如果當前讀鎖數量為1,喚醒等待佇列中的隊首節點 release(h); break; } } else if (tryDecReaderOverflow(s) != 0L)//讀鎖數量超限,則溢位欄位要-1 break; } }
注意,當讀鎖的數量變為0時才會呼叫release方法,喚醒隊首結點:
//喚醒等待佇列中的隊首節點(即頭結點whead的後繼節點) private void release(WNode h) { if (h != null) { WNode q; Thread w; U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//將頭結點的等待狀態從-1置為0,表示將要喚醒後繼節點 if ((q = h.next) == null || q.status == CANCELLED) {//從隊尾開始查詢距離頭結點最近的WAITING節點 for (WNode t = wtail; t != null && t != h; t = t.prev) if (t.status <= 0) q = t; } if (q != null && (w = q.thread) != null) U.unpark(w);//喚醒隊首節點 } }
隊首結點(ThreadD寫結點被喚醒),最終等待佇列的結構如下:
11. ThreadD被喚醒後繼續向下執行
ThreadD會從原阻塞處繼續向下執行,並在下一次自旋中獲取到寫鎖,然後返回:
for (int spins = -1;;) { WNode h, np, pp; int ps; if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { WNode c; Thread w; whead = node; node.prev = null; while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } }
最終,等待佇列的結構如下:
12. ThreadD呼叫unlockWrite釋放寫鎖
ThreadD釋放寫鎖的過程和步驟7完全相同,會呼叫unlockWrite喚醒隊首結點(ThreadE)。
ThreadE被喚醒後會從原阻塞處繼續向下執行,但由於ThreadE是個讀結點,所以同時會喚醒cowait棧中的所有讀結點,過程和步驟8完全一樣。最終,等待佇列的結構如下:
至此,全部執行完成。
四、StampedLock總結
StampedLock的等待佇列與RRW的CLH佇列相比,有以下特點:
- 當入隊一個執行緒時,如果隊尾是讀結點,不會直接連結到隊尾,而是連結到該讀結點的cowait鏈中,cowait鏈本質是一個棧;
- 當入隊一個執行緒時,如果隊尾是寫結點,則直接連結到隊尾;
- QS類似喚醒執行緒的規則和A,都是首先喚醒隊首結點。區別是StampedLock中,當喚醒的結點是讀結點時,會喚醒該讀結點的cowait鏈中的所有讀結點(順序和入棧順序相反,也就是後進先出)。
另外,StampedLock使用時要特別小心,避免鎖重入的操作,在使用樂觀讀鎖時也需要遵循相應的呼叫模板,防止出現數據不一致的問題。
參考書籍
Java併發程式設計之美
參考連結