1. 程式人生 > >Java並發編程-AbstractQueuedSynchronizer源碼分析

Java並發編程-AbstractQueuedSynchronizer源碼分析

otherwise 場景 獨占鎖 serial moni 流程圖 升級版 catch 所有

簡介

提供了一個基於FIFO隊列,可以用於構建鎖或者其他相關同步裝置的基礎框架。該同步器(以下簡稱同步器)利用了一個int來表示狀態,期望它能夠成為實現大部分同步需求的基礎。使用的方法是繼承,子類通過繼承同步器並需要實現它的方法來管理其狀態,管理的方式就是通過類似acquire和release的方式來操縱狀態。然而多線程環境中對狀態的操縱必須確保原子性,因此子類對於狀態的把握,需要使用這個同步器提供的以下三個方法對狀態進行操作:

  • java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
  • java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
  • java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

子類推薦被定義為自定義同步裝置的內部類,同步器自身沒有實現任何同步接口,它僅僅是定義了若幹acquire之類的方法來供使用。該同步器即可以作為排他模式也可以作為共享模式,當它被定義為一個排他模式時,其他線程對其的獲取就被阻止,而共享模式對於多個線程獲取都可以成功。


同步器是實現鎖的關鍵,利用同步器將鎖的語義實現,然後在鎖的實現中聚合同步器。可以這樣理解:鎖的API是面向使用者的,它定義了與鎖交互的公共行為,而每個鎖需要完成特定的操作也是透過這些行為來完成的(比如:可以允許兩個線程進行加鎖,排除兩個以上的線程),但是實現是依托給同步器來完成;同步器面向的是線程訪問和資源控制,它定義了線程對資源是否能夠獲取以及線程的排隊等操作。鎖和同步器很好的隔離了二者所需要關註的領域,嚴格意義上講,同步器可以適用於除了鎖以外的其他同步設施上(包括鎖)。
同步器的開始提到了其實現依賴於一個FIFO隊列,那麽隊列中的元素Node就是保存著線程引用和線程狀態的容器,每個線程對同步器的訪問,都可以看做是隊列中的一個節點。Node的主要包含以下成員變量:

1    Node {
2        int waitStatus;
3        Node prev;
4        Node next;
5        Node nextWaiter;
6        Thread thread;
7    }

以上五個成員變量主要負責保存該節點的線程引用,同步等待隊列(以下簡稱sync隊列)的前驅和後繼節點,同時也包括了同步狀態。

屬性名稱 描述
int waitStatus 表示節點的狀態。其中包含的狀態有:

  1. CANCELLED,值為1,表示當前的線程被取消;
  2. SIGNAL,值為-1,表示當前節點的後繼節點包含的線程需要運行,也就是unpark;
  3. CONDITION,值為-2,表示當前節點在等待condition,也就是在condition隊列中;
  4. PROPAGATE,值為-3,表示當前場景下後續的acquireShared能夠得以執行;
  5. 值為0,表示當前節點在sync隊列中,等待著獲取鎖。
Node prev 前驅節點,比如當前節點被取消,那就需要前驅節點和後繼節點來完成連接。
Node next 後繼節點。
Node nextWaiter 存儲condition隊列中的後繼節點。
Thread thread 入隊列時的當前線程。

節點成為sync隊列和condition隊列構建的基礎,在同步器中就包含了sync隊列。同步器擁有三個成員變量:sync隊列的頭結點head、sync隊列的尾節點tail和狀態state。對於鎖的獲取,請求形成節點,將其掛載在尾部,而鎖資源的轉移(釋放再獲取)是從頭部開始向後進行。對於同步器維護的狀態state,多個線程對其的獲取將會產生一個鏈式的結構。
技術分享

API說明

實現自定義同步器時,需要使用同步器提供的getState()、setState()和compareAndSetState()方法來操縱狀態的變遷。

方法名稱 描述
protected boolean tryAcquire(int arg) 排它的獲取這個狀態。這個方法的實現需要查詢當前狀態是否允許獲取,然後再進行獲取(使用compareAndSetState來做)狀態。
protected boolean tryRelease(int arg) 釋放狀態。
protected int tryAcquireShared(int arg) 共享的模式下獲取狀態。
protected boolean tryReleaseShared(int arg) 共享的模式下釋放狀態。
protected boolean isHeldExclusively() 在排它模式下,狀態是否被占用。

實現這些方法必須是非阻塞而且是線程安全的,推薦使用該同步器的父類java.util.concurrent.locks.AbstractOwnableSynchronizer來設置當前的線程。
開始提到同步器內部基於一個FIFO隊列,對於一個獨占鎖的獲取和釋放有以下偽碼可以表示。
獲取一個排他鎖。

01    while(獲取鎖) {
02        if (獲取到) {
03            退出while循環
04        } else {
05            if(當前線程沒有入隊列) {
06                那麽入隊列
07            }
08            阻塞當前線程
09        }
10    }

釋放一個排他鎖。

1    if (釋放成功) {
2        刪除頭結點
3        激活原頭結點的後繼節點
4    }

示例

下面通過一個排它鎖的例子來深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能夠更加深入了解其他的並發組件。
排他鎖的實現,一次只能一個線程獲取到鎖。

01    class Mutex implements Lock, java.io.Serializable {
02       // 內部類,自定義同步器
03       private static class Sync extends AbstractQueuedSynchronizer {
04         // 是否處於占用狀態
05         protected boolean isHeldExclusively() {
06           return getState() == 1;
07         }
08         // 當狀態為0的時候獲取鎖
09         public boolean tryAcquire(int acquires) {
10           assert acquires == 1; // Otherwise unused
11           if (compareAndSetState(0, 1)) {
12             setExclusiveOwnerThread(Thread.currentThread());
13             return true;
14           }
15           return false;
16         }
17         // 釋放鎖,將狀態設置為0
18         protected boolean tryRelease(int releases) {
19           assert releases == 1; // Otherwise unused
20           if (getState() == 0) throw new IllegalMonitorStateException();
21           setExclusiveOwnerThread(null);
22           setState(0);
23           return true;
24         }
25         // 返回一個Condition,每個condition都包含了一個condition隊列
26         Condition newCondition() { return new ConditionObject(); }
27       }
28       // 僅需要將操作代理到Sync上即可
29       private final Sync sync = new Sync();
30       public void lock()                { sync.acquire(1); }
31       public boolean tryLock()          { return sync.tryAcquire(1); }
32       public void unlock()              { sync.release(1); }
33       public Condition newCondition()   { return sync.newCondition(); }
34       public boolean isLocked()         { return sync.isHeldExclusively(); }
35       public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
36       public void lockInterruptibly() throws InterruptedException {
37         sync.acquireInterruptibly(1);
38       }
39       public boolean tryLock(long timeout, TimeUnit unit)
40           throws InterruptedException {
41         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
42       }
43     }

可以看到Mutex將Lock接口均代理給了同步器的實現。
使用方將Mutex構造出來之後,調用lock獲取鎖,調用unlock進行解鎖。下面以Mutex為例子,詳細分析以下同步器的實現邏輯。

實現分析

public final void acquire(int arg)

該方法以排他的方式獲取鎖,對中斷不敏感,完成synchronized語義。

1    public final void acquire(int arg) {
2            if (!tryAcquire(arg) &&
3                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4                selfInterrupt();
5    }

上述邏輯主要包括:
1. 嘗試獲取(調用tryAcquire更改狀態,需要保證原子性);
在tryAcquire方法中使用了同步器提供的對state操作的方法,利用compareAndSet保證只有一個線程能夠對狀態進行成功修改,而沒有成功修改的線程將進入sync隊列排隊。
2. 如果獲取不到,將當前線程構造成節點Node並加入sync隊列;
進入隊列的每個線程都是一個節點Node,從而形成了一個雙向隊列,類似CLH隊列,這樣做的目的是線程間的通信會被限制在較小規模(也就是兩個節點左右)。
3. 再次嘗試獲取,如果沒有獲取到那麽將當前線程從線程調度器上摘下,進入等待狀態。
使用LockSupport將當前線程unpark,關於LockSupport後續會詳細介紹。

01    private Node addWaiter(Node mode) {
02        Node node = new Node(Thread.currentThread(), mode);
03        // 快速嘗試在尾部添加
04        Node pred = tail;
05        if (pred != null) {
06            node.prev = pred;
07            if (compareAndSetTail(pred, node)) {
08                pred.next = node;
09                return node;
10            }
11        }
12        enq(node);
13        return node;
14    }
15     
16    private Node enq(final Node node) {
17        for (;;) {
18            Node t = tail;
19            if (t == null) { // Must initialize
20                if (compareAndSetHead(new Node()))
21                    tail = head;
22            } else {
23                node.prev = t;
24                if (compareAndSetTail(t, node)) {
25                t.next = node;
26                return t;
27            }
28        }
29    }

上述邏輯主要包括:
1. 使用當前線程構造Node;
對於一個節點需要做的是將當節點前驅節點指向尾節點(current.prev = tail),尾節點指向它(tail = current),原有的尾節點的後繼節點指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節點的設置來保證的,也就是compareAndSetTail來完成的。
2. 先行嘗試在隊尾添加;
如果尾節點已經有了,然後做如下操作:
(1)分配引用T指向尾節點;
(2)將節點的前驅節點更新為尾節點(current.prev = tail);
(3)如果尾節點是T,那麽將當尾節點設置為該節點(tail = current,原子更新);
(4)T的後繼節點指向當前節點(T.next = current)。
註意第3點是要求原子的。
這樣可以以最短路徑O(1)的效果來完成線程入隊,是最大化減少開銷的一種方式。
3. 如果隊尾添加失敗或者是第一個入隊的節點。
如果是第1個節點,也就是sync隊列沒有初始化,那麽會進入到enq這個方法,進入的線程可能有多個,或者說在addWaiter中沒有成功入隊的線程都將進入enq這個方法。
可以看到enq的邏輯是確保進入的Node都會有機會順序的添加到sync隊列中,而加入的步驟如下:
(1)如果尾節點為空,那麽原子化的分配一個頭節點,並將尾節點指向頭節點,這一步是初始化;
(2)然後是重復在addWaiter中做的工作,但是在一個while(true)的循環中,直到當前節點入隊為止。
進入sync隊列之後,接下來就是要進行鎖的獲取,或者說是訪問控制了,只有一個線程能夠在同一時刻繼續的運行,而其他的進入等待狀態。而每個線程都是一個獨立的個體,它們自省的觀察,當條件滿足的時候(自己的前驅是頭結點並且原子性的獲取了狀態),那麽這個線程能夠繼續運行。

01    final boolean acquireQueued(final Node node, int arg) {
02        boolean failed = true;
03        try {
04            boolean interrupted = false;
05            for (;;) {
06                final Node p = node.predecessor();
07                if (p == head && tryAcquire(arg)) {
08                    setHead(node);
09                    p.next = null; // help GC
10                    failed = false;
11                    return interrupted;
12                }
13                if (shouldParkAfterFailedAcquire(p, node) &&
14                        parkAndCheckInterrupt())
15                    interrupted = true;
16                    }
17        } finally {
18            if (failed)
19                cancelAcquire(node);
20        }
21    }

上述邏輯主要包括:
1. 獲取當前節點的前驅節點;
需要獲取當前節點的前驅節點,而頭結點所對應的含義是當前站有鎖且正在運行。
2. 當前驅節點是頭結點並且能夠獲取狀態,代表該當前節點占有鎖;
如果滿足上述條件,那麽代表能夠占有鎖,根據節點對鎖占有的含義,設置頭結點為當前節點。
3. 否則進入等待狀態。
如果沒有輪到當前節點運行,那麽將當前線程從線程調度器上摘下,也就是進入等待狀態。
這裏針對acquire做一下總結:
1. 狀態的維護;
需要在鎖定時,需要維護一個狀態(int類型),而對狀態的操作是原子和非阻塞的,通過同步器提供的對狀態訪問的方法對狀態進行操縱,並且利用compareAndSet來確保原子性的修改。
2. 狀態的獲取;
一旦成功的修改了狀態,當前線程或者說節點,就被設置為頭節點。
3. sync隊列的維護。
在獲取資源未果的過程中條件不符合的情況下(不該自己,前驅節點不是頭節點或者沒有獲取到資源)進入睡眠狀態,停止線程調度器對當前節點線程的調度。
這時引入的一個釋放的問題,也就是說使睡眠中的Node或者說線程獲得通知的關鍵,就是前驅節點的通知,而這一個過程就是釋放,釋放會通知它的後繼節點從睡眠中返回準備運行。
下面的流程圖基本描述了一次acquire所需要經歷的過程:
技術分享
如上圖所示,其中的判定退出隊列的條件,判定條件是否滿足和休眠當前線程就是完成了自旋spin的過程。

public final boolean release(int arg)

在unlock方法的實現中,使用了同步器的release方法。相對於在之前的acquire方法中可以得出調用acquire,保證能夠獲取到鎖(成功獲取狀態),而release則表示將狀態設置回去,也就是將資源釋放,或者說將鎖釋放。

1    public final boolean release(int arg) {
2        if (tryRelease(arg)) {
3            Node h = head;
4            if (h != null && h.waitStatus != 0)
5                unparkSuccessor(h);
6            return true;
7        }
8        return false;
9    }

上述邏輯主要包括:
1. 嘗試釋放狀態;
tryRelease能夠保證原子化的將狀態設置回去,當然需要使用compareAndSet來保證。如果釋放狀態成功過之後,將會進入後繼節點的喚醒過程。
2. 喚醒當前節點的後繼節點所包含的線程。
通過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續acquire狀態。

01    private void unparkSuccessor(Node node) {
02        // 將狀態設置為同步狀態
03        int ws = node.waitStatus;
04        if (ws < 0)      compareAndSetWaitStatus(node, ws, 0);   // 獲取當前節點的後繼節點,如果滿足狀態,那麽進行喚醒操作  

// 如果沒有滿足狀態,從尾部開始找尋符合要求的節點並將其喚醒 Node s = node.next; if (s == null || s.waitStatus > 0) { 05 s = null; 06 for (Node t = tail; t != null && t != node; t = t.prev) 07 if (t.waitStatus <= 0) 08 s = t; 09 } 10 if (s != null) 11 LockSupport.unpark(s.thread); 12 }

上述邏輯主要包括,該方法取出了當前節點的next引用,然後對其線程(Node)進行了喚醒,這時就只有一個或合理個數的線程被喚醒,被喚醒的線程繼續進行對資源的獲取與爭奪。
回顧整個資源的獲取和釋放過程:
在獲取時,維護了一個sync隊列,每個節點都是一個線程在進行自旋,而依據就是自己是否是首節點的後繼並且能夠獲取資源;
在釋放時,僅僅需要將資源還回去,然後通知一下後繼節點並將其喚醒。
這裏需要註意,隊列的維護(首節點的更換)是依靠消費者(獲取時)來完成的,也就是說在滿足了自旋退出的條件時的一刻,這個節點就會被設置成為首節點。

protected boolean tryAcquire(int arg)

tryAcquire是自定義同步器需要實現的方法,也就是自定義同步器非阻塞原子化的獲取狀態,如果鎖該方法一般用於Lock的tryLock實現中,這個特性是synchronized無法提供的。

public final void acquireInterruptibly(int arg)

該方法提供獲取狀態能力,當然在無法獲取狀態的情況下會進入sync隊列進行排隊,這類似acquire,但是和acquire不同的地方在於它能夠在外界對當前線程進行中斷的時候提前結束獲取狀態的操作,換句話說,就是在類似synchronized獲取鎖時,外界能夠對當前線程進行中斷,並且獲取鎖的這個操作能夠響應中斷並提前返回。一個線程處於synchronized塊中或者進行同步I/O操作時,對該線程進行中斷操作,這時該線程的中斷標識位被設置為true,但是線程依舊繼續運行。
如果在獲取一個通過網絡交互實現的鎖時,這個鎖資源突然進行了銷毀,那麽使用acquireInterruptibly的獲取方式就能夠讓該時刻嘗試獲取鎖的線程提前返回。而同步器的這個特性被實現Lock接口中的lockInterruptibly方法。根據Lock的語義,在被中斷時,lockInterruptibly將會拋出InterruptedException來告知使用者。

01    public final void acquireInterruptibly(int arg)
02        throws InterruptedException {
03        if (Thread.interrupted())
04            throw new InterruptedException();
05        if (!tryAcquire(arg))
06            doAcquireInterruptibly(arg);
07    }
08     
09    private void doAcquireInterruptibly(int arg)
10        throws InterruptedException {
11        final Node node = addWaiter(Node.EXCLUSIVE);
12        boolean failed = true;
13        try {
14            for (;;) {
15                final Node p = node.predecessor();
16                if (p == head && tryAcquire(arg)) {
17                    setHead(node);
18                    p.next = null; // help GC
19                    failed = false;
20                    return;
21                }
22                // 檢測中斷標誌位
23                if (shouldParkAfterFailedAcquire(p, node) &&
24                parkAndCheckInterrupt())
25                    throw new InterruptedException();
26            }
27        } finally {
28            if (failed)
29                cancelAcquire(node);
30        }
31    }

上述邏輯主要包括:
1. 檢測當前線程是否被中斷;
判斷當前線程的中斷標誌位,如果已經被中斷了,那麽直接拋出異常並將中斷標誌位設置為false。
2. 嘗試獲取狀態;
調用tryAcquire獲取狀態,如果順利會獲取成功並返回。
3. 構造節點並加入sync隊列;
獲取狀態失敗後,將當前線程引用構造為節點並加入到sync隊列中。退出隊列的方式在沒有中斷的場景下和acquireQueued類似,當頭結點是自己的前驅節點並且能夠獲取到狀態時,即可以運行,當然要將本節點設置為頭結點,表示正在運行。
4. 中斷檢測。
在每次被喚醒時,進行中斷檢測,如果發現當前線程被中斷,那麽拋出InterruptedException並退出循環。

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException

該方法提供了具備有超時功能的獲取狀態的調用,如果在指定的nanosTimeout內沒有獲取到狀態,那麽返回false,反之返回true。可以將該方法看做acquireInterruptibly的升級版,也就是在判斷是否被中斷的基礎上增加了超時控制。
針對超時控制這部分的實現,主要需要計算出睡眠的delta,也就是間隔值。間隔可以表示為nanosTimeout = 原有nanosTimeout – now(當前時間)+ lastTime(睡眠之前記錄的時間)。如果nanosTimeout大於0,那麽還需要使當前線程睡眠,反之則返回false。

01    private boolean doAcquireNanos(int arg, long nanosTimeout)
02    throws InterruptedException {
03        long lastTime = System.nanoTime();
04        final Node node = addWaiter(Node.EXCLUSIVE);
05        boolean failed = true;
06        try {
07            for (;;) {
08                final Node p = node.predecessor();
09                if (p == head && tryAcquire(arg)) {
10                    setHead(node);
11                    p.next = null; // help GC
12                    failed = false;
13                    return true;
14                }
15                if (nanosTimeout <= 0)               return false;           if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
16                LockSupport.parkNanos(this, nanosTimeout);
17                long now = System.nanoTime();
18                //計算時間,當前時間減去睡眠之前的時間得到睡眠的時間,然後被
19                //原有超時時間減去,得到了還應該睡眠的時間
20                nanosTimeout -= now - lastTime;
21                lastTime = now;
22                if (Thread.interrupted())
23                    throw new InterruptedException();
24            }
25        } finally {
26            if (failed)
27                cancelAcquire(node);
28        }
29    }

上述邏輯主要包括:
1. 加入sync隊列;
將當前線程構造成為節點Node加入到sync隊列中。
2. 條件滿足直接返回;
退出條件判斷,如果前驅節點是頭結點並且成功獲取到狀態,那麽設置自己為頭結點並退出,返回true,也就是在指定的nanosTimeout之前獲取了鎖。
3. 獲取狀態失敗休眠一段時間;
通過LockSupport.unpark來指定當前線程休眠一段時間。
4. 計算再次休眠的時間;
喚醒後的線程,計算仍需要休眠的時間,該時間表示為nanosTimeout = 原有nanosTimeout – now(當前時間)+ lastTime(睡眠之前記錄的時間)。其中now – lastTime表示這次睡眠所持續的時間。
5. 休眠時間的判定。
喚醒後的線程,計算仍需要休眠的時間,並無阻塞的嘗試再獲取狀態,如果失敗後查看其nanosTimeout是否大於0,如果小於0,那麽返回完全超時,沒有獲取到鎖。 如果nanosTimeout小於等於1000L納秒,則進入快速的自旋過程。那麽快速自旋會造成處理器資源緊張嗎?結果是不會,經過測算,開銷看起來很小,幾乎微乎其微。Doug Lea應該測算了在線程調度器上的切換造成的額外開銷,因此在短時1000納秒內就讓當前線程進入快速自旋狀態,如果這時再休眠相反會讓nanosTimeout的獲取時間變得更加不精確。
上述過程可以如下圖所示:
技術分享
上述這個圖中可以理解為在類似獲取狀態需要排隊的基礎上增加了一個超時控制的邏輯。每次超時的時間就是當前超時剩余的時間減去睡眠的時間,而在這個超時時間的基礎上進行了判斷,如果大於0那麽繼續睡眠(等待),可以看出這個超時版本的獲取狀態只是一個近似超時的獲取狀態,因此任何含有超時的調用基本結果就是近似於給定超時。

public final void acquireShared(int arg)

調用該方法能夠以共享模式獲取狀態,共享模式和之前的獨占模式有所區別。以文件的查看為例,如果一個程序在對其進行讀取操作,那麽這一時刻,對這個文件的寫操作就被阻塞,相反,這一時刻另一個程序對其進行同樣的讀操作是可以進行的。如果一個程序在對其進行寫操作,那麽所有的讀與寫操作在這一時刻就被阻塞,直到這個程序完成寫操作。
以讀寫場景為例,描述共享和獨占的訪問模式,如下圖所示:
技術分享
上圖中,紅色代表被阻塞,綠色代表可以通過。

01    public final void acquireShared(int arg) {
02        if (tryAcquireShared(arg) < 0)   doAcquireShared(arg); } private void doAcquireShared(int arg) {     

final Node node = addWaiter(Node.SHARED);
boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);

if (r >= 0) { 03 setHeadAndPropagate(node, r); 04 p.next = null; // help GC 05 if (interrupted) 06 selfInterrupt(); 07 failed = false; 08 return; 09 } 10 } 11 if (shouldParkAfterFailedAcquire(p, node) && 12 parkAndCheckInterrupt()) 13 interrupted = true; 14 } 15 } finally { 16 if (failed) 17 cancelAcquire(node); 18 } 19 }

上述邏輯主要包括:
1. 嘗試獲取共享狀態;
調用tryAcquireShared來獲取共享狀態,該方法是非阻塞的,如果獲取成功則立刻返回,也就表示獲取共享鎖成功。
2. 獲取失敗進入sync隊列;
在獲取共享狀態失敗後,當前時刻有可能是獨占鎖被其他線程所把持,那麽將當前線程構造成為節點(共享模式)加入到sync隊列中。
3. 循環內判斷退出隊列條件;
如果當前節點的前驅節點是頭結點並且獲取共享狀態成功,這裏和獨占鎖acquire的退出隊列條件類似。
4. 獲取共享狀態成功;
在退出隊列的條件上,和獨占鎖之間的主要區別在於獲取共享狀態成功之後的行為,而如果共享狀態獲取成功之後會判斷後繼節點是否是共享模式,如果是共享模式,那麽就直接對其進行喚醒操作,也就是同時激發多個線程並發的運行。
5. 獲取共享狀態失敗。
通過使用LockSupport將當前線程從線程調度器上摘下,進入休眠狀態。
對於上述邏輯中,節點之間的通知過程如下圖所示:
技術分享
上圖中,綠色表示共享節點,它們之間的通知和喚醒操作是在前驅節點獲取狀態時就進行的,紅色表示獨占節點,它的被喚醒必須取決於前驅節點的釋放,也就是release操作,可以看出來圖中的獨占節點如果要運行,必須等待前面的共享節點均釋放了狀態才可以。而獨占節點如果獲取了狀態,那麽後續的獨占式獲取和共享式獲取均被阻塞。

public final boolean releaseShared(int arg)

調用該方法釋放共享狀態,每次獲取共享狀態acquireShared都會操作狀態,同樣在共享鎖釋放的時候,也需要將狀態釋放。比如說,一個限定一定數量訪問的同步工具,每次獲取都是共享的,但是如果超過了一定的數量,將會阻塞後續的獲取操作,只有當之前獲取的消費者將狀態釋放才可以使阻塞的獲取操作得以運行。

上述邏輯主要就是調用同步器的tryReleaseShared方法來釋放狀態,並同時在doReleaseShared方法中喚醒其後繼節點。

一個例子

在上述對同步器AbstractQueuedSynchronizer進行了實現層面的分析之後,我們通過一個例子來加深對同步器的理解:
設計一個同步工具,該工具在同一時刻,只能有兩個線程能夠並行訪問,超過限制的其他線程進入阻塞狀態。
對於這個需求,可以利用同步器完成一個這樣的設定,定義一個初始狀態,為2,一個線程進行獲取那麽減1,一個線程釋放那麽加1,狀態正確的範圍在[0,1,2]三個之間,當在0時,代表再有新的線程對資源進行獲取時只能進入阻塞狀態(註意在任何時候進行狀態變更的時候均需要以CAS作為原子性保障)。由於資源的數量多於1個,同時可以有兩個線程占有資源,因此需要實現tryAcquireShared和tryReleaseShared方法,這裏謝謝luoyuyou和同事小明指正,已經修改了實現。

01    public class TwinsLock implements Lock {
02        private final Sync  sync    = new Sync(2);
03     
04        private static final class Sync extends AbstractQueuedSynchronizer {
05            private static final long   serialVersionUID    = -7889272986162341211L;
06     
07            Sync(int count) {
08                if (count <= 0) {
09                    throw new IllegalArgumentException("count must large than zero.");
10                }
11                setState(count);
12            }
13     
14            public int tryAcquireShared(int reduceCount) {
15                for (;;) {
16                    int current = getState();
17                    int newCount = current - reduceCount;
18                    if (newCount < 0 || compareAndSetState(current, newCount)) {
19                        return newCount;
20                    }
21                }
22            }
23     
24            public boolean tryReleaseShared(int returnCount) {
25                for (;;) {
26                    int current = getState();
27                    int newCount = current + returnCount;
28                    if (compareAndSetState(current, newCount)) {
29                        return true;
30                    }
31                }
32            }
33        }
34     
35        public void lock() {
36            sync.acquireShared(1);
37        }
38     
39        public void lockInterruptibly() throws InterruptedException {
40            sync.acquireSharedInterruptibly(1);
41        }
42     
43        public boolean tryLock() {
44            return sync.tryAcquireShared(1) >= 0;
45        }
46     
47        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
48            return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
49        }
50     
51        public void unlock() {
52            sync.releaseShared(1);
53        }
54     
55        @Override
56        public Condition newCondition() {
57            return null;
58        }
59    }

這裏我們編寫一個測試來驗證TwinsLock是否能夠正常工作並達到預期。

01    public class TwinsLockTest {
02     
03        @Test
04        public void test() {
05            final Lock lock = new TwinsLock();
06     
07            class Worker extends Thread {
08                public void run() {
09                    while (true) {
10                        lock.lock();
11     
12                        try {
13                            Thread.sleep(1000L);
14                    System.out.println(Thread.currentThread());
15                            Thread.sleep(1000L);
16                        } catch (Exception ex) {
17     
18                        } finally {
19                            lock.unlock();
20                        }
21                    }
22                }
23            }
24     
25            for (int i = 0; i &lt; 10; i++) {
26                Worker w = new Worker();
27                w.start();
28            }
29     
30            new Thread() {
31                public void run() {
32                    while (true) {
33     
34                        try {
35                            Thread.sleep(200L);
36                            System.out.println();
37                        } catch (Exception ex) {
38     
39                        }
40                    }
41                }
42            }.start();
43     
44            try {
45                Thread.sleep(20000L);
46            } catch (InterruptedException e) {
47                e.printStackTrace();
48            }
49        }
50    }

上述測試用例的邏輯主要包括:
?1. 打印線程
Worker在兩次睡眠之間打印自身線程,如果一個時刻只能有兩個線程同時訪問,那麽打印出來的內容將是成對出現。
?2. 分隔線程
不停的打印換行,能讓Worker的輸出看起來更加直觀。
該測試的結果是在一個時刻,僅有兩個線程能夠獲得到鎖,並完成打印,而表象就是打印的內容成對出現。

Java並發編程-AbstractQueuedSynchronizer源碼分析