java併發:AbstractQueuedSynchronizer的介紹和原理分析
阿新 • • 發佈:2019-02-03
API說明
實現自定義同步器時,需要使用同步器提供的getState()、setState()和compareAndSetState()方法來操縱狀態的變遷。
方法名稱 | 描述 |
protected boolean tryAcquire(int arg) | 排它的獲取這個狀態。這個方法的實現需要查詢當前狀態是否允許獲取,然後再進行獲取(使用compareAndSetState來做)狀態。 |
protected boolean tryRelease(int arg) | 釋放狀態。 |
protected int tryAcquireShared(int arg) | 共享的模式下獲取狀態。 |
protected boolean tryReleaseShared(int arg) | 共享的模式下釋放狀態。 |
protected boolean isHeldExclusively() | 在排它模式下,狀態是否被佔用。 |
實現這些方法必須是非阻塞而且是執行緒安全的,推薦使用該同步器的父類java.util.concurrent.locks.AbstractOwnableSynchronizer來設定當前的執行緒。
開始提到同步器內部基於一個FIFO佇列,對於一個獨佔鎖的獲取和釋放有以下偽碼可以表示。
獲取一個排他鎖。
01 | while (獲取鎖) { |
02 | if (獲取到) { |
03 | 退出 while 迴圈 |
04 | } else { |
05 | if (當前執行緒沒有入佇列) { |
06 | 那麼入佇列 |
07 | } |
08 | 阻塞當前執行緒 |
09 | } |
10 | } |
釋放一個排他鎖。
1 | if (釋放成功) { |
2 | 刪除頭結點 |
3 | 啟用原頭結點的後繼節點 |
4 | } |
public class Mutex implements Lock, java.io.Serializable { // 內部類,自定義同步器 private static class Sync extends AbstractQueuedSynchronizer { // 是否處於佔用狀態 protected boolean isHeldExclusively() { return getState() == 1; } // 當狀態為0的時候獲取鎖 public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 釋放鎖,將狀態設定為0 protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // 返回一個Condition,每個condition都包含了一個condition佇列 Condition newCondition() { return new ConditionObject(); } } // 僅需要將操作代理到Sync上即可 private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
實現分析
加鎖:public final void acquire(int arg)
該方法以排他的方式獲取鎖,對中斷不敏感,完成synchronized語義。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述邏輯主要包括:1. 嘗試獲取(呼叫tryAcquire更改狀態,需要保證原子性);
在tryAcquire方法中使用了同步器提供的對state操作的方法,利用compareAndSet保證只有一個執行緒能夠對狀態進行成功修改,而沒有成功修改的執行緒將進入sync佇列排隊。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
留空了,是想留給子類去實現,這個可以以後結合ReentrantLock的原始碼去分析。2. 如果獲取不到,將當前執行緒構造成節點Node並加入sync佇列;
進入佇列的每個執行緒都是一個節點Node,從而形成了一個雙向佇列,類似CLH佇列,這樣做的目的是執行緒間的通訊會被限制在較小規模(也就是兩個節點左右)。(這裡我補充下,下圖是CLH佇列節點的示意圖: