LockSupport中的park與unpark原理
LockSupport是用來建立locks的基本執行緒阻塞基元,比如AQS中實現執行緒掛起的方法,就是park,對應喚醒就是unpark。JDK中有使用的如下

park
的時候,會立馬返回,此時許可也會被消費掉,如果沒有許可,則會阻塞。呼叫unpark的時候,如果許可本身不可用,則會使得許可可用
許可只有一個,不可累加
park原始碼跟蹤
park的宣告形式有一下兩大塊

一部分多了一個Object引數,作為blocker,另外的則沒有。blocker的好處在於,在診斷問題的時候能夠知道park的原因
推薦使用帶有Object的park操作
park函式作用
park用於掛起當前執行緒,如果許可可用,會立馬返回,並消費掉許可。
- park(Object): 恢復的條件為 1:執行緒呼叫了unpark; 2:其它執行緒中斷了執行緒;3:發生了不可預料的事情
- parkNanos(Object blocker, long nanos):恢復的條件為 1:執行緒呼叫了unpark; 2:其它執行緒中斷了執行緒;3:發生了不可預料的事情;4:過期時間到了
- parkUntil(Object blocker, long deadline):恢復的條件為 1:執行緒呼叫了unpark; 2:其它執行緒中斷了執行緒;3:發生了不可預料的事情;4:指定的deadLine已經到了 以park的原始碼為例
public static void park(Object blocker) { //獲取當前執行緒 Thread t = Thread.currentThread(); //記錄當前執行緒阻塞的原因,底層就是unsafe.putObject,就是把物件儲存起來 setBlocker(t, blocker); //執行park unsafe.park(false, 0L); //執行緒恢復後,去掉阻塞原因 setBlocker(t, null); } 複製程式碼
從原始碼可以看到真實的實現均在 unsafe
unsafe.park
核心實現如下
JavaThread* thread=JavaThread::thread_from_jni_environment(env); ... thread->parker()->park(isAbsolute != 0, time); 複製程式碼
就是獲取java執行緒的parker物件,然後執行它的park方法。Parker的定義如下
class Parker : public os::PlatformParker { private: //表示許可 volatile int _counter ; Parker * FreeNext ; JavaThread * AssociatedWith ; // Current association public: Parker() : PlatformParker() { //初始化_counter _counter= 0 ; FreeNext= NULL ; AssociatedWith = NULL ; } protected: ~Parker() { ShouldNotReachHere(); } public: void park(bool isAbsolute, jlong time); void unpark(); // Lifecycle operators static Parker * Allocate (JavaThread * t) ; static void Release (Parker * e) ; private: static Parker * volatile FreeList ; static volatile int ListLock ; }; 複製程式碼
它繼承了os::PlatformParker,內建了一個volatitle的 _counter。PlatformParker則是在不同的作業系統中有不同的實現,以linux為例
class PlatformParker : public CHeapObj { protected: //互斥變數型別 pthread_mutex_t _mutex [1] ; //條件變數型別 pthread_cond_t_cond[1] ; public: ~PlatformParker() { guarantee (0, "invariant") ; } public: PlatformParker() { int status; //初始化條件變數,使用pthread_cond_t之前必須先執行初始化 status = pthread_cond_init (_cond, NULL); assert_status(status == 0, status, "cond_init”); // 初始化互斥變數,使用pthread_mutex_t之前必須先執行初始化 status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); } } 複製程式碼
上述程式碼均為POSIX執行緒介面使用,所以pthread指的也就是posixThread
parker實現如下
void Parker::park(bool isAbsolute, jlong time) { if (_counter > 0) { //已經有許可了,用掉當前許可 _counter = 0 ; //使用記憶體屏障,確保 _counter賦值為0(寫入操作)能夠被記憶體屏障之後的讀操作獲取記憶體屏障事前的結果,也就是能夠正確的讀到0 OrderAccess::fence(); //立即返回 return ; } Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; if (Thread::is_interrupted(thread, false)) { // 執行緒執行了中斷,返回 return; } if (time < 0 || (isAbsolute && time == 0) ) { //時間到了,或者是代表絕對時間,同時絕對時間是0(此時也是時間到了),直接返回,java中的parkUtil傳的就是絕對時間,其它都不是 return; } if (time > 0) { //傳入了時間引數,將其存入absTime,並解析成absTime->tv_sec(秒)和absTime->tv_nsec(納秒)儲存起來,存的是絕對時間 unpackTime(&absTime, isAbsolute, time); } //進入safepoint region,更改執行緒為阻塞狀態 ThreadBlockInVM tbivm(jt); if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { //如果執行緒被中斷,或者是在嘗試給互斥變數加鎖的過程中,加鎖失敗,比如被其它執行緒鎖住了,直接返回 return; } //這裡表示執行緒互斥變數鎖成功了 int status ; if (_counter > 0){ // 有許可了,返回 _counter = 0; //對互斥變數解鎖 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; OrderAccess::fence(); return; } #ifdef ASSERT // Don't catch signals while blocked; let the running threads have the signals. // (This allows a debugger to break into the running thread.) //debug用 sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals(); pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif //將java執行緒所擁有的作業系統執行緒設定成 CONDVAR_WAIT狀態 ,表示在等待某個條件的發生 OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); //將java的_suspend_equivalent引數設定為true jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() if (time == 0) { //把呼叫執行緒放到等待條件的執行緒列表上,然後對互斥變數解鎖,(這兩是原子操作),這個時候執行緒進入等待,當它返回時,互斥變數再次被鎖住。 //成功返回0,否則返回錯誤編號 status = pthread_cond_wait (_cond, _mutex) ; } else { //同pthread_cond_wait,只是多了一個超時,如果超時還沒有條件出現,那麼重新獲取胡吃兩然後返回錯誤碼 ETIMEDOUT status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { //WorkAroundNPTLTimedWaitHang 是JVM的執行引數,預設為1 //去除初始化 pthread_cond_destroy (_cond) ; //重新初始化 pthread_cond_init(_cond, NULL); } } assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); #endif //等待結束後,許可被消耗,改為0_counter = 0 ; //釋放互斥量的鎖 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } //加入記憶體屏障指令 OrderAccess::fence(); } 複製程式碼
從park的實現可以看到
- 無論是什麼情況返回,park方法本身都不會告知呼叫方返回的原因,所以呼叫的時候一般都會去判斷返回的場景,根據場景做不同的處理
- 執行緒的等待與掛起、喚醒等等就是使用的POSIX的執行緒API
- park的許可通過原子變數_count實現,當被消耗時,_count為0,只要擁有許可,就會立即返回
OrderAccess::fence();
在linux中實現原理如下
inline void OrderAccess::fence() { if (os::is_MP()) { #ifdef AMD64 // 沒有使用mfence,因為mfence有時候效能差於使用 locked addl __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory"); #else__asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory"); #endif} } 複製程式碼
ofollow,noindex">記憶體重排序網上的驗證
ThreadBlockInVM tbivm(jt)
這屬於C++新建變數的語法,也就是呼叫建構函式新建了一個變數,變數名為tbivm,引數為jt。類的實現為
class ThreadBlockInVM : public ThreadStateTransition { public: ThreadBlockInVM(JavaThread *thread) : ThreadStateTransition(thread) { // Once we are blocked vm expects stack to be walkable thread->frame_anchor()->make_walkable(thread); //把執行緒由執行狀態轉成阻塞狀態 trans_and_fence(_thread_in_vm, _thread_blocked); } ... }; 複製程式碼
_thread_in_vm 表示執行緒當前在VM中執行,_thread_blocked表示執行緒當前阻塞了,他們是 globalDefinitions.hpp
中定義的列舉
//這個列舉是用來追蹤執行緒在程式碼的那一塊執行,用來給 safepoint code使用,有4種重要的型別,_thread_new/_thread_in_native/_thread_in_vm/_thread_in_Java。形如xxx_trans的狀態都是中間狀態,表示執行緒正在由一種狀態變成另一種狀態,這種方式使得 safepoint code在處理執行緒狀態時,不需要對執行緒進行掛起,使得safe point code執行更快,而給定一個狀態,通過+1就可以得到他的轉換狀態 enum JavaThreadState { _thread_uninitialized=0, // should never happen (missing initialization) _thread_new=2, // just starting up, i.e., in process of being initialized _thread_new_trans=3, // corresponding transition state (not used, included for completeness) _thread_in_native=4, // running in native code. This is a safepoint region, since all oops will be in jobject handles _thread_in_native_trans=5, // corresponding transition state _thread_in_vm=6, // running in VM _thread_in_vm_trans=7, // corresponding transition state _thread_in_Java=8, //Executing either interpreted or compiled Java code running in Java or in stub code _thread_in_Java_trans=9, // corresponding transition state (not used, included for completeness) _thread_blocked= 10, // blocked in vm _thread_blocked_trans= 11, // corresponding transition state _thread_max_state= 12// maximum thread state+1 - used for statistics allocation }; 複製程式碼
父類ThreadStateTransition中定義trans_and_fence如下
void trans_and_fence(JavaThreadState from, JavaThreadState to) { transition_and_fence(_thread, from, to);} //_thread即建構函式傳進來de thread // transition_and_fence must be used on any thread state transition // where there might not be a Java call stub on the stack, in // particular on Windows where the Structured Exception Handler is // set up in the call stub. os::write_memory_serialize_page() can // fault and we can't recover from it on Windows without a SEH in // place. //transition_and_fence方法必須在任何執行緒狀態轉換的時候使用 static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) { assert(thread->thread_state() == from, "coming from wrong thread state"); assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states"); //標識執行緒轉換中 thread->set_thread_state((JavaThreadState)(from + 1)); // 設定記憶體屏障,確保新的狀態能夠被VM 執行緒看到 if (os::is_MP()) { if (UseMembar) { // Force a fence between the write above and read below OrderAccess::fence(); } else { // Must use this rather than serialization page in particular on Windows InterfaceSupport::serialize_memory(thread); } } if (SafepointSynchronize::do_call_back()) { SafepointSynchronize::block(thread); } //執行緒狀態轉換成最終的狀態,對待這裡的場景就是阻塞 thread->set_thread_state(to); CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();) } 複製程式碼
作業系統執行緒狀態的一般取值
在osThread中給定了作業系統執行緒狀態的大致取值,它本身是依據平臺而定
enum ThreadState { ALLOCATED,// Memory has been allocated but not initialized INITIALIZED,// The thread has been initialized but yet started RUNNABLE,// Has been started and is runnable, but not necessarily running MONITOR_WAIT,// Waiting on a contended monitor lock CONDVAR_WAIT,// Waiting on a condition variable OBJECT_WAIT,// Waiting on an Object.wait() call BREAKPOINTED,// Suspended at breakpoint SLEEPING,// Thread.sleep() ZOMBIE// All done, but not reclaimed yet }; 複製程式碼
unpark 原始碼追蹤
實現如下
void Parker::unpark() { int s, status ; //給互斥量加鎖,如果互斥量已經上鎖,則阻塞到互斥量被解鎖 //park進入wait時,_mutex會被釋放 status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; //儲存舊的_counter s = _counter; //許可改為1,每次呼叫都設定成發放許可 _counter = 1; if (s < 1) { //之前沒有許可 if (WorkAroundNPTLTimedWaitHang) { //預設執行 ,釋放訊號,表明條件已經滿足,將喚醒等待的執行緒 status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; //釋放鎖 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } else { status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; } } else { //一直有許可,釋放掉自己加的鎖,有許可park本身就返回了 pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } } 複製程式碼
從原始碼可知unpark本身就是發放許可,並通知等待的執行緒,已經可以結束等待了
總結
- park/unpark能夠精準的對執行緒進行喚醒和等待。
- linux上的實現是通過POSIX的執行緒API的等待、喚醒、互斥、條件來進行實現的
- park在執行過程中首選看是否有許可,有許可就立馬返回,而每次unpark都會給許可設定成有,這意味著,可以先執行unpark,給予許可,再執行park立馬自行,適用於producer快,而consumer還未完成的場景參考地址