1. 程式人生 > >Java-多執行緒-wait/notify

Java-多執行緒-wait/notify

Java-多執行緒-wait/notify

摘要

wait notify 還有個notifyAll都是執行緒通訊的常用手段。本文會簡要介紹其底層實現原理,並和Conditionawaitsignal方法作對比。

有一個先導概念就是物件鎖和類鎖,他們其實都是物件監視器Object Monitor,只不過類鎖是類物件的監視器,可以看另一篇文章:
Java-併發-鎖-synchronized之物件鎖和類鎖

在呼叫wait和notify之前必須持有物件鎖,那麼就必須瞭解synchronized,可以參閱文章:
Java-併發-鎖-synchronized

更多關於Java鎖的資訊,可參考文章:

Java-併發-關於鎖的一切

0x01 wait

1.1 基本概念

  • 作用
    顧名思義,wait其實就是執行緒用來做阻塞等待的。
  • 超時引數
    在JDK的Object中,wait方法分為帶引數和無引數版本,這裡說的引數就是等待超時的引數。
  • 中斷
    其他執行緒在當前執行緒執行wait之前或正在wait時,對當前執行緒呼叫中斷interrupted方法,會丟擲InterruptedException,且中斷標記會被自動清理。

先看沒有引數的版本:

/**
 * 讓當前執行緒等待到指定Object,直到其他執行緒呼叫該物件的notify或notifyAll方法喚醒
 * 該方法等價於呼叫wait(0)
 * 
 * 注意 呼叫該方法前提是擁有該物件的物件鎖。否則會報錯丟擲IllegalMonitorStateException
 * 
 * 當擁有物件鎖並呼叫wait方法時,會釋放物件鎖,
 * 然後等待,直到其他執行緒呼叫該物件的notify或notifyAll方法喚醒那些wait在該物件鎖上的執行緒。
 * 喚醒之後,該執行緒會嘗試去獲取物件鎖,拿不到就等到直到拿到
 * 拿到物件鎖後繼續執行程式。
 *
 * 在單引數的wait方法版本中,中斷和意料之外的喚醒是可能的所以應該這麼做:
 *     synchronized (obj) {
 *         while (condition does not hold)
 *             obj.wait();
 *         ... // Perform action appropriate to condition
 *     }
 *
 * @throws  IllegalMonitorStateException  呼叫執行緒未持有該物件的物件鎖.
 * @throws  InterruptedException 
 * @see        java.lang.Object#notify()
 * @see        java.lang.Object#notifyAll()
 */
public final void wait() throws InterruptedException { wait(0); }

再看看帶1個引數版本的wait方法:

/**
 * 讓當前執行緒等待到指定Object,直到其他執行緒呼叫該物件的notify或notifyAll方法喚醒
 * 或是指定wait超時時間耗盡
 * 
 * 注意 呼叫該方法前提是擁有該物件的物件鎖。否則會報錯丟擲IllegalMonitorStateException
 * 
 * 該方法的原理:
 * 1.呼叫wait方法的執行緒將自己加入該物件的等待結合中
 * 2.然後放棄所有和該物件相關的同步鎖宣告
 * 3.該呼叫執行緒隨後就不能被排程器排程執行了,進入休眠狀態直到以下情況發生:
 *     1.其他執行緒對目標物件呼叫notify方法,剛好選中該執行緒被喚醒
 *     2.其他執行緒對目標物件呼叫notifyAll方法喚醒所有執行緒
 *     3.其他執行緒對該執行緒呼叫interrupt方法發起中斷
 *     4.指定的超時時間耗盡,前提是超時時間不是0
 * 4.該執行緒被喚醒後,從等待該物件的集合中移除,又可以被排程執行了
 * 5.此時會跟其他執行緒一起競爭該物件的同步鎖
 * 6.一旦該執行緒拿到物件同步鎖,所有在wait方法執行前的同步說明都重新起效
 * 7.然後該執行緒就從wait方法中返回了,該過程結束
 * 
 * 除了上面提到的幾種喚醒場景,還有一種極少發生的情況會喚醒執行緒,稱為`偽喚醒`
 * 為了預防,所以應該這麼做:
 *    synchronized (obj) {
 *         while (condition does not hold)
 *             obj.wait();
 *         ... // Perform action appropriate to condition
 *    }
 *
 * <p>If the current thread is {@linkplain java.lang.Thread#interrupt()
 * interrupted} by any thread before or while it is waiting, then an
 * {@code InterruptedException} is thrown.  This exception is not
 * thrown until the lock status of this object has been restored as
 * described above.
 * 這段話沒看的太明白?
 *
 * 注意這個wait方法只會讓該執行緒釋放當前Object的物件鎖,而不會放棄擁有的其他物件鎖!
 * 
 *
 * @param      timeout   the maximum time to wait in milliseconds.
 * @throws  IllegalArgumentException      if the value of timeout is
 *               negative.
 * @throws  IllegalMonitorStateException  if the current thread is not
 *               the owner of the object's monitor.
 * @throws  InterruptedException 
 * @see        java.lang.Object#notify()
 * @see        java.lang.Object#notifyAll()
 */
public final native void wait(long timeout) throws InterruptedException;

1.2 實現原理

可以先點選這裡回顧下關於ObjectWatier的知識。

然後我們繼續分析底層原始碼。

wait/notify/notifyAll程式碼主要在jdk8/hotspot/src/share/vm/runtime/synchronizer.cpp裡。

1.2.1 ObjectSynchronizer::wait

下面看看wait方法底層實現,摘錄部分核心程式碼如下:

// 注意,必須使用重量級monitor來處理wait方法
// 第一個引數是控制代碼指向了我們wait的目標Object
// 第二個引數是wait的毫秒數
// 第三個是呼叫wait的執行緒
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
  if (UseBiasedLocking) {
  // 如果開啟了偏向鎖
    // 嘗試獲取該偏向鎖,注意偏向鎖是可重入的
    BiasedLocking::revoke_and_rebias(obj, false, THREAD);
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
  }
  if (millis < 0) {
  // wait超時時間不可小於0
    TEVENT (wait - throw IAX) ;
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }
  // 膨脹為重量級鎖,得到該ObjectMonitor
  ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
  // 呼叫該monitor的wait方法
  monitor->wait(millis, true, THREAD);
}

1.2.2 ObjectSynchronizer::wait

ObjectMonitor相關程式碼在

  • /Users/chengc/cc/work/projects/jdk8/hotspot/src/share/vm/runtime/objectMonitor.hpp
  • /Users/chengc/cc/work/projects/jdk8/hotspot/src/share/vm/runtime/objectMonitor.cpp

下面看看wait方法,摘錄部分核心程式碼如下:

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS){
// 以本身的ObjectMonitor建立一個ObjectWaiter
ObjectWaiter node(Self);
// 將該ObjectWaiter狀態設為TS_WAIT
node.TState = ObjectWaiter::TS_WAIT ;
// 在這個AddWaiter時出現執行緒競爭的情況很少,所以採用了輕量級的自旋鎖
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
// 新增該ObjectWaiter node到雙向連結串列WaitSet中
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
// 累加waiter
_waiters++;
// 釋放ObjectMonitor
// 當呼叫返回後,其他執行緒就可以使用enter()方法競爭ObjectMonitor了
exit (true, Self) ;    
// 執行緒現在可以用park()方法阻塞了
// 程式碼註釋說以後要 change the following logic to a loop of the form
//  while (!timeout && !interrupted && _notified == 0) park()               
}

1.2.3 ObjectMonitor::AddWaiter

使用了佇列的尾插法,到WaitSet

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  // 插入雙向連結串列組成的佇列的尾部
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

1.2.4 os::PlatformEvent::park()

該方法在jdk8/hotspot/src/os/linux/vm/os_linux.cpp,主要是通過以下程式碼實現阻塞:

pthread_mutex_lock(_mutex)
while (_Event < 0) {
   status = pthread_cond_wait(_cond, _mutex);
   // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
   // Treat this the same as if the wait was interrupted
   if (status == ETIME) { status = EINTR; }
   assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
pthread_mutex_unlock(_mutex);

0x02 notify

2.1 基本概念

  • 該方法用來任意喚醒一個在物件鎖的等待集的執行緒(其實看了原始碼會發現不是任意的,而是一個WaitQueue,FIFO)。
  • 但要注意,被喚醒的執行緒不會馬上開始執行,因為物件鎖還被呼叫notify的執行緒擁有,直到退出synchronized塊。
  • 喚醒後的執行緒跟其他執行緒一起競爭該同步物件鎖。
  • 注意,該方法和wait方法一樣也必須是擁有該物件同步物件鎖的執行緒才能呼叫,否則丟擲IllegalMonitorStateException
public final native void notify();

2.2 實現原理

2.2.1 ObjectSynchronizer::notify

void ObjectSynchronizer::notify(Handle obj, TRAPS) {
 // 也是先用偏向鎖
 if (UseBiasedLocking) {
    BiasedLocking::revoke_and_rebias(obj, false, THREAD);
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
  }
  // 獲取物件頭的MarkWord
  markOop mark = obj->mark();
  if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
  // 如果擁有的是輕量級鎖就直接返回了
    return;
  }
  // 否則膨脹為重量級鎖,呼叫得到的ObjectMonitor的notify方法
  ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
}

2.2.2 ObjectMonitor::notify(TRAPS)

摘錄部分核心程式碼如下:

void ObjectMonitor::notify(TRAPS) {
  // 檢查當前執行緒是否擁有該ObjectMonitor
  CHECK_OWNER();
  ObjectWaiter* iterator;
  if (_WaitSet == NULL) {
  // 如果WaitSet為空就返回了
    TEVENT (Empty-NotifyAll) ;
    return ;
  }
  // 自旋鎖方式獲取_WaitSetLock
  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  // 獲取WaitSet首節點,並從WaitSet移除該節點
  ObjectWaiter * iterator = DequeueWaiter() ;
  // 在此之後根據Knob_MoveNotifyee不同,對該節點做不同處理,如加入EntryList等
  // 也就是說讓該執行緒能重新競爭ObjectMonitor

  // 最後釋放_WaitSetLock
  Thread::SpinRelease (&_WaitSetLock) ;
}

0x03 notifyAll

3.1 基本概念

  • 該方法用來喚醒所有在物件鎖的等待集的執行緒。
  • 但要注意,被喚醒的執行緒不會馬上開始執行,因為物件鎖還被呼叫notifyAll的執行緒擁有。
  • 喚醒後的執行緒跟其他執行緒一起競爭該同步物件鎖。
  • 注意,該方法和wait方法一樣也必須是擁有該物件同步物件鎖的執行緒才能呼叫,否則丟擲IllegalMonitorStateException
public final native void notifyAll();

3.2 實現原理

跟notify差不多,其實就是迴圈的方式把WaitSet裡的執行緒節點全部取出,放入EntryList。

0x04 wait與sleep比較

經常面試會問這個問題,往往我們都是網上查資料死記硬背。現在我們都看完了原始碼(sleep原始碼點這裡),可以得出以下結論

  1. wait會釋放ObjectMonitor控制權;sleep不會
  2. wait邏輯複雜,需要首先呼叫synchronized獲取ObjectMonitor控制權,才能呼叫wait,且wait後還有放入WaitSet邏輯,喚醒時還有一系列複雜操作;而sleep實現簡單,不需要別的執行緒喚醒
  3. wait與sleep都能被中斷(除了sleep(0),當然對他中斷沒有意義)

0x05 Condition.await/signal對比wait/notify

關於Condition介紹可以參考這篇文章:Java-併發-Condition

5.1 Condition和Object關係

等待 喚醒 喚醒全部
Object wait notify notifyAll
Condition await signal signalAll

5.2 wait和await對比

中斷 超時精確 Deadline
wait 可中斷 可為納秒 不支援
await 支援可中斷/不可中斷 可為納秒 支援

5.3 notify和signal對比

全部喚醒 喚醒順序 執行前提 邏輯
notify 支援,notifyAll 隨機(jdk寫的,其實cpp原始碼是一個wait_queue,FIFO) 擁有鎖 從wait_list取出,放入entry_list,重新競爭鎖
signal 支援,signalAll 順序喚醒 擁有鎖 從condition_queue取出,放入wait_queue,重新競爭鎖

5.4 底層原理對比

  • Object的阻塞和喚醒,前基於synchronized的。底層實現是在cpp級別,呼叫synchronized的執行緒物件會放入entry_list,競爭到鎖的執行緒處於active狀態。呼叫wait方法後,執行緒物件被放入wait_queue。而notify會按FIFO方法從wait_queue中取得一個物件並放回entry_list,這樣該執行緒可以重新競爭synchronized同步鎖了。
  • Condition的阻塞喚醒,是基於lock的。lock維護了一個wait_queue,用於存放等待鎖的執行緒。而Condition也維護了一個condition_queue。當擁有鎖的執行緒呼叫await方法,就會被放入condition_queue;當呼叫signal方法,會從condition_queue選頭一個滿足要求的節點移除然後放入wait_queue,重新競爭lock。

5.5 應用場景對比

  • Object使用比較單一,只能針對一個條件。
  • 一個ReentrantLock可以有多個Condition,對應不同條件。比如在生產者消費者可以這樣實現:
private static ReentrantLock lock = new ReentrantLock();
	
private static Condition notEmpty = lock.newCondition();
private static Condition notFull = lock.newCondition();

// 生產者
public void produce(E item) {
	lock.lock();
	try {
		while(isFull()) {
		// 資料滿了,生產者就阻塞,等待消費者消費完後喚醒
			notFull.await();
		}
		
		// ...生產資料程式碼
		
		// 喚醒消費者執行緒,告知有資料了,可以消費
		notEmpty.signal();
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

// 消費者
public E consume() {
	lock.lock();
	try {
		while(isEmpty()) {
		// 資料空了,消費者就阻塞,等待生產者生產資料後喚醒
			notEmpty.await();
		}
		
		// ...消費資料程式碼
		
		// 喚醒生產者者執行緒,告知有資料了,可以消費
		notFull.signal();
		return item;
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
	return null;
}

這樣好處就很明顯了。如果使用Object,那麼喚醒的時候也許就喚醒了同類的角色執行緒。而使用condition可以在只有一個鎖的情況下,實現我們想要的只喚醒對方角色執行緒的功能。

0x06 總結

Object的阻塞和喚醒,是基於synchronized的。底層實現是在cpp級別。整個流程串起來如下:

  1. 呼叫synchronized的執行緒物件會放入entry_list
  2. 成功競爭到鎖的那個執行緒處於active狀態
  3. 呼叫wait方法後,執行緒物件被放入wait_queue
  4. notify會按FIFO方法從wait_queue中取得一個物件,並放回entry_list
  5. 呼叫wait的執行緒釋放鎖
  6. 此後該執行緒可以重新競爭synchronized同步鎖了
  7. 競爭到鎖的程式,可以繼續同步塊中的執行程式碼了

更多關於Java鎖的資訊,可參考文章:Java-併發-關於鎖的一切

0xFF 參考文件

JVM原始碼分析之Object.wait/notify實現