1. 程式人生 > >多執行緒之美5一 AbstractQueuedSynchronizer原始碼分析<一>

多執行緒之美5一 AbstractQueuedSynchronizer原始碼分析<一>

AQS的原始碼分析 <一>

目錄結構

1、什麼是CAS ?

2、同步器類結構

3、CLH同步佇列

4、AQS中靜態內部類Node

5、方法分析

​ 5.1、acquire(int arg )

​ 5.2、release(int arg) 釋放鎖

6、總結

前言

在多執行緒環境下,我們一般會對臨界區資源(共享資源)進行加鎖,釋放鎖,保證同一時刻最多隻有一個執行緒(獨佔模式),就如去公共廁所裡,在使用一個小房間時會加鎖避免自己在使用的時候,別人突然闖進來一樣,引起不必要的麻煩,在使用完後,再開啟鎖,其他人才可使用;還有生產者消費者模型中,執行緒之間要同步,需要等待和通知機制,來協調執行緒合作。那麼這些是這麼實現的?如可重入鎖ReentrantLock, 讀寫鎖ReadWriteLock, 訊號量 Semaphore, 計數器CountDownLatch,這些都會涉及執行緒之間的協調同步,那麼會有一個抽象的結構,將這些需要共用的功能抽離出來,統一來滿足要求嗎?我們一起來看看AbstractQueuedSynchronizer 這個抽象類,如何來實現這些功能和其設計的巧妙, 我們能看到Doug lea 大佬在很多地方使用的迴圈CAS操作(自旋鎖)。

1、什麼是CAS ?

CAS 即 compare and swap 比較並交換, 涉及到三個引數,記憶體值V, 預期值A, 要修改的新值B, 拿著預期值A與記憶體值V比較,相等則符合預期,將記憶體值V更新為B, 否則不相等,不能更新V;

比如我想去果籃拿1個蘋果,我預期籃子是5個,而果籃中實際有2個,被其他人早已偷偷吃了幾個我還不知道!於是我的期望值5與果籃值實際2不符,那麼我就不能更新籃子的蘋果數量了。這和資料庫中樂觀鎖機制一樣。後面我想用一篇文章單獨總結一下CAS, 如它存在的ABA問題以及加上版本號來解決ABA問題, 還有用CAS實現自旋鎖等。

2、同步器類結構

public abstract class AbstractQueuedSynchronizer  extends AbstractOwnableSynchronizer

AbstractQueuedSynchronizer (以後簡稱AQS)是一個抽象類,定義了一些通用功能以及子類需要根據自身特性重寫實現的方法,有兩個內部類 Node和ConditionObject, 以及2個連結串列結構的佇列,一個是雙向連結串列的同步佇列,用來存放嘗試獲取鎖,卻未獲取到鎖需等待的執行緒,另一個是單向連結串列的條件佇列,用來存放某些執行緒已經獲取到鎖了,為了等待某些事件(如IO事件,mq訊息等)主動放棄鎖掛起等待條件的執行緒。

這裡主要分析2個方法,分別是獨佔式獲取鎖 acquire, 釋放鎖release。

// 獨佔模式 EXCLUSIVE
acquire(int arg)
release(int arg)
// 共享模式 SHARED
acquireShared(int arg)
releaseShared(int arg)

3、CLH同步佇列

AQS 內部持有一個雙向連結串列的佇列(CLH佇列,也稱執行緒同步佇列,用來儲存嘗試獲取鎖,未獲取到等待的執行緒),AQS 持有head, tail節點,以及資源同步狀態state 屬性, 來表示有限的鎖資源佔用情況。

注:Head節點是一個空節點,是不關聯執行緒的,因為Head節點表示當前已經佔用資源在使用的執行緒,該執行緒已經不需要等待鎖,在同步佇列中作佔位作用,它只是一個標誌節點,其他執行緒才是對鎖資源有需求,在佇列中等待的,慢慢看來,後面程式碼會更有理解。

雙向連結串列結構如下圖:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    /**
     * 等待佇列中的頭節點,懶初始化,如果頭節點存在,表示當前正在佔用鎖資源的執行緒。                
     *
     */
    private transient volatile Node head;

    /**
     * 等待佇列中尾節點
     */
    private transient volatile Node tail;

    /**
     * 同步狀態,在獨佔模式下 state=0,表示鎖空閒,可獲取,state =1,則表示被其他執行緒佔用,需等待
     * 就如火車上的廁所,綠燈表示可用,紅燈表示裡面有人,需要等待
     */
    private volatile int state;

4、AQS中靜態內部類Node

CLH雙向連結串列中每個node 中有關聯的執行緒,節點狀態資訊,以及節點的前後指標,每個屬性有volatile修飾,可保證多執行緒之間的可見性。

static final class Node {
    // 獲取鎖的模式分為獨佔式和共享式
        static final Node SHARED = new Node();
       
       //獨佔式
        static final Node EXCLUSIVE = null;
   
  // 以下是節點在等待佇列中的5種狀態,初始為0,即waitStatus的值

        static final int CANCELLED =  1;
  
        static final int SIGNAL    = -1;

        static final int CONDITION = -2;
  
        static final int PROPAGATE = -3;

// 以下4個屬性分別用volatile修飾,保證多執行緒之間可見性
 
 //節點的狀態
        volatile int waitStatus;
   //前置節點
        volatile Node prev;
  // 後置節點
        volatile Node next;
 // 節點關聯的執行緒
        volatile Thread thread;

Node中在佇列中的等待狀態 waitStatus,初始為0,還有以下4種狀態和其含義:

  • SIGNAL: waitStatus= -1,表示當前節點執行緒在使用完同步資源,釋放資源後,需要去喚醒後面的一個節點中的執行緒。
  • CANCELLED:waitStatus= 1,節點在迴圈嘗試獲取鎖資源失敗,被中斷了,就不再需要再在佇列中等待資源了,需要執行節點出隊操作,將該節點移出該佇列。就如一群小夥伴都在排隊等待領取免費的辣條呢,都要排到小名了, 突然被媽媽叫著回去吃飯了,被強行中斷了,那就要移出佇列,讓給其他人了哦。
  • PROPAGATE:waitStatus= -3, 該狀態是共享模式下,如CountDownLatch等實現同步時,可有多個執行緒同時獲取鎖資源,在獨佔模式下,鎖資源有限,只有1個,一次最多有一個執行緒獲取到鎖資源,共享模式下,鎖資源可有多個,可有多個執行緒同時獲取到鎖,如果鎖資源數量可足夠下一個節點中執行緒使用,會繼續喚醒下一個。
  • CONDITION:waitStatus= -2,表示在Condition條件等待佇列中。

5、方法分析

5.1、acquire(int arg )

鎖資源只有1個,同一時刻最多隻有一個執行緒可獲取到鎖,獨佔式獲取鎖,其他未獲取到鎖的執行緒需在同步佇列中等待。

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          //如果在等待過程中,執行緒被標記中斷了,響應中斷
            selfInterrupt();
    }

變形為:

 public final void acquire(int arg) {
      boolean ta = tryAcquire(arg)
      if(!ta) {
        Node  aw = addWaiter(Node.EXCLUSIVE);
        boolean isInterrupt = acquireQueued(aw,arg);
        if(isInterrupt) {
          //如果在等待過程中,執行緒被標記中斷了,響應中斷
            selfInterrupt();
        }
      }
 }

1) tryAcquire(arg): 嘗試去獲取鎖資源,具體怎樣獲取鎖由每個子類實現,AQS不做處理,統一丟擲 throw new UnsupportedOperationException() 異常;

2) addWaiter(Node.EXCLUSIVE):如果嘗試獲取鎖失敗,將該執行緒封裝到一個新node節點中,新增到等待佇列中

3) acquireQueued(node, arg) :如果在head節點後,再次嘗試去獲取鎖,可能佔用鎖的執行緒已經釋放資源了,如果再次獲取失敗,在佇列中尋找到安全位置(告知前置節點,使用完資源喚醒一下自己)後,則在佇列中掛起,等待被喚醒,返回布林型別,表示執行緒是否被中斷。

下面來分別檢視以上3個方法的原始碼:

1)tryAcquire() 是嘗試去獲取鎖,需要每個具體實現類去實現,預設丟擲異常

 protected boolean tryAcquire(int arg) {
    //丟擲不支援操作異常
        throw new UnsupportedOperationException();
    }

來看看 ReentrantLock 是怎樣實現 tryAcquire() 去獲取鎖的,ReentrantLock 有公平方式和非公平兩種方式,這裡簡單看看非公平方式的實現細節。

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

內部呼叫nonfairTryAcquire 方法,程式碼如下:

  final boolean nonfairTryAcquire(int acquires) {
          //1,獲取當前執行緒物件
            final Thread current = Thread.currentThread();
          //2,獲取同步狀態state
            int c = getState();
         //3, c為0表示鎖可用,cas更新為1,compareAndSetState內部有unsafe類 呼叫本地方法(native修飾)執行原子更新操作,多個執行緒同時進來更新,只會有一個更新成功。更新成功就設定當前執行緒物件,用來標識當前執行緒擁有鎖了,是有主的,其他執行緒在我沒釋放鎖之前不可佔用,也是用來支援可重入的。
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
              //4,如果當前執行緒等於當前已獲取鎖執行緒,可重入,同步狀態 +1,返回true,允許再次獲取鎖
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

可重入鎖 : 指執行緒在獲取鎖資源後,再次去獲取同步鎖,是允許再次獲得的,不然存在產生死鎖的風險,由上可知ReentrantLock 是支援可重入的,可重入多次,每重入一次 state +1,那麼重入n 次,也需要釋放n次(同理,每釋放一次,state -1 ),不然state 無法恢復到到空閒狀態0,導致其他執行緒無法獲取到鎖。

2)addWaiter(Node mode),將當前執行緒封裝為一個節點,新增到等待佇列中。該方法分為2步操作,第一步利用CAS快速將節點入隊,如果能入隊成功,則返回,否則第二步執行 enq() 方法。

為什麼?

為了執行效率和安全性兩方面考慮,因在多執行緒環境下,同一時間可能併發執行多個執行緒,如果此時有2個執行緒都在執行入隊操作,就有可能其中一個執行緒執行CAS 失敗,他引用 的tail節點已經被更新,需要重新獲取新的,此時就需要執行enq() 迴圈輪詢去獲取最新的tail節點執行入隊操作,直到成功。

如圖所示:

執行緒A在剛獲取到pred = tail後,cpu排程切換到執行緒B,執行緒B此時也執行入隊操作,入隊成功,佇列長度+1, tail更新,此時cpu排程 執行緒A,執行緒A再執行 compareAndSetTail(pred, node) 操作就會失敗。

快速新增到隊尾失敗了,有兩種情況,

1.是tail節點 為空,佇列為空,那麼需要初始化佇列,新增一個空的head節點,表示當前已經獲取鎖資源,正在執行的執行緒。

2.是在cas新增到隊尾後面時,有其他執行緒也在操作,搶先入隊成功,導致自己獲取的tail節點不是最新的,那就輪詢獲取最新的tail執行更新。

流程圖如下:

addWaiter(Node mode) 程式碼如下:

    private Node addWaiter(Node mode) {
      /*1,建立 node節點,節點資訊儲存執行緒及模式
       * Node.EXCLUSIVE for exclusive 獨佔模式
       * Node.SHARED for shared 共享模式
       */
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
  //2,如果隊尾節點不為空,嘗試將節點新增到 tail後面,建立雙向連結 
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
      //3,如果隊尾節點為空,或者cas新增節點失敗
        enq(node);
        return node;
    }

enq(node) 節點入隊操作,for無限迴圈嘗試去新增節點到隊尾,直至成功。

private Node enq(final Node node) {
    for (;;) {
      //每新的一次迴圈,獲取的是最新的tail,因tail節點是volatile修飾的,多執行緒之間記憶體可見,每次更新都會被重新整理到記憶體
        Node t = tail;
        if (t == null) { // Must initialize
           //1,head節點是一個 node物件,但是沒有繫結執行緒,表示當前已經獲取到鎖資源,正在執行的執行緒,此時首尾相同,會再次走一遍迴圈,添加當前節點到head節點後面
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
          //2,輪詢去設定節點到隊尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

boolean acquireQueued(final Node node, int arg) 返回執行緒是否被中斷

在上一步方法將節點新增到同步佇列後,執行該方法的目的是:

如果node的前置節點是head, 再次嘗試一次獲取鎖資源,如果已經被釋放了,那(執行緒)就去獲取鎖執行。如果不是頭節點或者嘗試獲取鎖資源失敗,那就在佇列找個位置掛起等待了 。

此時需要找一個安全點,確保前面的有效節點(沒被取消的節點)的執行緒執行後,能夠通知喚醒自己,當前節點執行緒才安心掛起等待。

流程圖如下:

原始碼如下:

  final boolean acquireQueued(final Node node, int arg) {
        //預設是失敗
        boolean failed = true;
        try {
          //預設是沒有被中斷
            boolean interrupted = false;
          //無限迴圈
            for (;;) {
                final Node p = node.predecessor();
               //1,如果前置節點是head節點,再去嘗試獲取鎖,如果獲取成功了,則將自己更新為head,此時當前執行緒可以執行了。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                   // 此時返回 false,沒被中斷
                    return interrupted;
                }
             //2,1失敗或前置不是head,那麼要定位一個有效的位置去阻塞等待,前面有些節點可能是被取消的,需要跳過這些節點,清除出佇列,shouldParkAfterFailedAcquire就是來做這個事
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                  //3,標記中斷標記,返回被中斷過 
                    interrupted = true;
            }
        } finally {
          // 4,正常情況下是一直會進行for迴圈,當跳出該迴圈,即出現異常,如被標記了中斷,去呼叫阻塞,丟擲中斷異常,此時沒有在佇列中找到安全位置,因此將該節點移出佇列
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire 在失敗獲取鎖資源後,尋找可以安全掛起的位置

如果前置節點被取消,一直向前找到節點 ws<=0 為止,注意藍色箭頭,指向的前後指標還在,但是node1物件已經沒有被引用,下次gc會被回收,則出隊。

流程圖如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
       //1,前置節點已經是signal,在它釋放資源後,會通知喚醒佇列中下一個執行緒,所以當前位置就是安全阻塞等待點
            return true;
        if (ws > 0) {
          // ws> 0,前置節點被取消
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
      // 2,ws>0,那麼就繼續往前找,直到找到一個節點沒被取消的,跳過了那些被取消的節點,這些節點後面會被GC   
            pred.next = node;
        } else {
          //3,嘗試將前置節點 狀態更新為signal,ws= -1, 進行下一次輪詢
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

在找到安全位置後,掛起當前執行緒,等待被喚醒,如果下次被喚醒,首先檢查一下自己是被前置節點喚醒 還是被中斷喚醒的

 private final boolean parkAndCheckInterrupt() {
      // 掛起當前執行緒
        LockSupport.park(this);
       //當被喚醒,返回是否被中斷標記位
        return Thread.interrupted();
    }

最後finally中,執行緒被中斷,出現異常,取消節點;cancelAcquire(Node node) 將該節點移出等待佇列,執行緒都被中斷了,就不再

需要去等待鎖資源了。

出隊有三種情況:

  • 1, 節點在隊尾 ,等於tail

  • 2, 節點在中間位置,既不在head後面,也不是tail

注: 我理解這裡最終實現是要達到第3步的效果,但是在cancelAcquire方法裡並沒有更新第2步藍色箭頭的前置指標,而是留給了其他執行緒在獲取鎖資源時,執行shouldParkAfterFailedAcquire方法查詢安全位置掛起來實現的,這時 node 將沒有再被引用,因此會被GC。

  • 3, 被取消節點在head的後面,第二個節點

    這裡唯一做的就是去喚醒 node 的後繼節點(如果為沒被取消的節點),如果後繼節點為空或者ws =1,那麼就在佇列從後向前遍歷去喚醒一個有效節點,這個節點醒來做的事情是嘗試去獲取鎖,並且將排在它佇列前連續被取消的節點出隊。所以這裡更新節點的前後指標操作還是交給其他執行緒來做的,在 shouldParkAfterFailedAcquire方法實現的。

    原始碼如下:

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
       //1,將節點與執行緒解綁
        node.thread = null;
       
       //2,跳過前面所有被取消的節點
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
       //3,獲取前置節點後一個位置節點,不一定是 node,有可能是跳過的被取消的節點,留作cas更新用
        Node predNext = pred.next;
        //4,將當前節點狀態改為取消 ws =1
        node.waitStatus = Node.CANCELLED;

       //5,第一種情況:尾節點,將node前置節點更新為tail,並斷開前置節點的next後置指標,node將無引用,被gc
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
          
            int ws;
          // 第三種情況:node在中間位置,要做的就是讓node前置節點 pred的next指標指向node後的沒被取消的節點,跳過node
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
              // 第二種情況,在head後面,那麼要需向node後找沒被取消節點,讓head跳過node指向他,並喚醒該節點的執行緒。
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

喚醒後繼者,該方法在release 方法也被執行,線上程使用完資源後,去喚醒下一個等待該資源的執行緒去執行。

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
      // 狀態置為0,初始狀態,也不讓她通知下一個節點,此時節點應該是取消狀態1,是大於0的,為啥還需再判斷一次?,這個方法不只這裡用,釋放資源release也用,正常ws =-1,所以更新狀態為0

       //首先判斷node後置節點,如果不行,再從尾向前遍歷去找一個沒被取消的節點 去喚醒其執行緒
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

5.2、release(int arg) 釋放鎖

  public final boolean release(int arg) {
     //1,釋放資源前提是已經獲取到
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
              //2,head的waitStatus在獨佔模式應該是signal =-1
              // 喚醒下一個節點,程式碼如上
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

同樣,tryRelease() 與 tryAcquire() 一樣,嘗試釋放鎖,AQS都未具體實現,丟擲異常,留個子類具體去實現。

AQS中程式碼:

  protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

這裡還是看一下ReentrantLock 的具體實現

  protected final boolean tryRelease(int releases) {
      //1,在沒有可重入情況下,state =1, releases =1, 此時 c應該 =0 
            int c = getState() - releases;
       //2,判斷當前釋放鎖執行緒是否等於拿到鎖的執行緒,不等就丟擲異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
      //3,可見只有c=0,情況才可成功釋放鎖
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

注:因ReentrantLock可重入的鎖,由上程式碼可見,只有在 state=0的情況,才可成功釋放鎖,常見錯誤場景:在使用ReentrantLock lock n次,卻沒有unlock相應的n次,導致沒有成功釋放鎖。

6、總結

1、 AQS的CLH佇列的 pre指標可保證可靠性,next是保證不了的,看上面的程式碼分析,在遍歷佇列時,會向前遍歷來查詢安全點。

2、Condition 佇列與 CLH同步佇列兩者的區別,同步佇列中的所有執行緒是在等待獲取鎖的,Condition條件佇列是某些之前已經獲取到鎖,因為要等待某個事件(如IO事件)或者與某個執行緒同步(等待另一執行緒執行結果),主動呼叫await方法,主動釋放鎖後,將該執行緒放入條件佇列中的,所以在條件佇列中執行緒不是等待獲取鎖,而是在等某個條件,下一篇我會來分析AQS的重要機制- 等待通知,靠其內部類ConditionObject實現,await和 signal來阻塞與 通知機制,可替代傳統Object類提供的wait和 notify功能,而AQS可有多個條件來分類,比之應用更加靈活