1. 程式人生 > >曹工談併發:Synchronized升級為重量級鎖後,靠什麼 API 來阻塞自己

曹工談併發:Synchronized升級為重量級鎖後,靠什麼 API 來阻塞自己

# 背景 因為想知道java中的關鍵字,對應的作業系統級別的api是啥,本來打算整理幾個我知道的出來,但是,尷尬的是,我發現java裡最重要的synchronized關鍵字,我就不知道它對應的api是什麼。 #redis中如何獲取鎖 在redis原始碼裡,執行緒如果要進入一個同步區(只能單執行緒進入的程式碼塊),會先獲取一個互斥量,如果獲取到了,則可以執行;否則,會阻塞在在這個互斥量上。 ##互斥量型別定義: ```c // 定義互斥量 static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; ``` 型別為 pthread_mutex_t。 ## 互斥量初始化: 使用互斥量前,要先初始化後,才能使用: ```c int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); ``` pthread_mutex_init 這個函式是作業系統提供出來的api,不過應該是類unix系統才有這個。 shell中執行man pthread_mutex_init,可以看到: ```shell The pthread_mutex_init() function shall initialize the mutex referenced by mutex with attributes specified by attr. If attr is NULL, the default mutex attributes are used; the effect shall be the same as passing the address of a default mutex attributes object. Upon successful initialization, the state of the mutex becomes initialized and unlocked. ``` >pthread_mutex_init 初始化引數mutex指定的互斥量,,使用attr中指定的屬性。如果attr為空,使用預設引數。 > >成功初始化後,互斥量的狀態變為已初始化、未鎖定。 ##如何鎖定、解鎖互斥量 ```c // 1 pthread_mutex_lock(&bio_mutex[type]); // 2 pthread_mutex_unlock(&bio_mutex[type]); ``` * 1處,加鎖 * 2處,解鎖 我們可以看下linux下執行man pthread_mutex_lock後,看到的幫助: ```shell #include int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex); The mutex object referenced by mutex shall be locked by calling pthread_mutex_lock(). If the mutex is already locked, the calling thread shall block until the mutex becomes available. This operation shall return with the mutex object referenced by mutex in the locked state with the calling thread as its owner. ``` 可以重點看下上面那句註釋:呼叫pthread_mutex_lock,會導致引數mutext引用的互斥量被鎖定;如果該互斥量早已被鎖定,則呼叫執行緒將被阻塞。 ## redis中執行緒使用互斥量的例子 ```c void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job)); job->time = time(NULL); job->arg1 = arg1; job->arg2 = arg2; job->arg3 = arg3; // 1 鎖定 pthread_mutex_lock(&bio_mutex[type]); // 將新工作推入佇列 listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; pthread_cond_signal(&bio_condvar[type]); // 2 解鎖 pthread_mutex_unlock(&bio_mutex[type]); } ``` 如要了解更多互斥量,可以看看這篇文章,寫的不錯: [Linux C 程式設計——多執行緒和互斥鎖mutex](http://www.broadview.com.cn/article/297) # jdk中synchronized,不考慮輕鎖、偏向鎖,最終有用到前面的互斥量嗎 ## 參考文章 現在不用考慮各種優化,只考慮最終synchronized已經升級為重量級鎖之後的表現,會使用前面的互斥量嗎? 由於作者本身也是半桶水,搞了半天也沒把jdk的原始碼除錯環境搞起來,只能看看程式碼了,順便結合網路上的一些文章,不過結論應該可靠。 大家先可以參考這兩篇文章: [JVM:鎖實現(synchronized&JSR166)行為分析和相關原始碼](https://blog.csdn.net/luoyuyou/article/details/73741387) [JVM原始碼分析之synchronized實現](https://www.jianshu.com/p/c5058b6fe8e5) ##簡易流程梳理 我這裡也簡單列舉一下整個過程,就從下面這裡開始: ```c++ // Fast Monitor Enter/Exit // This the fast monitor enter. The interpreter and compiler use // some assembly copies of this code. Make sure update those code // if the following function is changed. // The implementation is extremely sensitive to race condition. Be careful. void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) { if (UseBiasedLocking) { if (!SafepointSynchronize::is_at_safepoint()) { BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD); if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) { return; } } else { assert(!attempt_rebias, "can not rebias toward VM thread"); BiasedLocking::revoke_at_safepoint(obj); } assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now"); } // 1 slow_enter (obj, lock, THREAD) ; } ``` 1處,前面都是偏向鎖相關的東西,先跳過,進入slow_enter。 ```c++ void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) { markOop mark = obj->mark(); assert(!mark->has_bias_pattern(), "should not see bias pattern here"); // 1 if (mark->is_neutral()) { // Anticipate successful CAS -- the ST of the displaced mark must // be visible <= the ST performed by the CAS. // 2 lock->
set_displaced_header(mark); // 3 if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) { TEVENT (slow_enter: release stacklock) ; return ; } // Fall through to inflate() ... } else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { assert(lock != mark->
locker(), "must not re-lock the same lock"); assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); lock->set_displaced_header(NULL); return; } // The object header will never be displaced to this lock, // so it does not matter what the value is, except that it // must be non-zero to avoid looking like a re-entrant lock, // and must not look locked either. lock->
set_displaced_header(markOopDesc::unused_mark()); // 2 ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD); } ``` * 1處,mark->is_neutral(),判斷物件頭,是否是無鎖狀態,neutral本來是中立的意思,這裡表示無鎖。 * 2處,如果無鎖,則呼叫lock的方法,lock本身是當前執行緒在棧內持有的物件,呼叫lock的set_displaced_header方法,引數為待加鎖物件(堆裡)的物件頭,意思是,把待加鎖物件的物件頭,設定到執行緒的棧內變數裡。 lock變數的class 型別如下: ```c++ class BasicLock VALUE_OBJ_CLASS_SPEC { friend class VMStructs; private: // 1 volatile markOop _displaced_header; public: markOop displaced_header() const { return _displaced_header; } // 2 void set_displaced_header(markOop header) { _displaced_header = header; } void print_on(outputStream* st) const; // move a basic lock (used during deoptimization void move_to(oop obj, BasicLock* dest); static int displaced_header_offset_in_bytes() { return offset_of(BasicLock, _displaced_header); } }; ``` 結合這裡的1、2處程式碼,上面那句,意思就是,把待加鎖物件的物件頭,儲存到lock變數 _displaced_header屬性。 * 3處,這裡比較複雜。 ```c++ Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark) ``` 這一句裡面,cmpxchg_ptr,定義為: ```c++ inline static intptr_t cmpxchg_ptr(intptr_t exchange_value, volatile intptr_t* dest, intptr_t compare_value); ``` 這一句就是平時我們說的那種cas操作,表示,如果第二個引數,dest指向的值,和第三個引數,compare_value的值相等,則把第二個引數中的值,設為引數1的值。 重點來看,第二個引數,是什麼鬼意思? Handle obj,說明obj是Handle型別, ```c++ class Handle VALUE_OBJ_CLASS_SPEC { private: oop* _handle; protected: // 2 oop obj() const { return *_handle; } public: //1 oop operator () () const { return obj(); } ``` 那麼,obj()的意思,應該就是,程式碼1處,應該是進行了操作符過載,所以會呼叫obj()方法,obj方法,請看2處,會返回 屬性_handle,當然,這裡對屬性進行了解引用。 所以,基本的意思就是,返回_handle這個屬性,執行的oop物件。 然後,再說說引數3,引數3就是mark。 ```c++ Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark) ``` 這個mark,是在程式碼開頭這樣被賦值的。 ```c++ markOop mark = obj->mark(); ``` 那,我們看看obj的mark方法就行。(不知道為啥,在Handle類裡,沒找到這個方法,不知道為啥,難道是有什麼特殊語法嗎。。。),不過這個mark的意思,肯定就是物件裡的物件頭無誤。 然後,第1個引數呢,就是lock,就是那個,如果上面的第二、三個引數相等,就將本引數,即,本執行緒,棧內物件lock的地址,設定到物件頭中,表示,該物件已經被本執行緒加鎖了。 * 4處,這裡表示如果是當前執行緒重複進入: ```c++ if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { assert(lock != mark->locker(), "must not re-lock the same lock"); assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); lock->set_displaced_header(NULL); return; } ``` * 5處,開始膨脹為重量級鎖,並進入重量級鎖的爭奪 ```c++ // --接前面的程式碼 lock->set_displaced_header(markOopDesc::unused_mark()); ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD); ``` 這裡,會先通過呼叫ObjectSynchronizer::inflate(THREAD, obj()),來完成輕量級鎖到重量級鎖的升級。 ```c++ // Inflate light weight monitor to heavy weight monitor static ObjectMonitor* inflate(Thread * Self, oop obj); ``` 這個註釋就很清晰,升級輕鎖為重鎖,並且,會返回物件的monitor,即對應的重鎖物件。 膨脹的過程,太複雜,看不懂(心累。。),有興趣的可以看看這篇。 https://github.com/farmerjohngit/myblog/issues/15 膨脹後,返回了對應的monitor,然後進入其enter方法。 然後enter也是茫茫多的程式碼,根據網上部落格,即: [JVM:鎖實現(synchronized&JSR166)行為分析和相關原始碼](https://blog.csdn.net/luoyuyou/article/details/73741387) [JVM原始碼分析之synchronized實現](https://www.jianshu.com/p/c5058b6fe8e5) 會進入以下方法: ```c++ ObjectMonitor::EnterI ``` 這個裡面,也是茫茫多的程式碼,而且更可怕的是,註釋也多得很,快比程式碼多了。。 ```c++ void ATTR ObjectMonitor::EnterI (TRAPS) { Thread * Self = THREAD ; assert (Self->is_Java_thread(), "invariant") ; assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ; // 1 Try the lock - TATAS if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } DeferredInitialize () ; //2 We try one round of spinning *before* enqueueing Self. if (TrySpin (Self) > 0) { assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } //3 The Spin failed -- Enqueue and park the thread ... //4 Enqueue "Self" on ObjectMonitor's _cxq ObjectWaiter node(Self) ; Self->_ParkEvent->reset() ; node._prev = (ObjectWaiter *) 0xBAD ; node.TState = ObjectWaiter::TS_CXQ ; // 5 Push "Self" onto the front of the _cxq. // Once on cxq/EntryList, Self stays on-queue until it acquires the lock. // Note that spinning tends to reduce the rate at which threads // enqueue and dequeue on EntryList|cxq. ObjectWaiter * nxt ; for (;;) { node._next = nxt = _cxq ; if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ; // Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } } TEVENT (Inflated enter - Contention) ; int nWakeups = 0 ; int RecheckInterval = 1 ; for (;;) { if (TryLock (Self) > 0) break ; assert (_owner != Self, "invariant") ; if ((SyncFlags & 2) && _Responsible == NULL) { Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ; } //6 park self if (_Responsible == Self || (SyncFlags & 1)) { TEVENT (Inflated enter - park TIMED) ; Self->_ParkEvent->park ((jlong) RecheckInterval) ; // Increase the RecheckInterval, but clamp the value. RecheckInterval *= 8 ; if (RecheckInterval > 1000) RecheckInterval = 1000 ; } else { TEVENT (Inflated enter - park UNTIMED) ; // 7 Self->_ParkEvent->park() ; } if (TryLock(Self) > 0) break ; ... } ... return ; } ``` * 上面的方法不是完整的,因為太長,刪減了,只留了我們要關注的那部分,1處,嘗試獲取鎖 * 2處,嘗試自旋一下 * 3處,自旋也失敗了,準備進入佇列,並阻塞自己,類似於aqs的實現 * 4、5處都是入隊的相關操作 * 6處,阻塞自己,判斷是阻塞一陣時間,還是一直阻塞。 * 7處,阻塞自己。 我們這裡,重點看6處,阻塞自己,採用的方法為: ```c++ // self的定義,型別為執行緒 Thread * Self = THREAD ; ... Self->_ParkEvent->park() ; ``` 我們看看這個類: ```c++ thread.hpp public: volatile intptr_t _Stalled ; volatile int _TypeTag ; // 1 ParkEvent * _ParkEvent ; // for synchronized() // 2 ParkEvent * _SleepEvent ; // for Thread.sleep // 3 ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor // 4 ParkEvent * _MuxEvent ; // for low-level muxAcquire-muxRelease ``` 有點意思,竟然有好幾個ParkEvent型別的屬性,第一個,看註釋,就是用來,synchronized使用的; 第二個是Thread.sleep使用的,第三個是jdk自身的native方法用的 ## ParkEvent是什麼 [JVM:鎖實現(synchronized&JSR166)行為分析和相關原始碼](https://blog.csdn.net/luoyuyou/article/details/73741387) 大家可以再看下這篇,因為感覺寫得不錯。 這個類本身的屬性,看得一知半解,但是它的父類,是這個。 ```c++ class ParkEvent : public os::PlatformEvent ``` 這個PlatformEvent有意思的很,它是平臺相關的。 ![](https://img2020.cnblogs.com/blog/519126/202005/519126-20200503174359033-1485626865.png) 可以看到,它有5個同名的類,分別在5個檔案,分別是什麼os_windows.hpp、os_linux.hpp、os_solaris.hpp,盲猜也知道,是不同作業系統下的實現。 我們看看linux下, ```c++ class PlatformEvent : public CHeapObj { private: double CachePad [4] ; // increase odds that _mutex is sole occupant of cache line volatile int _Event ; volatile int _nParked ; // 1 pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [1] ; double PostPad [2] ; Thread * _Assoc ; public: // TODO-FIXME: make dtor private ~PlatformEvent() { guarantee (0, "invariant") ; } public: PlatformEvent() { int status; status = pthread_cond_init (_cond, os::Linux::condAttr()); assert_status(status == 0, status, "cond_init"); status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); _Event = 0 ; _nParked = 0 ; _Assoc = NULL ; } // Use caution with reset() and fired() -- they may require MEMBARs void reset() { _Event = 0 ; } int fired() { return _Event; } void park () ; void unpark () ; int TryPark () ; int park (jlong millis) ; // relative timed-wait only void SetAssociation (Thread * a) { _Assoc = a ; } } ; ``` 看到1處了嗎,原來,阻塞自己還是用了pthread_mutex_t啊。 看看park怎麼實現的: ```c++ void os::PlatformEvent::park() { // AKA "down()" // Invariant: Only the thread associated with the Event/PlatformEvent // may call park(). // TODO: assert that _Assoc != NULL or _Assoc == Self int v ; for (;;) { v = _Event ; if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ; } guarantee (v >= 0, "invariant") ; if (v == 0) { //1 Do this the hard way by blocking ... int status = pthread_mutex_lock(_mutex); assert_status(status == 0, status, "mutex_lock"); guarantee (_nParked == 0, "invariant") ; ++ _nParked ; while (_Event < 0) { status = pthread_cond_wait(_cond, _mutex); // for some reason, under 2.7 lwp_cond_wait() may return ETIME ... // Treat this the same as if the wait was interrupted if (status == ETIME) { status = EINTR; } assert_status(status == 0 || status == EINTR, status, "cond_wait"); } -- _nParked ; _Event = 0 ; // 2 status = pthread_mutex_unlock(_mutex); ... } guarantee (_Event >= 0, "invariant") ; } ``` 1處,加鎖; 2處,解鎖。 所以,我們本文的答案找到了。 看看其他平臺下呢? ![](https://img2020.cnblogs.com/blog/519126/202005/519126-20200503174914561-925838761.png) 其他平臺就不一一截圖了,除了windows,都是用的pthread_mutex_lock。 # 總結 為了這個答案,花了一天時間,值得嗎,有點不值得,時間花太長了,不過也值得,至少問題解決了。 不過,沒把除錯環境搭起來太慘了,各種標頭檔案找不到,跳轉都點不動,基本上都是全文搜尋。。。 謝謝