1. 程式人生 > >Java多執行緒之Condition實現原理和原始碼分析(四)

Java多執行緒之Condition實現原理和原始碼分析(四)

章節概覽、

1、概述

上面的幾個章節我們基於lock(),unlock()方法為入口,深入分析了獨佔鎖的獲取和釋放。這個章節我們在此基礎上,進一步分析AQS是如何實現await,signal功能。其功能上和synchronize的wait,notify一樣。但是本質是也是有很多區別。

1.1、 Condition和synchronize各自實現的等待喚醒區別
  1. Condition是基於AQS 佇列同步器實現的。而synchronize的等待喚醒是基於jvm的語義層次上實現的。
  2. Condition的使用不管是await,signal都有維護各自的一個阻塞佇列。而在synchronize所有的阻塞執行緒都被放到同一個阻塞佇列裡面,所以在多執行緒的情況下,可能存在早喚醒,喚醒失敗等情況。

2、入門案例

入門案例採用的是大家比較熟悉的生產者和消費者。

public class ProducerConsumerTest {

    private Lock lock = new ReentrantLock();

    private Condition addCondition = lock.newCondition();

    private Condition removeCondition = lock.newCondition();

    private LinkedList<Integer> resources = new LinkedList<>();

    private int maxSize;

    public ProducerConsumerTest(int maxSize) {
        this.maxSize = maxSize;
    }


    public class Producer implements Runnable {

        private int proSize;

        private Producer(int proSize) {
            this.proSize = proSize;
        }

        @Override
        public void run() {
            lock.lock();
            try {
                for (int i = 1; i < proSize; i++) {
                    while (resources.size() >= maxSize) {
                        System.out.println("當前倉庫已滿,等待消費...");
                        try {
                            addCondition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("已經生產產品數: " + i + "\t現倉儲量總量:" + resources.size());
                    resources.add(i);
                    removeCondition.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    public class Consumer implements Runnable {

        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            while (true) {
                lock.lock();
                try {
                    while (resources.size() <= 0) {
                        System.out.println(threadName + " 當前倉庫沒有產品,請稍等...");
                        try {
                            // 進入阻塞狀態
                            removeCondition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 消費資料
                    int size = resources.size();
                    for (int i = 0; i < size; i++) {
                        Integer remove = resources.remove();
                        System.out.println(threadName + " 當前消費產品編號為:" + remove);
                    }
                    // 喚醒生產者
                    addCondition.signal();
                } finally {
                    lock.unlock();
                }
            }

        }
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerConsumerTest producerConsumerTest = new ProducerConsumerTest(10);
        Producer producer = producerConsumerTest.new Producer(100);
        Consumer consumer = producerConsumerTest.new Consumer();
        final Thread producerThread = new Thread(producer, "producer");
        final Thread consumerThread = new Thread(consumer, "consumer");
        producerThread.start();
        TimeUnit.SECONDS.sleep(2);
        consumerThread.start();
    }
}

其中維護了一個儲存倉庫。當生產者把生產的物品放入到倉庫中,直到倉庫填滿,進行阻塞等待,此時生產者會釋放當前的鎖。剛開始,消費者會檢查當前倉庫是否有物品,如果沒有物品進行阻塞,等待喚醒。當生產者喚醒消費者,消費者進行消費。我們以此案例的進行逐步分析。

3、 Condition原始碼分析

3.1、 Condition介面原始碼分析
public interface Condition {
	// 當前執行緒進入等待狀態直到被通知(signal)或者中斷
	// 當前執行緒進入執行狀態並從await()方法返回的場景包括:
	//(1)其他執行緒呼叫相同Condition物件的signal/signalAll方法,並且當前執行緒被喚醒;
	//(2)其他執行緒呼叫interrupt方法中斷當前執行緒;
    void await() throws InterruptedException;
    
    // 當前執行緒進入等待狀態直到被通知,在此過程中對中斷訊號不敏感,不支援中斷當前執行緒
    void awaitUninterruptibly();
    
    // 當前執行緒進入等待狀態,直到被通知、中斷或者超時。如果返回值小於等於0,可以認定就是超時了
    boolean await(long time, TimeUnit unit) throws InterruptedException;
	
	// 當前執行緒進入等待狀態,直到被通知、中斷或者超時。如果返回值小於等於0,可以認定就是超時了
	long awaitNanos(long nanosTimeout) throws InterruptedException;
	
	//當前執行緒進入等待狀態,直到被通知、中斷或者超時。如果沒到指定時間被通知,則返回true,否則返回false
	boolean awaitUntil(Date deadline) throws InterruptedException;

	// 喚醒一個等待在Condition上的執行緒,被喚醒的執行緒在方法返回前必須獲得與Condition物件關聯的鎖
	void signal();

	// 喚醒所有等待在Condition上的執行緒,能夠從await()等方法返回的執行緒必須先獲得與Condition物件關聯的鎖
	void signalAll();
}
3.2、Condition 實現類成員變數 建構函式說明

Condition的實現類為:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject。從類的引用方式可以看出,其是AbstractQueuedSynchronizer的一個內部類。

 public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** Condition Queue 裡面的頭節點 */
        private transient Node firstWaiter;
        /** Condition Queue 裡面的尾節點 */
        private transient Node lastWaiter;

       /** 建構函式 */
        public ConditionObject() { }
	  // 下面兩個是用於追蹤 呼叫 awaitXXX 方法時執行緒有沒有被中斷過
	  // REINTERRUPT: 代表執行緒是在 signal 後被中斷的 (REINTERRUPT = re-interrupt再次中斷最後會呼叫 selfInterrupt)
	  // THROW_IE: 代表在接受 signal 前被中斷的, 則直接丟擲異常 (Throw_IE = throw inner exception)
		private static final int REINTERRUPT = 1;
		private static final int THROW_IE = -1;

 }

從原始碼分析可以看出,每個Condition例項裡面都會維護著一個連結串列。通過 firstWaiter,lastWaiter進行儲存。

4、wait()方法核心原始碼分析

4.1、 await()
 public final void await() throws InterruptedException {
 // 判斷當前執行緒是否被阻塞,如果被阻塞,直接丟擲InterruptedException異常
	  if (Thread.interrupted())
	         throw new InterruptedException();
		// 將當前執行緒新增到Condition等待佇列中
		// 詳情請看: 4.2 小節addConditionWaiter()原始碼分析
	     Node node = addConditionWaiter();
	   // 釋放當前擁有的鎖資源:4.3小節fullyRelease(Node node)原始碼分析
	     int savedState = fullyRelease(node);
	     int interruptMode = 0;
	     // 判斷當前節點是否在佇列同步器中。如果在同步器中,阻塞當前執行緒
	     // 死迴圈,直到其他執行緒喚醒當前執行緒
		// 具體判斷是否在同步佇列中,請參考4.4 小節:boolean isOnSyncQueue(Node node)
	     while (!isOnSyncQueue(node)) {
	     // 阻塞當前執行緒,直到其他執行緒對其喚醒
	         LockSupport.park(this);
	      // 檢查 在 awaitXX 方法中的這次喚醒是否是中斷引起的
	      // 中斷檢測原始碼請請4.5小節:int checkInterruptWhileWaiting(Node node)
	         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
	             break;
	     }
	     // 呼叫 acquireQueued在 Sync Queue 裡面進行 獨佔鎖的獲取, 返回值表明在獲取的過程中有沒有被中斷過
	     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
	         interruptMode = REINTERRUPT;
	     // 如果當前lnode節點的nextWaiter != null,則清理當前佇列
	     if (node.nextWaiter != null) // clean up if cancelled
	         unlinkCancelledWaiters();
	    // 根據中斷模式返回異常情況
	     if (interruptMode != 0)
	         reportInterruptAfterWait(interruptMode);
	 }
4.2 、 addConditionWaiter()

當前方法位置:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter+

 private Node addConditionWaiter() {
   Node t = lastWaiter;
     // If lastWaiter is cancelled, clean out.
     // 如果lastWaiter不為空,且其狀態不等於 Node.CONDITION。將當前節點刪除
     if (t != null && t.waitStatus != Node.CONDITION) {
     // 刪除當前節點,詳情檢視:4.2.1小結unlinkCancelledWaiters()分析
         unlinkCancelledWaiters();
         // 重新複製當前node節點t節點
         t = lastWaiter;
     }
     // 建立一個node節點,設定狀態為:Node.CONDITION
     Node node = new Node(Thread.currentThread(), Node.CONDITION);
     // 如果lastWaiter()為空,設定 firstWaiter = node
     // 如果lastWaiter不為空,直接加入t.nextWaiter = node
     if (t == null)
         firstWaiter = node;
     else
         t.nextWaiter = node;
     // 設定當前節點為lastWaiter節點
     lastWaiter = node;
     return node;
 }
4.2.1、unlinkCancelledWaiters()

// 清理Condition佇列中被中斷或者過期的節點 // trail 節點一直維護者一個符合Condition條件佇列的節點。如果當前符合要求,直接尾部追加到trail中。如果當前節點不合法,直接跳過

 private void unlinkCancelledWaiters() {
 	 // 賦值當前頭節點
      Node t = firstWaiter;
      // trail 是中間儲存變數
      Node trail = null;
      // 迴圈遍歷,直到 t == null 退出
      while (t != null) {
          Node next = t.nextWaiter;
          // 遍歷當前不符合Condition佇列規則的,將其清理
          if (t.waitStatus != Node.CONDITION) {
          // 設定當前 t節點的nextWaiter = null,方便 gc
              t.nextWaiter = null;
              // 如果 trail 為空。說明當前不合法的節點是firstWaiter,重新對其進行賦值
              if (trail == null)
                  firstWaiter = next;
              else
              // 如果當前節點不合法,則將當前節點的next 賦值給 trail.nextWaiter 節點
                  trail.nextWaiter = next;
              if (next == null)
                  lastWaiter = trail;
          }
          else
          // 當前節點賦值給t
              trail = t;
          // t賦值為下一個節點
          t = next;
      }
  }
4.3、 fullyRelease(Node node)
完全釋放當前節點的鎖
final int fullyRelease(Node node) {
    boolean failed = true;
      try {
      // 獲取系統中當前的state狀態(可衝入鎖,其狀態可能不為1)
          int savedState = getState();
         // 釋放當前節點的鎖,具體細節請參考上一章節
          if (release(savedState)) {
          // 釋放成功,重置failed的值
              failed = false;
              return savedState;
          } else {
          // 釋放失敗,丟擲異常
              throw new IllegalMonitorStateException();
          }
      } finally {
      // 如果釋放失敗,將當前的node節點設定為刪除狀態
          if (failed)
              node.waitStatus = Node.CANCELLED;
      }
  }
4.4、boolean isOnSyncQueue(Node node)
final boolean isOnSyncQueue(Node node) {
		// 判斷node節點的狀態是否為:Node.CONDITION 或者 其前驅節點為 null,直接返回為 false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 如果其後驅節點不為null,直接返回為true。退出當前迴圈
        if (node.next != null) // If has successor, it must be on queue
            return true;
    	
    	/* node.prev 可以是非null,但尚未在佇列中。因為
          *將其置於佇列中的CAS可能會失敗。 所以我們必須這樣做
          *從尾部遍歷,以確保它實際上成功。 它
          *在呼叫此方法時,它總是接近尾部,並且
          *除非CAS失敗。
         */
         // 詳情請檢視4.4.1 小結:findNodeFromTail(Node node)
        return findNodeFromTail(node);
    }
4.4.1、findNodeFromTail(Node node)
 從佇列尾部進行遍歷,檢視node節點是否存在於佇列中
 private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
4.5、int checkInterruptWhileWaiting(Node node)
/**
 * 檢查 在 awaitXX 方法中的這次喚醒是否是中斷引起的
 * 若是中斷引起的, 則將 Node 從 Condition Queue 轉移到 Sync Queue 裡面
 * 返回值的區別:
 *      0: 當前執行緒沒有被中斷,此次喚醒是通過 signal 
 *      THROW_IE: 此次的喚醒是通過 interrupt, 並且在接受 signal 之前
 *      REINTERRUPT: 執行緒的喚醒是 接受過 signal 而後又被中斷
 */
private int checkInterruptWhileWaiting(Node node) {
     return Thread.interrupted() ?
	     // 將 Node 從 Condition Queue 轉移到 Sync Queue 裡面
	     // 詳細原始碼分析參考4.5.1 小節:transferAfterCancelledWait(Node node)
         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
         0;
 }
4.5.1、 transferAfterCancelledWait(Node node)
final boolean transferAfterCancelledWait(Node node) {
// 通過cas修改當前節點的狀態為0,獨佔鎖預設狀態
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        新增到 Syn的佇列同步器中
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

至此wait()對應的原始碼已經分析完成。主要存在以下幾個核心步驟:

  1. 將當前節點新增到Condition 所維護的佇列中,尾部進行追加;
  2. 釋放當前執行緒鎖持有的鎖;
  3. 迴圈判斷當前執行緒是否在Sync的佇列同步器中;
  4. 如果存在於佇列同步器中,加下來的執行緒交給AQS自己去處理了;

5、signal() 核心原始碼分析

5.1、signal() 入口方法

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal

 public final void signal() {
 // 判斷當前執行緒是否擁有鎖資源
       if (!isHeldExclusively())
           throw new IllegalMonitorStateException();
       Node first = firstWaiter;
       if (first != null)
       	// 喚醒頭結點
        doSignal(first);
   }
5.2 、void doSignal(Node first)

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal

private void doSignal(Node first) {
      do {
             if ( (firstWaiter = first.nextWaiter) == null)
                 lastWaiter = null;
             first.nextWaiter = null;
             // 將當前節點新增到Sync的佇列中,通過unlock()方法的呼叫,進行喚醒
         } while (!transferForSignal(first) &&
                  (first = firstWaiter) != null);
     }
5.3、 boolean transferForSignal(Node node)
final boolean transferForSignal(Node node) {
     // 設定當前節點的狀態為0,通過cas,設定失敗直接返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        // 將當前的節點新增到Sync的佇列同步器中,同時返回當前節點的前一個節點
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果當前節點的ws > 0 或者設定當前節點的狀態
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
		// 喚醒當前執行緒
            LockSupport.unpark(node.thread);
        return true;
    }

至此signal 原始碼已經分析完成,signal方法主要是將當前的Condition的first節點,轉移到Sync的FIFO中。等待喚醒操作。

6、總結

至此,AQS的核心知識點原始碼分析完成,本人能力有限,如果不妥的地方,歡迎指正。