死磕Synchronized底層實現--輕量級鎖
本文為死磕Synchronized底層實現第三篇文章,內容為輕量級鎖實現。
輕量級鎖並不複雜,其中很多內容在 ofollow,noindex">偏向鎖 一文中已提及過, 與本文內容會有部分重疊 。
另外輕量級鎖的背景和基本流程在 概論 中已有講解。 強烈建議在看過兩篇文章的基礎下閱讀本文 。
本系列文章將對HotSpot的 synchronized
鎖實現進行全面分析,內容包括偏向鎖、輕量級鎖、重量級鎖的加鎖、解鎖、鎖升級流程的原理及原始碼分析,希望給在研究 synchronized
路上的同學一些幫助。主要包括以下幾篇文章:
死磕Synchronized底層實現--重量級鎖(待更新)
更多文章見個人部落格: github.com/farmerjohng…
本文分為兩個部分:
1.輕量級鎖獲取流程
2.輕量級鎖釋放流程
本人看的JVM版本是jdk8u,具體版本號以及程式碼可以在這裡看到。
輕量級鎖獲取流程
下面開始輕量級鎖獲取流程分析,程式碼在 bytecodeInterpreter.cpp#1816 。
CASE(_monitorenter): { oop lockee = STACK_OBJECT(-1); ... if (entry != NULL) { ... // 上面省略的程式碼中如果CAS操作失敗也會呼叫到InterpreterRuntime::monitorenter // traditional lightweight locking if (!success) { // 構建一個無鎖狀態的Displaced Mark Word markOop displaced = lockee->mark()->set_unlocked(); // 設定到Lock Record中去 entry->lock()->set_displaced_header(displaced); bool call_vm = UseHeavyMonitors; if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) { // 如果CAS替換不成功,代表鎖物件不是無鎖狀態,這時候判斷下是不是鎖重入 // Is it simple recursive case? if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) { entry->lock()->set_displaced_header(NULL); } else { // CAS操作失敗則呼叫monitorenter CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } } } UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1); } else { istate->set_msg(more_monitors); UPDATE_PC_AND_RETURN(0); // Re-execute } } 複製程式碼
如果鎖物件不是偏向模式或已經偏向其他執行緒,則 success
為 false
。這時候會構建一個無鎖狀態的 mark word
設定到 Lock Record
中去,我們稱 Lock Record
中儲存物件 mark word
的欄位叫 Displaced Mark Word
。
如果當前鎖的狀態不是無鎖狀態,則CAS失敗。如果這是一次鎖重入,那直接將 Lock Record
的 Displaced Mark Word
設定為 null
。
我們看個demo,在該demo中重複3次獲得鎖,
synchronized(obj){ synchronized(obj){ synchronized(obj){ } } } 複製程式碼
假設鎖的狀態是輕量級鎖,下圖反應了 mark word
和執行緒棧中 Lock Record
的狀態,可以看到右邊執行緒棧中包含3個指向當前鎖物件的 Lock Record
。其中棧中最高位的 Lock Record
為第一次獲取鎖時分配的。其 Displaced Mark word
的值為鎖物件的加鎖前的 mark word
,之後的鎖重入會線上程棧中分配一個 Displaced Mark word
為 null
的 Lock Record
。

為什麼JVM選擇線上程棧中新增 Displaced Mark word
為null的 Lock Record
來表示重入計數呢?首先鎖重入次數是一定要記錄下來的,因為每次解鎖都需要對應一次加鎖,解鎖次數等於加鎖次數時,該鎖才真正的被釋放,也就是在解鎖時需要用到說鎖重入次數的。一個簡單的方案是將鎖重入次數記錄在物件頭的 mark word
中,但 mark word
的大小是有限的,已經存放不下該資訊了。另一個方案是隻建立一個 Lock Record
並在其中記錄重入次數,Hotspot沒有這樣做的原因我猜是考慮到效率有影響:每次重入獲得鎖都需要遍歷該執行緒的棧找到對應的 Lock Record
,然後修改它的值。
所以最終Hotspot選擇每次獲得鎖都新增一個 Lock Record
來表示鎖的重入。
接下來看看 InterpreterRuntime::monitorenter
方法
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem)) ... Handle h_obj(thread, elem->obj()); assert(Universe::heap()->is_in_reserved_or_null(h_obj()), "must be NULL or an object"); if (UseBiasedLocking) { // Retry fast entry if bias is revoked to avoid unnecessary inflation ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK); } else { ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK); } ... IRT_END 複製程式碼
fast_enter
的流程在偏向鎖一文已經分析過,如果當前是偏向模式且偏向的執行緒還在使用鎖,那會將鎖的 mark word
改為輕量級鎖的狀態,同時會將偏向的執行緒棧中的 Lock Record
修改為輕量級鎖對應的形式。程式碼位置在 biasedLocking.cpp#212 。
// 執行緒還存活則遍歷執行緒棧中所有的Lock Record GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread); BasicLock* highest_lock = NULL; for (int i = 0; i < cached_monitor_info->length(); i++) { MonitorInfo* mon_info = cached_monitor_info->at(i); // 如果能找到對應的Lock Record說明偏向的執行緒還在執行同步程式碼塊中的程式碼 if (mon_info->owner() == obj) { ... // 需要升級為輕量級鎖,直接修改偏向執行緒棧中的Lock Record。為了處理鎖重入的case,在這裡將Lock Record的Displaced Mark Word設定為null,第一個Lock Record會在下面的程式碼中再處理 markOop mark = markOopDesc::encode((BasicLock*) NULL); highest_lock = mon_info->lock(); highest_lock->set_displaced_header(mark); } else { ... } } if (highest_lock != NULL) { // 修改第一個Lock Record為無鎖狀態,然後將obj的mark word設定為執行該Lock Record的指標 highest_lock->set_displaced_header(unbiased_prototype); obj->release_set_mark(markOopDesc::encode(highest_lock)); ... } else { ... } 複製程式碼
我們看 slow_enter
的流程。
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) { markOop mark = obj->mark(); assert(!mark->has_bias_pattern(), "should not see bias pattern here"); // 如果是無鎖狀態 if (mark->is_neutral()) { //設定Displaced Mark Word並替換物件頭的mark word lock->set_displaced_header(mark); if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) { TEVENT (slow_enter: release stacklock) ; return ; } } else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { assert(lock != mark->locker(), "must not re-lock the same lock"); assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); // 如果是重入,則設定Displaced Mark Word為null lock->set_displaced_header(NULL); return; } ... // 走到這一步說明已經是存在多個執行緒競爭鎖了 需要膨脹為重量級鎖 lock->set_displaced_header(markOopDesc::unused_mark()); ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD); } 複製程式碼
輕量級鎖釋放流程
CASE(_monitorexit): { oop lockee = STACK_OBJECT(-1); CHECK_NULL(lockee); // derefing's lockee ought to provoke implicit null check // find our monitor slot BasicObjectLock* limit = istate->monitor_base(); BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base(); // 從低往高遍歷棧的Lock Record while (most_recent != limit ) { // 如果Lock Record關聯的是該鎖物件 if ((most_recent)->obj() == lockee) { BasicLock* lock = most_recent->lock(); markOop header = lock->displaced_header(); // 釋放Lock Record most_recent->set_obj(NULL); // 如果是偏向模式,僅僅釋放Lock Record就好了。否則要走輕量級鎖or重量級鎖的釋放流程 if (!lockee->mark()->has_bias_pattern()) { bool call_vm = UseHeavyMonitors; // header!=NULL說明不是重入,則需要將Displaced Mark Word CAS到物件頭的Mark Word if (header != NULL || call_vm) { if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) { // CAS失敗或者是重量級鎖則會走到這裡,先將obj還原,然後呼叫monitorexit方法 most_recent->set_obj(lockee); CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception); } } } //執行下一條命令 UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1); } //處理下一條Lock Record most_recent++; } // Need to throw illegal monitor state exception CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception); ShouldNotReachHere(); } 複製程式碼
輕量級鎖釋放時需要將 Displaced Mark Word
替換到物件頭的 mark word
中。如果CAS失敗或者是重量級鎖則進入到 InterpreterRuntime::monitorexit
方法中。
//%note monitor_1 IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem)) Handle h_obj(thread, elem->obj()); ... ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread); // Free entry. This must be done here, since a pending exception might be installed on //釋放Lock Record elem->set_obj(NULL); ... IRT_END 複製程式碼
monitorexit
呼叫完 slow_exit
方法後,就釋放 Lock Record
。
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) { fast_exit (object, lock, THREAD) ; } void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) { ... markOop dhw = lock->displaced_header(); markOop mark ; if (dhw == NULL) { // 重入鎖,什麼也不做 ... return ; } mark = object->mark() ; // 如果是mark word==Displaced Mark Word即輕量級鎖,CAS替換物件頭的mark word if (mark == (markOop) lock) { assert (dhw->is_neutral(), "invariant") ; if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) { TEVENT (fast_exit: release stacklock) ; return; } } //走到這裡說明是重量級鎖或者解鎖時發生了競爭,膨脹後呼叫重量級鎖的exit方法。 ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ; } 複製程式碼
該方法中先判斷是不是輕量級鎖,如果是輕量級鎖則將替換 mark word
,否則膨脹為重量級鎖並呼叫 exit
方法,相關邏輯將在重量級鎖的文章中講解。