1. 程式人生 > >多執行緒高併發程式設計(3) -- ReentrantLock原始碼分析AQS

多執行緒高併發程式設計(3) -- ReentrantLock原始碼分析AQS

背景:

  AbstractQueuedSynchronizer(AQS)

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable
  • 介紹

  1. 提供一個框架,用於實現依賴先進先出(FIFO)等待佇列的阻塞鎖和相關同步器(訊號量,事件等)。 該類被設計為大多數型別的同步器的有用依據,這些同步器依賴於單個原子int值來表示狀態。 子類必須定義改變此狀態的受保護方法,以及根據該物件被獲取或釋放來定義該狀態的含義。 給定這些,這個類中的其他方法執行所有排隊和阻塞機制。 子類可以保持其他狀態欄位,但只以原子方式更新int
    使用方法操縱值getState()setState(int)compareAndSetState(int, int)被跟蹤相對於同步。
  2. 子類應定義為非公共內部助手類,用於實現其封閉類的同步屬性。 AbstractQueuedSynchronizer類不實現任何同步介面。 相反,它定義了一些方法,如acquireInterruptibly(int) ,可以通過具體的鎖和相關同步器來呼叫適當履行其公共方法。
  3. 此類支援預設獨佔模式和共享模式。【使用模板模式來定義是獨佔還是共享模式】 當以獨佔模式獲取時,嘗試通過其他執行緒獲取不能成功。 多執行緒獲取的共享模式可能(但不需要)成功。 除了在機械意義上,這個類不理解這些差異,當共享模式獲取成功時,下一個等待執行緒(如果存在)也必須確定它是否也可以獲取。 在不同模式下等待的執行緒共享相同的FIFO佇列。 通常,實現子類只支援這些模式之一,但是兩者都可以在ReadWriteLock
    中發揮作用 。 僅支援獨佔或僅共享模式的子類不需要定義支援未使用模式的方法。
  4. 這個類定義的巢狀AbstractQueuedSynchronizer.ConditionObject可用於作為一類Condition由子類支援獨佔模式用於該方法的實施isHeldExclusively()份報告是否同步排他相對於保持在當前執行緒,方法release(int)與當前呼叫getState()值完全釋放此目的,和acquire(int) ,給定此儲存的狀態值,最終將此物件恢復到其先前獲取的狀態。 AbstractQueuedSynchronizer方法將建立此類條件,因此如果不能滿足此約束,請勿使用該約束。 AbstractQueuedSynchronizer.ConditionObject
    的行為當然取決於其同步器實現的語義。
  5. 該類為內部佇列提供檢查,檢測和監控方法,以及條件物件的類似方法。 這些可以根據需要匯出到類中,使用AbstractQueuedSynchronizer進行同步機制。
  6. 此類的序列化僅儲存底層原子整數維持狀態,因此反序列化物件具有空執行緒佇列。 需要可序列化的典型子類將定義一個readObject方法,可以將其恢復為readObject時的已知初始狀態。
  • 用法

    使用這個類用作同步的基礎上,重新定義以下方法,【即在同步器中定義內部類重寫下面的方法】,如適用,通過檢查和/或修改使用所述同步狀態getState()setState(int)compareAndSetState(int, int)【通過這三個方法來保證原子性和執行緒安全性】 :

    • tryAcquire(int)獨佔模式獲取鎖
    • tryRelease(int)獨佔模式釋放鎖
    • tryAcquireShared(int)共享模式獲取鎖
    • tryReleaseShared(int)共享模式釋放鎖
    • isHeldExclusively()是否是獨佔式
    每個這些方法預設丟擲UnsupportedOperationException。 這些方法的實現必須是執行緒安全的,通常應該是短的而不是阻止的。 定義這些方法是唯一支援使用此類的方法。 所有其他方法都被宣告為final ,因為它們不能獨立變化。
  • 看下面的原始碼分析前可先觀看多執行緒高併發程式設計(2) -- 可重入鎖介紹和自定義

一.ReentrantLock的lock和unlock原始碼解析

  • lock流程:

  1. 呼叫同步器Sync的抽象方法lock,由公平鎖FairSync實現
  2. 呼叫AQS的acquire獲取鎖
    1. 嘗試獲取鎖:公平鎖FairSync的tryAcquire
      1. 第一次獲取鎖,若佇列沒有前節點和設定狀態成功,儲存當前執行緒,返回true,表示獲取成功;
      2. 如果是重入獲取,鎖持有數量+1;
    2. 獲取鎖失敗,把當前執行緒加入等待佇列中,並對等待佇列進行阻塞,不斷競爭獲取鎖:acquireQueued遍歷等待佇列addWaiter,若有頭節點則從佇列中取出來,若沒有則進行中斷;
  • unlock流程:

  1. 呼叫同步器的release方法,有AQS實現;
  2. tryRelease釋放鎖,由Sync實現
    1. 鎖數量-1;
    2. 當前執行緒和儲存的執行緒不一致丟擲異常;
    3. 鎖數量為0則進行釋放鎖,把獨佔執行緒設定為null,修改狀態; 
  1 //===========================ReentrantLock原始碼===============================
  2 public class ReentrantLock implements Lock, java.io.Serializable {
  3     private final Sync sync;//同步器
  4     public void lock() {//獲取鎖
  5             sync.lock();//呼叫同步器Sync的lock,由FairSync實現
  6     }
  7     public void unlock() {//使用鎖
  8         sync.release(1);//呼叫同步器的release,由AQS實現
  9     }
 10 
 11 
 12     //內部類,同步器繼承AQS,實現tryRelease釋放鎖
 13     abstract static class Sync extends AbstractQueuedSynchronizer{
 14         abstract void lock();//獲取鎖抽象方法,由FairSync實現
 15 //===========================釋放鎖===============================
 16         protected final boolean tryRelease(int releases) {
 17             int c = getState() - releases;//鎖數量-1
 18             //當前執行緒和儲存的執行緒不一致
 19             if (Thread.currentThread() != getExclusiveOwnerThread())
 20                 throw new IllegalMonitorStateException();
 21             boolean free = false;
 22             if (c == 0) {//持有的鎖數量為0
 23                 free = true;//釋放鎖
 24                 setExclusiveOwnerThread(null);//當前獨佔執行緒為null
 25             }
 26             setState(c);//設定狀態
 27             return free;
 28         }
 29     }
 30 
 31     //內部類,公平鎖繼承同步器,實現lock方法
 32     static final class FairSync extends Sync {
 33             //===========================獲取鎖===============================
 34             final void lock() {
 35                 acquire(1);//呼叫AQS的acquire
 36             }
 37             protected final boolean tryAcquire(int acquires) {
 38                 final Thread current = Thread.currentThread();//獲得當前執行緒
 39                 /**getState是AQS的Node的waitStatus,其值有
 40                 *CANCELLED =  1
 41                 *SIGNAL    = -1
 42                 *CONDITION = -2
 43                 *PROPAGATE = -3
 44                 */
 45                 int c = getState();
 46                 //c初始值為0,0表示不是以上的狀態;hasQueuedPredecessors之前是否有節點,
 47                 //如果是true表示這個執行緒的前面還有節點應該讓前面的節點先獲取鎖,當前執行緒獲取失敗;
 48                 //【非公平鎖少了hasQueuedPredecessors這個判斷】
 49                 //compareAndSetState CAS比較,設定當前狀態為1;setExclusiveOwnerThread當前執行緒設定為獨佔執行緒
 50                 if (c == 0) {
 51                     if (!hasQueuedPredecessors() &&
 52                         compareAndSetState(0, acquires)) {
 53                         setExclusiveOwnerThread(current);
 54                         return true;//獲取成功
 55                     }
 56                 }
 57                 else if (current == getExclusiveOwnerThread()) {//如果是當前執行緒,表示重入
 58                     int nextc = c + acquires;//鎖數量+1
 59                     if (nextc < 0)//小於0表示溢位
 60                         throw new Error("Maximum lock count exceeded");
 61                     setState(nextc);//更新狀態
 62                     return true;//獲取成功
 63                 }
 64                 return false;//獲取失敗
 65             }
 66     }
 67 }
 68 
 69 
 70 
 71 //=============AbstractQueuedSynchronizer原始碼==============
 72 public abstract class AbstractQueuedSynchronizer
 73     extends AbstractOwnableSynchronizer
 74     implements java.io.Serializable{
 75 //===========================獲取鎖===============================
 76     //以獨佔模式獲取,忽略中斷。通過呼叫至少一次tryAcquire(int)實現,成功返回。否則執行緒排隊,
 77     //可能會重複阻塞和解除阻塞,直到成功才呼叫tryAcquire(int)。
 78     public final void acquire(int arg) {//FairSync的lock呼叫
 79             //tryAcquire獲取鎖;acquireQueued執行緒加入到了等待佇列中,進行阻塞等待,競爭獲取鎖;
 80             //addWaiter其他執行緒獲取鎖失敗新增到等待佇列中;Node.EXCLUSIVE節點獨佔,為null
 81             if (!tryAcquire(arg) &&
 82                 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 83                 selfInterrupt();
 84     }
 85     protected boolean tryAcquire(int arg) {//acquire呼叫,由FairSync實現
 86         throw new UnsupportedOperationException();
 87     }
 88     final boolean acquireQueued(final Node node, int arg) {//acquire呼叫
 89         boolean failed = true;
 90         try {
 91             boolean interrupted = false;
 92             for (;;) {
 93                 final Node p = node.predecessor();//獲取前一個節點
 94                 if (p == head && tryAcquire(arg)) {//如果獲取的節點為頭節點並且獲取到鎖
 95                     setHead(node);//當前節點設定為頭節點
 96                     p.next = null;//頭節點下一節點為空,即把當前節點從佇列中移除出來
 97                     failed = false;
 98                     return interrupted;
 99                 }
100               //當前節點不是頭節點,parkAndCheckInterrupt讓當前執行緒處於阻塞等待狀態由其他執行緒喚醒
101                 if (shouldParkAfterFailedAcquire(p, node) &&
102                     parkAndCheckInterrupt())
103                     interrupted = true;
104             }
105         } finally {
106             if (failed)
107                 cancelAcquire(node);
108         }
109     }
110     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//acquireQueued呼叫
111         int ws = pred.waitStatus;//獲取前一節點的等待狀態
112         if (ws == Node.SIGNAL)//如果狀態為喚醒狀態
113             return true;
114         if (ws > 0) {//處於CANCELLED狀態
115             do {
116                 node.prev = pred = pred.prev;//1.把所有處於CANCELLED狀態的節點移除
117             } while (pred.waitStatus > 0);
118             pred.next = node;//2.把所有處於CANCELLED狀態的節點移除
119         } else {
120             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//設定為SIGNAL狀態
121         }
122         return false;
123     }
124     private Node addWaiter(Node mode) {//acquireQueued的引數,在acquire中呼叫
125         Node node = new Node(Thread.currentThread(), mode);//建立Node,當前執行緒指向node
126         Node pred = tail;//前節點指向尾節點【雙向連結串列】
127         if (pred != null) {//尾節點不為空
128             node.prev = pred;//當前執行緒節點指向尾節點
129             if (compareAndSetTail(pred, node)) {//CAS比較,把當前執行緒節點更新為尾節點
130                 pred.next = node;//前尾節點的下一節點指向當前尾節點
131                 return node;
132             }
133         }
134         enq(node);//如果尾節點為空,把當前節點放到一個初始化節點或新增到節點中做為尾節點
135         return node;
136     }
137     private Node enq(final Node node) {//addWaiter呼叫
138         for (;;) {
139             Node t = tail;
140             if (t == null) { // 尾節點為空
141                 if (compareAndSetHead(new Node()))//建立新節點並維護一個頭節點
142                     tail = head;//把當前節點設定為頭節點
143             } else {
144                 node.prev = t;//當前節點指向尾節點
145                 if (compareAndSetTail(t, node)) {//把當前節點更新為尾節點
146                     t.next = node;//前尾節點的下一節點指向當前尾節點
147                     return t;
148                 }
149             }
150         }
151     }
152     static void selfInterrupt() {//acquire呼叫
153         Thread.currentThread().interrupt();//當前執行緒中斷
154     }
155 //===========================釋放鎖===============================
156     public final boolean release(int arg) {//ReentrantLock的unlock呼叫
157         if (tryRelease(arg)) {//當前執行緒鎖釋放成功,喚醒其他執行緒進行資源的競爭
158             Node h = head;
159             if (h != null && h.waitStatus != 0)
160                 unparkSuccessor(h);
161             return true;
162         }
163         return false;
164     }
165     protected boolean tryRelease(int arg) {//release呼叫,由Sync實現
166         throw new UnsupportedOperationException();
167     }
168 }

  備註:公平鎖是針對鎖的獲取而言,如果一個鎖是公平的,那麼鎖的獲取順序就應該符合請求的絕對時間順序;非公平鎖會進行插隊獲取鎖;

二.AQS重寫鎖

流程:

  1. 實現Lock,重寫實現方法lock、lockInterruptibly、tryLock、unlock、newCondition;
  2. 內部類繼承AQS,重寫tryAcquire和tryRelease;
 1 public class MyAQSLock implements Lock{
 2     private MyAQS myAQS;
 3     private class MyAQS extends AbstractQueuedSynchronizer{
 4         @Override
 5         protected boolean tryAcquire(int arg) {
 6             int state = getState();//獲取狀態
 7             Thread thread = Thread.currentThread();
 8             if(state==0){//執行緒第一次進來獲取,狀態為0,表示可以拿到鎖
 9                 if(compareAndSetState(0,arg)){//更新狀態
10                     setExclusiveOwnerThread(Thread.currentThread());//設定為獨佔執行緒,其他執行緒進來進入等待
11                     return true;//獲取成功
12                 }
13             }else if(getExclusiveOwnerThread()== thread){//重入,儲存執行緒等於當前執行緒
14                 setState(state+1);//鎖數量+1
15                 return true;
16             }
17             return false;//獲取失敗
18         }
19 
20         @Override
21         protected boolean tryRelease(int arg) {
22             //當前執行緒不是儲存執行緒
23             if(Thread.currentThread() != getExclusiveOwnerThread()){
24                 throw new RuntimeException();
25             }
26             int state = getState()-arg;//鎖數量-1
27             boolean flag = false;
28             if(state==0){//鎖數量為0
29                 setExclusiveOwnerThread(null);//獨佔鎖為null,表示可以讓其他執行緒進來競爭獲取資源了
30                 flag=true;
31             }
32             setState(state);//更新狀態
33             return flag;
34         }
35 
36         public ConditionObject newConditonObject(){
37             return new ConditionObject();
38         }
39     }
40     @Override
41     public void lock() {
42         myAQS.acquire(1);
43     }
44 
45     @Override
46     public void lockInterruptibly() throws InterruptedException {
47         myAQS.acquireInterruptibly(1);
48     }
49 
50     @Override
51     public boolean tryLock() {
52         return myAQS.tryAcquire(1);
53     }
54 
55     @Override
56     public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
57         return myAQS.tryAcquireNanos(1,unit.toNanos(time));
58     }
59 
60     @Override
61     public void unlock() {
62             myAQS.release(1);
63     }
64 
65     @Override
66     public Condition newCondition() {
67         return myAQS.newConditonObject();
68     }
69 }