jvm原始碼分析之interrupt()
概述
執行緒的thread.interrupt()方法是中斷執行緒。中斷一個執行緒意味著線上程完成它的任務之前,停止它當前正在執行的操作。
如果執行緒堵塞在object.wait、Thread.join和Thread.sleep,將會清除執行緒的中斷狀態,並丟擲InterruptedException;
如果執行緒堵塞在java.nio.channels.InterruptibleChannel的IO上,Channel將會被關閉,執行緒被置為中斷狀態,並丟擲java.nio.channels.ClosedByInterruptException;
如果執行緒堵塞在java.nio.channels.Selector上,執行緒被置為中斷狀態,select方法會馬上返回,類似呼叫wakeup的效果;
如果不是以上三種情況,thread.interrupt()方法僅僅是設定執行緒的中斷狀態為true。
interrupt方法的jvm原始碼分析
在jvm的Thread類中有三個成員變數:(javaThread就是繼承了這個Thread類)
public: ParkEvent * _ParkEvent ; // for synchronized() ParkEvent * _SleepEvent ; // for Thread.sleep // JSR166 per-thread parker private: Parker* _parker;
interrupt方法的jvm原始碼入口在jvm.cpp檔案:
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread)) JVMWrapper("JVM_Interrupt"); // Ensure that the C++ Thread and OSThread structures aren't freed before we operate oop java_thread = JNIHandles::resolve_non_null(jthread); MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock); // We need to re-resolve the java_thread, since a GC might have happened during the // acquire of the lock JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)); if (thr != NULL) { Thread::interrupt(thr); } JVM_END
JVM_Interrupt對引數進行了校驗,然後直接呼叫Thread::interrupt:
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}
Thread::interrupt呼叫os::interrupt方法實現:
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
//獲取系統native執行緒物件
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
osthread->set_interrupted(true); //設定中斷狀態為true
//記憶體屏障,使osthread的interrupted狀態對其它執行緒立即可見
OrderAccess::fence();
//前文說過,_SleepEvent用於Thread.sleep,執行緒呼叫了sleep方法,則通過unpark喚醒
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
//_parker用於concurrent相關的鎖,此處同樣通過unpark喚醒
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
//Object.wait()喚醒
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
由此可見,interrupt()其實就是通過ParkEvent的unpark方法喚醒執行緒。
wait()響應中斷
1、在呼叫ParkEvent的park方法之前,會先判斷執行緒的中斷狀態,如果為true,清除執行緒的中斷狀態,並丟擲InterruptedException,然後結束。
2、在呼叫ParkEvent的park方法阻塞在條件變數之後,當interrupt()呼叫ParkEvent的unpark方法喚醒執行緒,執行緒會從pthread_cond_wait()返回,從而解除阻塞,程式碼繼續往下走,再次判斷執行緒的中斷狀態,如果為true則清除執行緒的中斷狀態,並丟擲InterruptedException,然後結束。
判斷執行緒中斷狀態,並通過布林引數決定是否清除執行緒中斷狀態,方法如下:
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
trace("is_interrupted", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
// Note: If clear_interrupted==false, this simply fetches and
// returns the value of the field osthread()->interrupted().
return os::is_interrupted(thread, clear_interrupted);
}
linux平臺對os::is_interrupted()的實現為:
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
bool interrupted = osthread->interrupted(); //獲取執行緒中斷狀態
if (interrupted && clear_interrupted) {
osthread->set_interrupted(false); //清除執行緒中斷狀態,重置為false
// consider thread->_SleepEvent->reset() ... optional optimization
}
return interrupted;
}
ObjectMonitor::wait()的實現如下:
方法開始時, 呼叫thread::is_interrupted(Thread* thread, true)判斷並清除執行緒中斷狀態,如果中斷狀態為true,丟擲中斷異常並結束。
從park()方法返回後,判斷是否是因為中斷返回,再次呼叫thread::is_interrupted(Thread* thread, true)判斷並清除執行緒中斷狀態,如果中斷狀態為true,丟擲中斷異常並結束。
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
assert(Self->is_Java_thread(), "Must be Java thread!");
JavaThread *jt = (JavaThread *)THREAD;
DeferredInitialize () ;
// Throw IMSX or IEX.
CHECK_OWNER();
EventJavaMonitorWait event;
// check for a pending interrupt 呼叫is_interrupted(thread,true)判斷並清除執行緒中斷狀態
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
// post monitor waited event. Note that this is past-tense, we are done waiting.
if (JvmtiExport::should_post_monitor_waited()) {
// Note: 'false' parameter is passed here because the
// wait was not timed out due to thread interrupt.
JvmtiExport::post_monitor_waited(jt, this, false);
}
if (event.should_commit()) {
post_monitor_wait_event(&event, 0, millis, false);
}
TEVENT (Wait - Throw IEX) ;
THROW(vmSymbols::java_lang_InterruptedException()); //丟擲InterruptedException
return ; //直接結束,不執行下面邏輯
}
TEVENT (Wait) ;
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
// create a node to be put into the queue
// Critically, after we reset() the event but prior to park(), we must check
// for a pending interrupt.
ObjectWaiter node(Self);//將執行緒封裝成waitor節點
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag
// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the the owner of the monitor *except* in the case where park()
// returns because of a timeout of interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ; //新增到waitset
Thread::SpinRelease (&_WaitSetLock) ;
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
intptr_t save = _recursions; // record the old recursion count
_waiters++; // increment the number of waiters
_recursions = 0; // set the recursion level to be 1
exit (true, Self) ; // exit the monitor
guarantee (_owner != Self, "invariant") ;
// As soon as the ObjectMonitor's ownership is dropped in the exit()
// call above, another thread can enter() the ObjectMonitor, do the
// notify(), and exit() the ObjectMonitor. If the other thread's
// exit() call chooses this thread as the successor and the unpark()
// call happens to occur while this thread is posting a
// MONITOR_CONTENDED_EXIT event, then we run the risk of the event
// handler using RawMonitors and consuming the unpark().
//
// To avoid the problem, we re-post the event. This does no harm
// even if the original unpark() was not consumed because we are the
// chosen successor for this monitor.
if (node._notified != 0 && _succ == Self) {
node._event->unpark();
}
// The thread is on the WaitSet list - now park() it.
// On MP systems it's conceivable that a brief spin before we park
// could be profitable.
//
// TODO-FIXME: change the following logic to a loop of the form
// while (!timeout && !interrupted && _notified == 0) park()
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
jt->set_suspend_equivalent();
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else
if (node._notified == 0) {
if (millis <= 0) {
Self->_ParkEvent->park () ; //呼叫park方法阻塞執行緒
} else {
ret = Self->_ParkEvent->park (millis) ; //呼叫park方法在超時時間內阻塞執行緒
}
}
// were we externally suspended while we were waiting?
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
// Node may be on the WaitSet, the EntryList (or cxq), or in transition
// from the WaitSet to the EntryList.
// See if we need to remove Node from the WaitSet.
// We use double-checked locking to avoid grabbing _WaitSetLock
// if the thread is not on the wait queue.
//
// Note that we don't need a fence before the fetch of TState.
// In the worst case we'll fetch a old-stale value of TS_WAIT previously
// written by the is thread. (perhaps the fetch might even be satisfied
// by a look-aside into the processor's own store buffer, although given
// the length of the code path between the prior ST and this load that's
// highly unlikely). If the following LD fetches a stale TS_WAIT value
// then we'll acquire the lock and then re-fetch a fresh TState value.
// That is, we fail toward safety.
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
Thread::SpinRelease (&_WaitSetLock) ;
}
// The thread is now either on off-list (TS_RUN),
// on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).
// The Node's TState variable is stable from the perspective of this thread.
// No other threads will asynchronously modify TState.
guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
OrderAccess::loadload() ;
if (_succ == Self) _succ = NULL ;
WasNotified = node._notified ;
// Reentry phase -- reacquire the monitor.
// re-enter contended monitor after object.wait().
// retain OBJECT_WAIT state until re-enter successfully completes
// Thread state is thread_in_vm and oop access is again safe,
// although the raw address of the object may have changed.
// (Don't cache naked oops over safepoints, of course).
// post monitor waited event. Note that this is past-tense, we are done waiting.
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
}
if (event.should_commit()) {
post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT);
}
OrderAccess::fence() ;
assert (Self->_Stalled != 0, "invariant") ;
Self->_Stalled = 0 ;
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
// Self has reacquired the lock.
// Lifecycle - the node representing Self must not appear on any queues.
// Node is about to go out-of-scope, but even if it were immortal we wouldn't
// want residual elements associated with this thread left on any lists.
guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
assert (_owner == Self, "invariant") ;
assert (_succ != Self , "invariant") ;
} // OSThreadWaitState()
jt->set_current_waiting_monitor(NULL);
guarantee (_recursions == 0, "invariant") ;
_recursions = save; // restore the old recursion count
_waiters--; // decrement the number of waiters
// Verify a few postconditions
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
if (SyncFlags & 32) {
OrderAccess::fence() ;
}
// check if the notification happened 判斷是否發生通知
if (!WasNotified) { //如果不是發生通知,則可能是超時或者中斷
// no, it could be timeout or Thread.interrupt() or both 判斷是否中斷,否則為超時
// check for interrupt event, otherwise it is timeout
//呼叫is_interrupted()判斷並清除中斷狀態
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
TEVENT (Wait - throw IEX from epilog) ;
THROW(vmSymbols::java_lang_InterruptedException()); //丟擲InterruptedException
}
}
// NOTE: Spurious wake up will be consider as timeout.
// Monitor notify has precedence over thread interrupt.
}
wait()返回只在以下三種情形下發生:通知、超時、中斷。
Thread.interrupt可以看作一種特殊的通知訊號:
Object.wait(timo) will return because of
// (a) notification
// (b) timeout
// (c) thread.interrupt
//
// Thread.interrupt and object.notify{All} both call Event::set.
// That is, we treat thread.interrupt as a special case of notification.
// The underlying Solaris implementation, cond_timedwait, admits
// spurious/premature wakeups, but the JLS/JVM spec prevents the
// JVM from making those visible to Java code. As such, we must
// filter out spurious wakeups. We assume all ETIME returns are valid.
sleep()方法響應中斷
Thread.sleep最終呼叫JVM_Sleep方法:
方法開始時, 呼叫thread::is_interrupted(Thread* thread, true)判斷並清除執行緒中斷狀態,如果中斷狀態為true,丟擲中斷異常並結束。
呼叫os::sleep方法返回後,判斷返回值是否為OS_INTRPT,如果是則為發生中斷,丟擲中斷異常。
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative"); }
//判斷並清除執行緒中斷狀態,如果中斷狀態為true,丟擲中斷異常
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
JavaThreadSleepState jtss(thread);
EventThreadSleep event;
if (millis == 0) {
if (ConvertSleepToYield) {
os::yield();
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
os::sleep(thread, MinSleepInterval, false);//sleep 1ms
thread->osthread()->set_state(old_state);
}
} else {
ThreadState old_state = thread->osthread()->get_state();
//osthread->thread status mapping:
// NEW->NEW //RUNNABLE->RUNNABLE //BLOCKED_ON_MONITOR_ENTER->BLOCKED
//IN_OBJECT_WAIT,PARKED->WAITING
//SLEEPING,IN_OBJECT_WAIT_TIMED,PARKED_TIMED->TIMED_WAITING /
//TERMINATED->TERMINATED
thread->osthread()->set_state(SLEEPING);
//呼叫os::sleep方法,如果sleep()的返回值為OS_INTRPT,則為發生中斷,丟擲中斷異常
if (os::sleep(thread, millis, true) == OS_INTRPT) {
if (!HAS_PENDING_EXCEPTION) {
if (event.should_commit()) {
event.set_time(millis); event.commit();
}
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
}
thread->osthread()->set_state(old_state);
}
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
JVM_END
os::sleep方法在死迴圈內呼叫park方法,他只在滿足以下兩種情形之一時從park方法返回並退出死迴圈,否則即使從park方法返回了,也認為喚醒無效,繼續呼叫park方法:
1、呼叫thread.interrupt方法解除執行緒阻塞,讓park方法返回。
從park()方法返回後,判斷是否是因為中斷返回,呼叫thread::is_interrupted(Thread* thread, true)判斷並清除執行緒中斷狀態,如果中斷狀態為true,return返回OS_INTRPT,退出死迴圈。
2、到達指定睡眠時間,park方法自動返回。
從park()方法返回後,判斷剩餘時間millis是否小於等於0,如果是,則認為到達指定睡眠時間,return返回OS_OK,退出死迴圈。
int os::sleep(Thread* thread, jlong millis, bool interruptible) {
assert(thread == Thread::current(), "thread consistency check");
ParkEvent * const slp = thread->_SleepEvent ;
slp->reset() ;
OrderAccess::fence() ;
if (interruptible) {
jlong prevtime = javaTimeNanos();
for (;;) {
if (os::is_interrupted(thread, true)) { //判斷並清除執行緒中斷狀態
return OS_INTRPT; //發生中斷狀態為true,返回OS_INTRPT
}
jlong newtime = javaTimeNanos();
if (newtime - prevtime < 0) {
// time moving backwards, should only happen if no monotonic clock
// not a guarantee() because JVM should not abort on kernel/glibc bugs
assert(!Linux::supports_monotonic_clock(), "time moving backwards");
} else {
millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
}
if(millis <= 0) { //如果剩餘時間小於0,返回OS_OK
return OS_OK;
}
prevtime = newtime;
{
assert(thread->is_Java_thread(), "sanity check");
JavaThread *jt = (JavaThread *) thread;
ThreadBlockInVM tbivm(jt);
OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or
// java_suspend_self() via check_and_wait_while_suspended()
slp->park(millis); //呼叫park方法
// were we externally suspended while we were waiting?
jt->check_and_wait_while_suspended();
}
}
} else {
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jlong prevtime = javaTimeNanos();
for (;;) {
// It'd be nice to avoid the back-to-back javaTimeNanos() calls on
// the 1st iteration ...
jlong newtime = javaTimeNanos();
if (newtime - prevtime < 0) {
// time moving backwards, should only happen if no monotonic clock
// not a guarantee() because JVM should not abort on kernel/glibc bugs
assert(!Linux::supports_monotonic_clock(), "time moving backwards");
} else {
millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
}
if(millis <= 0) break ;
prevtime = newtime;
slp->park(millis); //呼叫park方法
}
return OS_OK ;
}
}
參考: