1. 程式人生 > >Java並發之AQS同步器學習

Java並發之AQS同步器學習

public return null orien 需要 pri pre 後繼節點 this

AQS隊列同步器學習

在學習並發的時候,我們一定會接觸到 JUC 當中的工具,JUC 當中為我們準備了很多在並發中需要用到的東西,但是它們都是基於AQS(AbstractQueuedSynchronizer)隊列同步器來實現的,也就是我們如果能夠去梳理清楚AQS當中的知識點,對我們以後了解其他並發功能鍵有很大的幫助。

CLH隊列

隊列同步器(AbstractQueuedSynchronizer),是用來構建鎖或者其他同步組件的基礎框架,它使用了一個int變量來表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作,並發包的作者Doug Lea期望她能夠成為實現大部分同步需求的基礎。

而這個內置的隊列就是CLH雙向隊列

,當前線程如果獲取鎖失敗的時候,會將當前線程、狀態等信息封裝成一個Node節點添加到CLH隊列當中去--也就是一個Node節點其實就是一個線程,而當有線程釋放時,會喚醒CLH隊列並取其首節點進行再次獲取:

  static final class Node {
          /** Marker to indicate a node is waiting in shared mode */
       //共享模式節點
          static final Node SHARED = new Node();

          /** Marker to indicate a node is waiting in exclusive mode */
       //獨占模式節點
          static final Node EXCLUSIVE = null;
  ?
          /** waitStatus value to indicate thread has cancelled */
       //處於取消的等待狀態
       /* 因為超時或中斷就會處於該狀態,並且處於該狀態的節點不會轉變為其他狀態
          處於該狀態的節點不會再次被阻塞*/
          static final int CANCELLED =  1;

          /** waitStatus value to indicate successor‘s thread needs unparking */
       //等待狀態
       /*  表示後繼節點是否需要被喚醒 */
          static final int SIGNAL    = -1;

          /** waitStatus value to indicate thread is waiting on condition */
       /* 該節點處於條件隊列當中,該節點不會用作同步隊列直到設置狀態0用來傳輸時才會移到同步隊列當中,並且加入對同步狀態的獲取 */
          static final int CONDITION = -2;
          /**
           * waitStatus value to indicate the next acquireShared should
           * unconditionally propagate
           */
       /* 表示下一次共享式同步狀態獲取將會無條件地傳播下去 */
          static final int PROPAGATE = -3;
  ?
       //線程等待狀態
          volatile int waitStatus;
  ?
         //當前節點的前置節點
          volatile Node prev;
  ?
          //當前節點的後置節點
          volatile Node next;
  ?
          //節點所在的線程
          volatile Thread thread;
  ?
         //條件隊列當中的下一個等待節點
          Node nextWaiter;
  ?
          /**
           * 判斷節點是否共享模式
           */
          final boolean isShared() {
              return nextWaiter == SHARED;
          }
  ?
          /**
           * 獲取前置節點
           */
          final Node predecessor() throws NullPointerException {
              Node p = prev;  //獲取前置節點
              if (p == null)  //為空則拋空指針異常
                  throw new NullPointerException();
              else
                  return p;
          }
  ?
          Node() {    // Used to establish initial head or SHARED marker
          }
  ?
          Node(Thread thread, Node mode) {     // Used by addWaiter
              this.nextWaiter = mode;
              this.thread = thread;
          }
  ?
          Node(Thread thread, int waitStatus) { // Used by Condition
              this.waitStatus = waitStatus;
              this.thread = thread;
          }
      }

通過上面對Node節點的源代碼進解說,我想對於之後的內容會有很大的幫助的,因為後面的方法當中會有特別多的狀態判斷。

技術分享圖片

當我們重寫同步器的時候,需要使用同步器的3個方法來訪問和修改同步的狀態。分別是:

  • getState():獲取當前同步狀態

  • setState(int newState):設置當前同步狀態

  • compareAndSetState(int expect, int update):通過CAS來設置當前狀態,該方法可以保證設置狀態操作的原子性

入列

我們在上面既然已經講到了AQS當中維護著的是CLH雙向隊列,並且是FIFO,既然是隊列,那肯定就存在著入列和出列的操作,我們來先從入列看起:

acquire(int arg)方法

該方法是獨占模式下線程獲取同步狀態的入口,如果當前線程獲取同步狀態成功,則由該方法返回,如獲取不成功將會進入CLH隊列當中進行等待。

在該方法當中會調用重寫的tryAcquire(int arg)方法。

  public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
  }
  • tryAcquire(int arg)

    很多人剛看到這個方法的時候,會不會有種一臉懵逼的感覺,方法體居然只是返回一個異常而已,說好的業務邏輯代碼呢?

    回到我們一開始說的,AQS實際上只是作為一個同步組件的基礎框架,具體的實現要交由自定義的同步器去自己實現,所以該方法當中只有一句異常。

此方法由用戶自定義的同步器去實現,嘗試獲取獨占資源,如果成功則返回true,如果失敗則返回false

      protected boolean tryAcquire(int arg) {
          throw new UnsupportedOperationException();
      }
  • addWaiter(Node mode)

    將當前線程添加到CLH隊列的隊尾,並且指定獨占模式。

    Node有兩種模式,分別是獨占模式和共享模式,也就是Node.EXCLUSIVENode.SHARED

    private Node addWaiter(Node mode) {
              //將當前線程以指定模式來創建Node節點
              Node node = new Node(Thread.currentThread(), mode);
              // Try the fast path of enq; backup to full enq on failure
              Node pred = tail;  //獲取隊列尾部給變量pred
              if (pred != null) {  //若隊尾不為空
                  node.prev = pred;  //將當前節點的前置節點指向原來的tail
                  if (compareAndSetTail(pred, node)) {  //通過CAS將tail設置為Node
                      /*
                      *如果設置成功,表示此操作沒有別的線程執行成功
      */ 
                      pred.next = node;  //將原來tail節點的後置節點指向node節點
                      return node;  //返回node節點
                  }
              }
              enq(node);
              return node;
          }
  • enq(Node )

    該方法是將節點插入到CLH隊列的尾部,並且通過自旋(死循環)來保證Node節點的正確添加

      private Node enq(final Node node) {
              for (;;) {  //自旋--死循環添加節點
                  Node t = tail;  //獲取原來tial節點至t變量
                  if (t == null) { // Must initialize  隊列為空
                      if (compareAndSetHead(new Node()))  //設置一個空節點作為head節點
                          tail = head;  //head和tail是同一個節點
                  } else {  //隊列不為空的正常情況
                      node.prev = t;  //設置當前節點的前置節點為原tail節點
                      if (compareAndSetTail(t, node)) {  //通過CAS設置當前節點為tail節點
                          t.next = node;  //原tail節點後置節點是當前節點
                          return t;  //返回原tail節點結束循環
                      }
                  }
              }
          }
  • acquireQueued(final Node node, int arg)

    來到這個方法,證明已經通過tryAcquire獲取同步狀態失敗了,並且調用了addWaiter方法將當前線程添加至CLH隊列的尾部了,剩下的就是在等待狀態當中等其他線程來喚醒自己去獲取同步狀態了。

    對於已經處於CLH隊列當中的線程,是以獨占並且不可中斷的模式去獲取同步狀態。

      final boolean acquireQueued(final Node node, int arg) {
          boolean failed = true;  //成功獲取資源的狀態
          try {
              boolean interrupted = false;  //是否被中斷的狀態
              for (;;) { //自旋--死循環
                  final Node p = node.predecessor();  //獲取當前節點的前置節點
                  //如果前置節點是首節點,並且已經成功獲取同步狀態
      if (p == head && tryAcquire(arg)) { 
                      setHead(node);  //設置當前節點為head節點,並且將當前node節點的前置節點置null
                      p.next = null; //設置原head節點的後置節點為null,方便GC回收原來的head節點
      failed = false; 
                      return interrupted; //返回是否被中斷
                  }
                  //獲取同步狀態失敗後,判斷是否需要阻塞或中斷
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      parkAndCheckInterrupt())
                      interrupted = true;  //如果被中斷過,設置標記為true
              }
          } finally {
              if (failed)
                  cancelAcquire(node);  //取消當前節點繼續獲取同步狀態的嘗試
          }
      }
  • shouldParkAfterFailedAcquire(Node pred, Node node)

    對於獲取狀態失敗的節點,檢查並更新其狀態,如果線程阻塞就返回true,這是所有獲取狀態循環的信號控制方法。

    要求pred == node.prev

實際上除非鎖獲取成功,要不然都會被阻塞起來

      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          int ws = pred.waitStatus;  //獲取前驅節點的狀態
          //狀態為-1,表示後繼節點已經處於waiting等待狀態,等該節點釋放或取消,就會通知後繼節點
      if (ws == Node.SIGNAL) 
              return true;
          //如果狀態大於0--取消狀態,就跳過該節點循環往前找,找到一個非cancel狀態的節點
          if (ws > 0) {
              do {
                  node.prev = pred = pred.prev;
              } while (pred.waitStatus > 0);
              //賦值pred的後繼節點為node節點
              pred.next = node;
          } else {  //如果狀態小於0
              //必須是PROPAGATE或者0--表示無狀態,當是-2的時候,在condition queue隊列當中
              //通過CAS設置pred節點狀態為signal
              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
          }
          return false;
      }
  • parkAndCheckInterrupt()

    還有當該節點的前驅節點狀態為signal時,才可以將該節點所在線程pack起來,否則無法將線程pack。

      private final boolean parkAndCheckInterrupt() {
          //通過LockSupport工具阻塞當前線程
          LockSupport.park(this);
          return Thread.interrupted();  //清除中斷標識,返回清除前的標識
      }
  • cancelAcquire(Node node)

    該方法是取消節點所在線程對同步狀態的獲取,那說白了就是將節點的狀態改為cancelled.

      private void cancelAcquire(Node node) {
          // Ignore if node doesn‘t exist
          if (node == null)  //節點為空則返回
              return;
      ?
          node.thread = null;  //節點所在線程設為null
      ?
          // Skip cancelled predecessors
          //獲取node節點的前驅節點
          Node pred = node.prev;
          //循環獲取前驅節點的狀態,找到第一個狀態不為cancelled的前驅節點
          while (pred.waitStatus > 0)
              node.prev = pred = pred.prev;
      ?
          // predNext is the apparent node to unsplice. CASes below will
          // fail if not, in which case, we lost race vs another cancel
          // or signal, so no further action is necessary.
          //獲取pred節點的後繼節點
          Node predNext = pred.next;
    
          //設置node節點狀態為CANCELLED
          node.waitStatus = Node.CANCELLED;
      ?
          //如果node節點是tail節點,通過CAS設置tail節點為pred
          if (node == tail && compareAndSetTail(node, pred)) {
              //通過CAS將pred節點的next節點設置null
              compareAndSetNext(pred, predNext, null);
          } else {  //如果不是tail節點
      ?
              int ws;  //初始化node節點狀態變量
    
              /*
              *如果pred不是head節點,並且狀態是SIGNAL或者狀態小於0並且設置pred
              *狀態為SIGNAL成功,。並且pred所封裝的線程不為空
              */
              if (pred != head &&
                  ((ws = pred.waitStatus) == Node.SIGNAL ||
                   (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                  pred.thread != null) {
                  //獲取node節點的後繼節點
                  Node next = node.next;
                  //如果後繼節點部位null並且狀態不為cancelled
                  if (next != null && next.waitStatus <= 0)
                      //設置pred的後繼節點為next,也就是將pred的後繼節點不再是node
                      compareAndSetNext(pred, predNext, next);
              } else {
                  unparkSuccessor(node);  //釋放後繼節點
              }
      ?
              node.next = node; // help GC
          }
      }
  • unparkSuccessor(Node node)
      private void unparkSuccessor(Node node) {
          //獲取node節點的狀態
          int ws = node.waitStatus;
          if (ws < 0)  //如果狀態小於0,為SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
              //通過CAS將node節點狀態設置為0
              compareAndSetWaitStatus(node, ws, 0);
      ?
      //獲取node節點的後繼節點 
          Node s = node.next;
          //如果後繼節點為空或者狀態大於0--cancelled
          if (s == null || s.waitStatus > 0) {
              //後繼節點置為空
              s = null;
              //從tail節點開始往前遍歷
              for (Node t = tail; t != null && t != node; t = t.prev)
                  if (t.waitStatus <= 0)  //判斷狀態小於等於0,就是為了找到狀態不為cancelled的節點
                      s = t;  //找到最前的狀態小於等於0的節點
          }
          if (s != null)  //如果由以上方法找到的節點不為空
              //通過LockSupport工具釋放s節點封裝的線程
              LockSupport.unpark(s.thread);
      }

經過了以上的分析,我想我們對入列的代碼也有了一個比較好的了解吧,那我們也可以嘗試畫一下入列的流程圖。
技術分享圖片

出列

出列的操作相對於入列來說就真的是簡單的多了,畢竟入列的時候需要考慮的因素太多,要考慮前驅和後繼節點,還要考慮節點的狀態等等一堆因素,而出列就是指CLH隊列的頭部節點,所以麻煩的因素就會少了很多。

release(int arg)

我們廢話都不多說了,直接上代碼吧。

這也是以獨占模式來釋放對象

  public final boolean release(int arg) {
      if (tryRelease(arg)) {
          Node h = head;  //獲取head節點
          //如果head節點不為空並且狀態不為0,也就是初始節點
  if (h != null && h.waitStatus != 0) 
              unparkSuccessor(h);  //喚醒後繼節點
          return true;
      }
      return false;
  }
  • tryRelease(int arg)

    這個方法與入列的tryAcquire一樣,是只有一個異常的,也就是證明這個方法也是由自定義的同步組件自己去實現,在AQS同步器當中只是定義一個方法而已。

      protected boolean tryRelease(int arg) {
          throw new UnsupportedOperationException();
      }
  • unparkSuccessor(Node node)

    這個方法實際在入列的時候已經講過了,我直接搬上面的代碼解釋下來。

      private void unparkSuccessor(Node node) {
          //獲取node節點的狀態
          int ws = node.waitStatus;
          if (ws < 0)  //如果狀態小於0,為SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
              //通過CAS將node節點狀態設置為0
              compareAndSetWaitStatus(node, ws, 0);
      ?
      //獲取node節點的後繼節點 
          Node s = node.next;
          //如果後繼節點為空或者狀態大於0--cancelled
          if (s == null || s.waitStatus > 0) {
              //後繼節點置為空
              s = null;
              //從tail節點開始往前遍歷
              for (Node t = tail; t != null && t != node; t = t.prev)
                  if (t.waitStatus <= 0)  //判斷狀態小於等於0,就是為了找到狀態不為cancelled的節點
                      s = t;  //找到最前的狀態小於等於0的節點
          }
          if (s != null)  //如果由以上方法找到的節點不為空
              //通過LockSupport工具釋放s節點封裝的線程
              LockSupport.unpark(s.thread);
      }

    這上面就是出列也就是釋放的代碼了,其實看起來不是很難。

小結

花了整整3天左右的時間去看了一下AQS的源碼,會去看也純屬是想要把自己的並發方面的知識能夠豐富起來,但是這次看源碼也還是不太順利,因為很多代碼或者方法,單獨分開來看的時候或許能理解,感覺方法的作用也的確是那麽回事,但是當一整個流程串起來的時候也還是不太明白這樣做的具體作用,以及整個的執行流程。更加沒辦法理解那些自旋裏的代碼,每一次執行會出現怎樣的結果,對CLH隊列的影響。

不過,自己也是有收獲的,至少相較於一開始來說,自己對AQS有了一點皮毛的理解,不至於以後聞起來完完全全是一問三不知的狀態。

同時也希望我這篇文章能夠對想要了解AQS的程序猿能夠起一點作用,以後自己也還是將自己的一些學習心得或者資料共享出來。

參考資料

方騰飛:《Java並發編程的藝術》

如需轉載,請務必註明出處,畢竟一塊塊搬磚也不是容易的事情。

Java並發之AQS同步器學習