1. 程式人生 > >java併發:AbstractQueuedSynchronizer的介紹和原理分析

java併發:AbstractQueuedSynchronizer的介紹和原理分析


API說明

實現自定義同步器時,需要使用同步器提供的getState()、setState()和compareAndSetState()方法來操縱狀態的變遷。

方法名稱描述
protected boolean tryAcquire(int arg)排它的獲取這個狀態。這個方法的實現需要查詢當前狀態是否允許獲取,然後再進行獲取(使用compareAndSetState來做)狀態。
protected boolean tryRelease(int arg) 釋放狀態。
protected int tryAcquireShared(int arg)共享的模式下獲取狀態。
protected boolean tryReleaseShared(int arg)
共享的模式下釋放狀態。
protected boolean isHeldExclusively()在排它模式下,狀態是否被佔用。

實現這些方法必須是非阻塞而且是執行緒安全的,推薦使用該同步器的父類java.util.concurrent.locks.AbstractOwnableSynchronizer來設定當前的執行緒。
開始提到同步器內部基於一個FIFO佇列,對於一個獨佔鎖的獲取和釋放有以下偽碼可以表示。
獲取一個排他鎖。

01while(獲取鎖) {
02if (獲取到) {
03退出while迴圈
04else {
05if(當前執行緒沒有入佇列) {
06那麼入佇列
07}
08阻塞當前執行緒
09}
10}

釋放一個排他鎖。

1
if (釋放成功) {
2刪除頭結點
3啟用原頭結點的後繼節點
4}
public class Mutex implements Lock, java.io.Serializable {
	   // 內部類,自定義同步器
	   private static class Sync extends AbstractQueuedSynchronizer {
	     // 是否處於佔用狀態
	     protected boolean isHeldExclusively() {
	       return getState() == 1;
	     }
	     // 當狀態為0的時候獲取鎖
	     public boolean tryAcquire(int acquires) {
	       assert acquires == 1; // Otherwise unused
	       if (compareAndSetState(0, 1)) {
	         setExclusiveOwnerThread(Thread.currentThread());
	         return true;
	       }
	       return false;
	     }
	     // 釋放鎖,將狀態設定為0
	     protected boolean tryRelease(int releases) {
	       assert releases == 1; // Otherwise unused
	       if (getState() == 0) throw new IllegalMonitorStateException();
	       setExclusiveOwnerThread(null);
	       setState(0);
	       return true;
	     }
	     // 返回一個Condition,每個condition都包含了一個condition佇列
	     Condition newCondition() { return new ConditionObject(); }
	   }
	   // 僅需要將操作代理到Sync上即可
	   private final Sync sync = new Sync();
	   public void lock()                { sync.acquire(1); }
	   public boolean tryLock()          { return sync.tryAcquire(1); }
	   public void unlock()              { sync.release(1); }
	   public Condition newCondition()   { return sync.newCondition(); }
	   public boolean isLocked()         { return sync.isHeldExclusively(); }
	   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
	   public void lockInterruptibly() throws InterruptedException {
	     sync.acquireInterruptibly(1);
	   }
	   public boolean tryLock(long timeout, TimeUnit unit)
	       throws InterruptedException {
	     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
	   }
	 }

實現分析

加鎖:public final void acquire(int arg)

該方法以排他的方式獲取鎖,對中斷不敏感,完成synchronized語義。

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
上述邏輯主要包括:
1. 嘗試獲取(呼叫tryAcquire更改狀態,需要保證原子性);
在tryAcquire方法中使用了同步器提供的對state操作的方法,利用compareAndSet保證只有一個執行緒能夠對狀態進行成功修改,而沒有成功修改的執行緒將進入sync佇列排隊。
 protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
留空了,是想留給子類去實現,這個可以以後結合ReentrantLock的原始碼去分析。
2. 如果獲取不到,將當前執行緒構造成節點Node並加入sync佇列;
進入佇列的每個執行緒都是一個節點Node,從而形成了一個雙向佇列,類似CLH佇列,這樣做的目的是執行緒間的通訊會被限制在較小規模(也就是兩個節點左右)。(這裡我補充下,下圖是CLH佇列節點的示意圖: