1. 程式人生 > >Java併發——執行緒間的等待與通知

Java併發——執行緒間的等待與通知

前言:

  前面講完了一些併發程式設計的原理,現在我們要來學習的是執行緒之間的協作。通俗來說就是,當前執行緒在某個條件下需要等待,不需要使用太多系統資源。在某個條件下我們需要去喚醒它,分配給它一定的系統資源,讓它繼續工作。這樣能更好的節約資源。

一、Object的wait()與notify()

  基本概念:

    一個執行緒因執行目標動作的條件未能滿足而被要求暫停就是wait,而一個執行緒滿足執行目標動作的條件之後喚醒被暫停的執行緒就是notify。

  基本模板:

synchronized (obj){
            //保護條件不成立
            while(flag){
                //暫停當前執行緒
                obj.wait();
            }
            //當保護條件成立,即跳出while迴圈執行目標動作
            doAction();
        }

  解析wait():Object.wait()的作用是使執行執行緒被暫停,該執行執行緒生命週期就變更為WAITING,這裡注意一下,是無限等待,直到有notify()方法通知該執行緒喚醒。Object.wait(long timeout)的作用是使執行執行緒超過一定時間沒有被喚醒就自動喚醒,也就是超時等待。Object.wait(long timeout,int naous)是更加精準的控制時間的方法,可以控制到毫微秒。這裡需要注意的是wait()會在當前執行緒擁有鎖的時候才能執行該方法並且釋放當前執行緒擁有的鎖,從而讓該執行緒進入等待狀態,其他執行緒來嘗試獲取當前鎖。也就是需要申請鎖與釋放鎖。

  解析notify():Object.notify()方法是喚醒呼叫了wait()的執行緒,只喚醒最多一個。如果有多個執行緒,不一定能喚醒我們所想要的執行緒。Object.notifyAll()喚醒所有等待的執行緒。notify方法一定是通知執行緒先獲取到了鎖才能進行通知。通知之後當前的通知執行緒需要釋放鎖,然後由等待執行緒來獲取。所以涉及到了一個申請鎖與釋放鎖的步驟。

  wait()與notify()之間存在的三大問題:

  從上面的解析可以看出,notify()是無指向性的喚醒,notifyAll()是無偏差喚醒。所以會產生下面三個問題

  過早喚醒:假設當前有三組等待(w1,w2,w3)與通知(n1,n2,n3)執行緒同步在物件obj上,w1,w2的判斷喚醒條件相同,由執行緒n1更新條件並喚醒,w3的判斷喚醒條件不同,由n2,n3更新條件並喚醒,這時如果n1執行了喚醒,那麼不能執行notify,因為需要叫醒兩條執行緒,只能用notifyAll(),可是用了之後w3的條件未能滿足就被叫醒,就需要一直佔用資源的去等待執行。

  訊號丟失:這個問題主要是程式設計師程式設計出現了問題,並不是內部實現機制出現的問題。程式設計時如果在該使用notifyAll()的地方使用notify()那麼只能喚醒一個執行緒,從而使其他應該喚醒的執行緒未能喚醒,這就是訊號丟失。如果等待執行緒在執行wait()方法前沒有先判斷保護條件是否成立,就會出現通知執行緒在該等待執行緒進入臨界區之前就已經更新了相關共享變數,並且執行了notify()方法,但是由於wait()還未能執行,且沒有設定共享變數的判斷,所以會執行wait()方法,導致執行緒一直處於等待狀態,丟失了一個訊號。

  欺騙性喚醒:等待執行緒並不是一定有notify()/notifyAll()才能被喚醒,雖然出現的概率特別低,但是作業系統是允許這種情況發生的。

  上下文切換問題:首先wait()至少會導致執行緒對相應物件內部鎖的申請與釋放。notify()/notifyAll()時需要持有相應的物件內部鎖並且也會釋放該鎖,會出現上下文切換問題其實就是從RUNNABLE狀態變為非RUNNABLE狀態會出現。

  針對問題的解決方案:

  訊號丟失與欺騙性喚醒問題:都可以使用while迴圈來避免,也就是上面的模板中寫的那樣。

  上下文切換問題:在保證程式正確性的情況下使用notify()代替notifyAll(),notify不會導致過早喚醒,所以減少了上下文的切換。並且使用了notify之後應該儘快釋放相應內部鎖,從而讓wait()能夠更快的申請到鎖。

  過早喚醒:使用java.util.concurrent.locks.Condition中的await與signal。

  PS:由於Object中的wait與notify使用的是native方法,即C++編寫,這裡不做原始碼解析。

二、Condition中的await()與signal()

  這個方法相應的改變了上面所說的無指向性的問題,每個Condition內部都會維護一個佇列,從而讓我們對執行緒之間的操作更加靈活。下面通過分析原始碼讓我們瞭解一下內部機制。Condition是個介面,真正的實現是AbstractQueuedSynchronizer中的內部類ConditionObject。

  基本屬性:

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}

  從基本屬性中可看出維護的是雙端佇列。

  await()方法解析:

public class ConditionObject implements Condition, java.io.Serializable {
  public final void await() throws InterruptedException {
   // 1. 判斷執行緒是否中斷
    if(Thread.interrupted()){                       
        throw new InterruptedException();
    }
   // 2. 將執行緒封裝成一個 Node 放到 Condition Queue 裡面
    Node node = addConditionWaiter();
   // 3. 釋放當前執行緒所獲取的所有的鎖 (PS: 呼叫 await 方法時, 當前執行緒是必須已經獲取了獨佔的鎖)              
    int savedState = fullyRelease(node);           
    int interruptMode = 0;
   // 4. 判斷當前執行緒是否在 Sync Queue 裡面(這裡 Node 從 Condtion Queue 裡面轉移到 Sync Queue 裡面有兩種可能 
   //(1) 其他執行緒呼叫 signal 進行轉移 (2) 當前執行緒被中斷而進行Node的轉移(就在checkInterruptWhileWaiting裡面進行轉移)) while(!isOnSyncQueue(node)){      // 5. 當前執行緒沒在 Sync Queue 裡面, 則進行 block LockSupport.park(this);      // 6. 判斷此次執行緒的喚醒是否因為執行緒被中斷, 若是被中斷, 則會在checkInterruptWhileWaiting的transferAfterCancelledWait 進行節點的轉移;
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){      // 說明此是通過執行緒中斷的方式進行喚醒, 並且已經進行了 node 的轉移, 轉移到 Sync Queue 裡面 break; } }    // 7. 呼叫 acquireQueued在 Sync Queue 裡面進行獨佔鎖的獲取, 返回值表明在獲取的過程中有沒有被中斷過 if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ interruptMode = REINTERRUPT; }    // 8. 通過 "node.nextWaiter != null" 判斷 執行緒的喚醒是中斷還是 signal。
   //因為通過中斷喚醒的話, 此刻代表執行緒的 Node 在 Condition Queue 與 Sync Queue 裡面都會存在 if(node.nextWaiter != null){     // 9. 進行 cancelled 節點的清除 unlinkCancelledWaiters(); }    // 10. "interruptMode != 0" 代表通過中斷的方式喚醒執行緒 if(interruptMode != 0){      // 11. 根據 interruptMode 的型別決定是丟擲異常, 還是自己再中斷一下 reportInterruptAfterWait(interruptMode); }   } }

  上面原始碼可看出Condition內部維護的佇列是一個等待佇列,當需要呼叫signal()方法時就會讓當前執行緒節點從Condition queue轉到Sync queue佇列中去競爭鎖從而喚醒。

  signal()原始碼解析:

public class ConditionObject implements Condition, java.io.Serializable {
    public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
    private void doSignal(Node first) {
            do {
                //傳入的連結串列下一個節點為空,則尾節點置空
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                //當前節點的下一個節點為空
                first.nextWaiter = null;
                //如果成功將node從condition queue轉換到sync queue,則退出迴圈,節點為空了也退出迴圈。否則就接著在佇列中找尋節點進行喚醒
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
} 

  signal()會使等待佇列中的一個任意執行緒被喚醒,signalAll()則是喚醒該佇列中的所有執行緒。這樣通過不同佇列維護不同執行緒,就可以達到指向性的功能。可以消除由過早喚醒帶來的資源損耗。注意的是在使用signal()方法前需要獲取鎖,即lock(),而後需要儘快unlock(),這樣可以避免上下文切換的損耗。

總結:

  面向物件的世界中,一個類往往需要藉助其他的類來一起完成計算,同樣執行緒的世界也是,多個執行緒可以同時完成一個任務,通過喚醒與等待,能更好的操作執行緒,從而讓執行緒在需要使用資源的時候分配資源給它,而不使用資源的時候就可以將資源讓給其他執行緒操作。關於Condition中提到的Sync queue可參考Java併發——結合CountDownLatch原始碼、Semaphore原始碼及ReentrantLock原始碼來看AQS原理來看內部維護的佇列是如何獲取鎖