JVM Source Code Analysis: interrupt()

Overview

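Calling thread.interrupt() interrupts a thread. Interrupting a thread means asking it to stop what it is currently doing before it has finished its task.

If the thread is blocked in Object.wait, Thread.join or Thread.sleep, its interrupt status is cleared and it throws InterruptedException;

If the thread is blocked in an I/O operation on a java.nio.channels.InterruptibleChannel, the channel is closed, the thread's interrupt status is set, and it throws java.nio.channels.ClosedByInterruptException;

If the thread is blocked in a java.nio.channels.Selector, its interrupt status is set and the select method returns immediately, much as if wakeup had been called;

If none of the three cases above applies, thread.interrupt() only sets the thread's interrupt status to true.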
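The first and the last case can be observed from plain Java. The following is a minimal sketch, not code from the JDK: the class name `InterruptCasesDemo` and its helper methods are made up for illustration. It interrupts one thread that is blocked in Thread.sleep and one thread that is busy spinning.

```java
final class InterruptCasesDemo {
    /** Interrupts a thread that sleeps and reports what the sleeper observed. */
    static String interruptSleeper() {
        final String[] result = new String[1];
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000);
                result[0] = "woke normally";
            } catch (InterruptedException e) {
                // The status has already been cleared when the exception arrives.
                result[0] = "InterruptedException, status="
                        + Thread.currentThread().isInterrupted();
            }
        });
        sleeper.start();
        // Works whether or not the sleeper has reached sleep() yet:
        // a pending interrupt makes sleep() throw right away.
        sleeper.interrupt();
        join(sleeper);
        return result[0];
    }

    /** Interrupts a thread that is not blocked; only the flag changes. */
    static boolean interruptBusyThread() {
        final boolean[] sawFlag = new boolean[1];
        Thread worker = new Thread(() -> {
            // isInterrupted() only reads the flag, it does not clear it.
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield();
            }
            sawFlag[0] = Thread.currentThread().isInterrupted();
        });
        worker.start();
        worker.interrupt();
        join(worker);
        return sawFlag[0];
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(interruptSleeper());    // InterruptedException, status=false
        System.out.println(interruptBusyThread()); // true
    }
}
```

The busy thread only stops because it polls the flag itself; interrupt() does not force a running thread to stop.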

JVM source analysis of the interrupt method

The JVM's Thread class has three member variables that matter here (JavaThread inherits from this Thread class):

public:

 ParkEvent * _ParkEvent ;                     // for synchronized()    
 ParkEvent * _SleepEvent ;                    // for Thread.sleep

  // JSR166 per-thread parker
private:
  Parker*    _parker;

The JVM entry point of the interrupt method is in jvm.cpp:

JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_Interrupt");

  // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  // We need to re-resolve the java_thread, since a GC might have happened during the
  // acquire of the lock
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr != NULL) {
    Thread::interrupt(thr);
  }
JVM_END

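JVM_Interrupt resolves the jthread handle, takes Threads_lock unless the caller is interrupting itself, checks that the native thread still exists, and then calls Thread::interrupt directly: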

void Thread::interrupt(Thread* thread) {
  trace("interrupt", thread);
  debug_only(check_for_dangling_thread_pointer(thread);)
  os::interrupt(thread);
}

Thread::interrupt delegates to os::interrupt:

void os::interrupt(Thread* thread) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  // get the OS-level (native) thread object
  OSThread* osthread = thread->osthread();

  if (!osthread->interrupted()) {
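    osthread->set_interrupted(true); // set the interrupt status to true
    // memory barrier: the store to interrupted must be visible to other threads before unpark()
    OrderAccess::fence();
    // as noted above, _SleepEvent serves Thread.sleep; a thread blocked in sleep is woken via unpark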
    ParkEvent * const slp = thread->_SleepEvent ;
    if (slp != NULL) slp->unpark() ;
  }

  // _parker serves the java.util.concurrent locks; it is woken via unpark as well
  if (thread->is_Java_thread())
    ((JavaThread*)thread)->parker()->unpark();
  // wake up a thread blocked in Object.wait()
  ParkEvent * ev = thread->_ParkEvent ;
  if (ev != NULL) ev->unpark() ;

}

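As this shows, once the interrupt status has been set, interrupt() simply wakes the thread by calling unpark on its ParkEvent objects and on its Parker.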
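The unpark on `_parker` is visible from Java through LockSupport.park, which returns when the thread is interrupted but neither throws nor clears the flag. The sketch below is only an illustration of that behaviour; the class `ParkInterruptDemo` and its methods are hypothetical names.

```java
import java.util.concurrent.locks.LockSupport;

final class ParkInterruptDemo {
    /** Returns the interrupt status the parked thread sees after park() returns. */
    static boolean parkUntilInterrupted() {
        final boolean[] statusAfterPark = new boolean[1];
        Thread parked = new Thread(() -> {
            // park() may also return for no reason, so loop on the flag itself.
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
            // park() does not throw and does not clear the status.
            statusAfterPark[0] = Thread.currentThread().isInterrupted();
        });
        parked.start();
        parked.interrupt();   // wakes the thread if it is parked
        try {
            parked.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return statusAfterPark[0];
    }

    public static void main(String[] args) {
        System.out.println(parkUntilInterrupted()); // true
    }
}
```

Code built on park, such as the java.util.concurrent locks, therefore has to check the flag after every wakeup and decide for itself whether to throw InterruptedException.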

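How wait() responds to an interrupt

1. Before calling ParkEvent's park method, wait() checks the thread's interrupt status. If it is true, wait() clears the status, throws InterruptedException and returns.

2. Once park has been called and the thread is blocked on the condition variable, interrupt() wakes it by calling ParkEvent's unpark method. The thread returns from pthread_cond_wait() and is no longer blocked; execution continues and the interrupt status is checked again. If it is true, the status is cleared, InterruptedException is thrown and wait() returns.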

The method that reads the thread's interrupt status, and uses a boolean parameter to decide whether to clear it, is the following:

bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
  trace("is_interrupted", thread);
  debug_only(check_for_dangling_thread_pointer(thread);)
  // Note:  If clear_interrupted==false, this simply fetches and
  // returns the value of the field osthread()->interrupted().
  return os::is_interrupted(thread, clear_interrupted);
}

On Linux, os::is_interrupted() is implemented as:

bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();

  bool interrupted = osthread->interrupted();  // read the thread's interrupt status

  if (interrupted && clear_interrupted) {
    osthread->set_interrupted(false);  // clear the interrupt status, resetting it to false
    // consider thread->_SleepEvent->reset() ... optional optimization
  }

  return interrupted;
}
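On the Java side the two values of clear_interrupted correspond to two different calls: Thread.isInterrupted() only reads the flag, while the static Thread.interrupted() reads and clears it. The mapping onto this native method holds for JDK 8-era sources like the ones quoted here; newer JDKs may keep the flag elsewhere, so treat that as an assumption. A small sketch, with `InterruptFlagDemo` as a made-up name:

```java
import java.util.Arrays;

final class InterruptFlagDemo {
    /** Returns {isInterrupted(), interrupted(), isInterrupted()} after a self-interrupt. */
    static boolean[] readFlag() {
        Thread self = Thread.currentThread();
        self.interrupt();                            // set the flag on the current thread
        boolean peek = self.isInterrupted();         // read only (clear_interrupted == false)
        boolean testAndClear = Thread.interrupted(); // read and clear (clear_interrupted == true)
        boolean after = self.isInterrupted();        // the flag has been reset
        return new boolean[] { peek, testAndClear, after };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(readFlag())); // [true, true, false]
    }
}
```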

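ObjectMonitor::wait() is implemented as follows:

At the start of the method, Thread::is_interrupted(Self, true) tests and clears the thread's interrupt status. If the status was true, the method throws InterruptedException and returns.

After park() returns, if the thread was not notified, the method checks whether it was woken by an interrupt: it calls Thread::is_interrupted(Self, true) again to test and clear the interrupt status, and if the status was true it throws InterruptedException and returns.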

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   Thread * const Self = THREAD ;
   assert(Self->is_Java_thread(), "Must be Java thread!");
   JavaThread *jt = (JavaThread *)THREAD;
 
   DeferredInitialize () ;
 
   // Throw IMSX or IEX.
   CHECK_OWNER();
 
   EventJavaMonitorWait event;
 
   // check for a pending interrupt: is_interrupted(thread, true) tests and clears the interrupt status
   if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
     // post monitor waited event.  Note that this is past-tense, we are done waiting.
     if (JvmtiExport::should_post_monitor_waited()) {
        // Note: 'false' parameter is passed here because the
        // wait was not timed out due to thread interrupt.
        JvmtiExport::post_monitor_waited(jt, this, false);  
     }
     if (event.should_commit()) {
       post_monitor_wait_event(&event, 0, millis, false);
     }
     TEVENT (Wait - Throw IEX) ;
     THROW(vmSymbols::java_lang_InterruptedException()); // throw InterruptedException
     return ;   // return immediately; the logic below is skipped
   }
 
   TEVENT (Wait) ;
 
   assert (Self->_Stalled == 0, "invariant") ;
   Self->_Stalled = intptr_t(this) ;
   jt->set_current_waiting_monitor(this);
 
   // create a node to be put into the queue
   // Critically, after we reset() the event but prior to park(), we must check
   // for a pending interrupt.
   ObjectWaiter node(Self); // wrap the thread in an ObjectWaiter node
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag
 
   // Enter the waiting queue, which is a circular doubly linked list in this case
   // but it could be a priority queue or any data structure.
   // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
   // by the the owner of the monitor *except* in the case where park()
   // returns because of a timeout of interrupt.  Contention is exceptionally rare
   // so we use a simple spin-lock instead of a heavier-weight blocking lock.
 
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ; // add the node to the WaitSet
   Thread::SpinRelease (&_WaitSetLock) ;
 
   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }
   intptr_t save = _recursions; // record the old recursion count
   _waiters++;                  // increment the number of waiters
   _recursions = 0;             // set the recursion level to be 1
   exit (true, Self) ;                    // exit the monitor
   guarantee (_owner != Self, "invariant") ;
 
   // As soon as the ObjectMonitor's ownership is dropped in the exit()
   // call above, another thread can enter() the ObjectMonitor, do the
   // notify(), and exit() the ObjectMonitor. If the other thread's
   // exit() call chooses this thread as the successor and the unpark()
   // call happens to occur while this thread is posting a
   // MONITOR_CONTENDED_EXIT event, then we run the risk of the event
   // handler using RawMonitors and consuming the unpark().
   //
   // To avoid the problem, we re-post the event. This does no harm
   // even if the original unpark() was not consumed because we are the
   // chosen successor for this monitor.
   if (node._notified != 0 && _succ == Self) {
      node._event->unpark();
   }
 
   // The thread is on the WaitSet list - now park() it.
   // On MP systems it's conceivable that a brief spin before we park
   // could be profitable.
   //
   // TODO-FIXME: change the following logic to a loop of the form
   //   while (!timeout && !interrupted && _notified == 0) park()
 
   int ret = OS_OK ;
   int WasNotified = 0 ;
   { // State transition wrappers
     OSThread* osthread = Self->osthread();
     OSThreadWaitState osts(osthread, true);
     {
       ThreadBlockInVM tbivm(jt);
       // Thread is in thread_blocked state and oop access is unsafe.
       jt->set_suspend_equivalent();
 
       if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
           // Intentionally empty
       } else
       if (node._notified == 0) {
         if (millis <= 0) {
            Self->_ParkEvent->park () ;  // park: block the thread
         } else {
            ret = Self->_ParkEvent->park (millis) ; // timed park: block for at most millis
         }
       }
 
       // were we externally suspended while we were waiting?
       if (ExitSuspendEquivalent (jt)) {
          // TODO-FIXME: add -- if succ == Self then succ = null.
          jt->java_suspend_self();
       }
 
     } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
 
 
     // Node may be on the WaitSet, the EntryList (or cxq), or in transition
     // from the WaitSet to the EntryList.
     // See if we need to remove Node from the WaitSet.
     // We use double-checked locking to avoid grabbing _WaitSetLock
     // if the thread is not on the wait queue.
     //
     // Note that we don't need a fence before the fetch of TState.
     // In the worst case we'll fetch a old-stale value of TS_WAIT previously
     // written by the is thread. (perhaps the fetch might even be satisfied
     // by a look-aside into the processor's own store buffer, although given
     // the length of the code path between the prior ST and this load that's
     // highly unlikely).  If the following LD fetches a stale TS_WAIT value
     // then we'll acquire the lock and then re-fetch a fresh TState value.
     // That is, we fail toward safety.
 
     if (node.TState == ObjectWaiter::TS_WAIT) {
         Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
         if (node.TState == ObjectWaiter::TS_WAIT) {
            DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
            assert(node._notified == 0, "invariant");
            node.TState = ObjectWaiter::TS_RUN ;
         }
         Thread::SpinRelease (&_WaitSetLock) ;
     }
 
     // The thread is now either on off-list (TS_RUN),
     // on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).
     // The Node's TState variable is stable from the perspective of this thread.
     // No other threads will asynchronously modify TState.
     guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
     OrderAccess::loadload() ;
     if (_succ == Self) _succ = NULL ;
     WasNotified = node._notified ;
 
     // Reentry phase -- reacquire the monitor.
     // re-enter contended monitor after object.wait().
     // retain OBJECT_WAIT state until re-enter successfully completes
     // Thread state is thread_in_vm and oop access is again safe,
     // although the raw address of the object may have changed.
     // (Don't cache naked oops over safepoints, of course).
 
     // post monitor waited event. Note that this is past-tense, we are done waiting.
     if (JvmtiExport::should_post_monitor_waited()) {
       JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
     }
 
     if (event.should_commit()) {
       post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT);
     }
 
     OrderAccess::fence() ;
 
     assert (Self->_Stalled != 0, "invariant") ;
     Self->_Stalled = 0 ;
 
     assert (_owner != Self, "invariant") ;
     ObjectWaiter::TStates v = node.TState ;
     if (v == ObjectWaiter::TS_RUN) {
         enter (Self) ;
     } else {
         guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
         ReenterI (Self, &node) ;
         node.wait_reenter_end(this);
     }
 
     // Self has reacquired the lock.
     // Lifecycle - the node representing Self must not appear on any queues.
     // Node is about to go out-of-scope, but even if it were immortal we wouldn't
     // want residual elements associated with this thread left on any lists.
     guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
     assert    (_owner == Self, "invariant") ;
     assert    (_succ != Self , "invariant") ;
   } // OSThreadWaitState()
 
   jt->set_current_waiting_monitor(NULL);
 
   guarantee (_recursions == 0, "invariant") ;
   _recursions = save;     // restore the old recursion count
   _waiters--;             // decrement the number of waiters
 
   // Verify a few postconditions
   assert (_owner == Self       , "invariant") ;
   assert (_succ  != Self       , "invariant") ;
   assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
 
   if (SyncFlags & 32) {
      OrderAccess::fence() ;
   }
 
   // check if the notification happened
   if (!WasNotified) {   // not notified, so the wakeup was a timeout or an interrupt
     // no, it could be timeout or Thread.interrupt() or both
     // check for interrupt event, otherwise it is timeout
     // is_interrupted(Self, true) tests and clears the interrupt status
     if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
       TEVENT (Wait - throw IEX from epilog) ;
       THROW(vmSymbols::java_lang_InterruptedException()); // throw InterruptedException
     }
   }
 
   // NOTE: Spurious wake up will be consider as timeout.
   // Monitor notify has precedence over thread interrupt.
}

wait() returns in only three situations: notification, timeout, or interrupt.
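Both interrupt checks in ObjectMonitor::wait() can be triggered from Java. The following is a rough sketch with hypothetical names (`WaitInterruptDemo` and its methods): the first method enters wait() with the flag already set, the second interrupts another thread that is waiting or about to wait.

```java
final class WaitInterruptDemo {
    /** Calls wait() with a pending interrupt; the check before park() fires. */
    static String waitWithPendingInterrupt() {
        Object lock = new Object();
        Thread.currentThread().interrupt();   // flag is set before wait()
        synchronized (lock) {
            try {
                lock.wait();                  // throws without blocking
                return "returned normally";
            } catch (InterruptedException e) {
                return "InterruptedException, status="
                        + Thread.currentThread().isInterrupted();
            }
        }
    }

    /** Interrupts a thread that is waiting (or about to wait) on a monitor. */
    static String interruptWaiter() {
        final Object lock = new Object();
        final String[] result = new String[1];
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    // Nobody ever notifies, so only an interrupt ends this loop.
                    while (true) {
                        lock.wait();
                    }
                } catch (InterruptedException e) {
                    result[0] = "InterruptedException, status="
                            + Thread.currentThread().isInterrupted();
                }
            }
        });
        waiter.start();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(waitWithPendingInterrupt()); // InterruptedException, status=false
        System.out.println(interruptWaiter());          // InterruptedException, status=false
    }
}
```

In both cases the status reads false inside the catch block, which matches the test-and-clear call is_interrupted(Self, true) in the native code.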

Thread.interrupt can be regarded as a special kind of notification signal, as the following source comment explains:

  // Object.wait(timo) will return because of
  // (a) notification
  // (b) timeout
  // (c) thread.interrupt
  //
  // Thread.interrupt and object.notify{All} both call Event::set.
  // That is, we treat thread.interrupt as a special case of notification.
  // The underlying Solaris implementation, cond_timedwait, admits
  // spurious/premature wakeups, but the JLS/JVM spec prevents the
  // JVM from making those visible to Java code.  As such, we must
  // filter out spurious wakeups.  We assume all ETIME returns are valid.

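How sleep() responds to an interrupt

Thread.sleep ends up calling the JVM_Sleep method:

At the start of the method, Thread::is_interrupted(THREAD, true) tests and clears the thread's interrupt status. If the status was true, the method throws InterruptedException and returns.

After os::sleep returns, the method checks whether the return value is OS_INTRPT. If it is, an interrupt occurred and InterruptedException is thrown.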

JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
  JVMWrapper("JVM_Sleep");

  if (millis < 0) {
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }

  // test and clear the interrupt status; if it was set, throw InterruptedException
  if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
    THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
  }

  JavaThreadSleepState jtss(thread);
  EventThreadSleep event;

  if (millis == 0) {
    if (ConvertSleepToYield) {
      os::yield();
    } else {
      ThreadState old_state = thread->osthread()->get_state();
      thread->osthread()->set_state(SLEEPING);
      os::sleep(thread, MinSleepInterval, false); // sleep 1 ms
      thread->osthread()->set_state(old_state);
    }
  } else {
    ThreadState old_state = thread->osthread()->get_state();
    // osthread -> thread status mapping:
    //   NEW -> NEW
    //   RUNNABLE -> RUNNABLE
    //   BLOCKED_ON_MONITOR_ENTER -> BLOCKED
    //   IN_OBJECT_WAIT, PARKED -> WAITING
    //   SLEEPING, IN_OBJECT_WAIT_TIMED, PARKED_TIMED -> TIMED_WAITING
    //   TERMINATED -> TERMINATED
    thread->osthread()->set_state(SLEEPING);
    // call os::sleep; a return value of OS_INTRPT means the sleep was interrupted,
    // so throw InterruptedException
    if (os::sleep(thread, millis, true) == OS_INTRPT) {
      if (!HAS_PENDING_EXCEPTION) {
        if (event.should_commit()) {
          event.set_time(millis);
          event.commit();
        }
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
      }
    }
    thread->osthread()->set_state(old_state);
  }
  if (event.should_commit()) {
    event.set_time(millis);
    event.commit();
  }
JVM_END

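os::sleep calls park inside an infinite loop. It leaves the loop only when one of the two conditions below holds; otherwise, even if park has returned, the wakeup is treated as invalid and park is called again:

1. thread.interrupt is called, which unblocks the thread and makes park return.

     On the next pass through the loop, os::is_interrupted(thread, true) tests and clears the thread's interrupt status. If the status was true, the method returns OS_INTRPT and the loop ends.

2. The requested sleep time has elapsed and park returns on its own.

      After park() returns, the remaining time millis is recomputed. If it is less than or equal to 0, the requested sleep time has been reached, so the method returns OS_OK and the loop ends.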
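The shape of that loop can be sketched in Java with LockSupport.parkNanos standing in for ParkEvent::park. This is an illustration under assumptions, not the HotSpot implementation: the class `SleepLoopSketch` and the `Result` enum (a stand-in for OS_OK and OS_INTRPT) are made up, and the remaining time is tracked in nanoseconds rather than whole milliseconds.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

final class SleepLoopSketch {
    /** Hypothetical stand-ins for the native return codes OS_OK and OS_INTRPT. */
    enum Result { OK, INTERRUPTED }

    /** Sleeps for about millis, returning early only if the thread is interrupted. */
    static Result sleep(long millis) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(millis);
        long prev = System.nanoTime();
        for (;;) {
            if (Thread.interrupted()) {              // test and clear the flag
                return Result.INTERRUPTED;
            }
            long now = System.nanoTime();
            remainingNanos -= (now - prev);          // subtract the time already spent
            if (remainingNanos <= 0) {
                return Result.OK;                    // requested time has elapsed
            }
            prev = now;
            // May return early (interrupt, unpark, or no reason at all);
            // the loop re-checks both exit conditions and parks again if needed.
            LockSupport.parkNanos(remainingNanos);
        }
    }

    public static void main(String[] args) {
        System.out.println(sleep(20));               // OK
        Thread.currentThread().interrupt();
        System.out.println(sleep(10_000));           // INTERRUPTED
    }
}
```

Because both exit conditions are re-evaluated after every return from the park call, a stray wakeup costs only one extra loop iteration and never shortens the sleep.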

int os::sleep(Thread* thread, jlong millis, bool interruptible) {
  assert(thread == Thread::current(),  "thread consistency check");

  ParkEvent * const slp = thread->_SleepEvent ;
  slp->reset() ;
  OrderAccess::fence() ;

  if (interruptible) {
    jlong prevtime = javaTimeNanos();

    for (;;) {
      if (os::is_interrupted(thread, true)) { // test and clear the interrupt status
        return OS_INTRPT;    // the interrupt status was true: return OS_INTRPT
      }

      jlong newtime = javaTimeNanos();

      if (newtime - prevtime < 0) {
        // time moving backwards, should only happen if no monotonic clock
        // not a guarantee() because JVM should not abort on kernel/glibc bugs
        assert(!Linux::supports_monotonic_clock(), "time moving backwards");
      } else {
        millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
      }

      if(millis <= 0) {  // no time remaining (millis <= 0): return OS_OK
        return OS_OK;
      }

      prevtime = newtime;

      {
        assert(thread->is_Java_thread(), "sanity check");
        JavaThread *jt = (JavaThread *) thread;
        ThreadBlockInVM tbivm(jt);
        OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);

        jt->set_suspend_equivalent();
        // cleared by handle_special_suspend_equivalent_condition() or
        // java_suspend_self() via check_and_wait_while_suspended()

        slp->park(millis); // call park

        // were we externally suspended while we were waiting?
        jt->check_and_wait_while_suspended();
      }
    }
  } else {
    OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
    jlong prevtime = javaTimeNanos();

    for (;;) {
      // It'd be nice to avoid the back-to-back javaTimeNanos() calls on
      // the 1st iteration ...
      jlong newtime = javaTimeNanos();

      if (newtime - prevtime < 0) {
        // time moving backwards, should only happen if no monotonic clock
        // not a guarantee() because JVM should not abort on kernel/glibc bugs
        assert(!Linux::supports_monotonic_clock(), "time moving backwards");
      } else {
        millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
      }

      if(millis <= 0) break ;

      prevtime = newtime;
      slp->park(millis); // call park
    }
    return OS_OK ;
  }
}

References: