關於同步的一點思考-下
在ofollow,noindex"><關於同步的一點思考-上> 中介紹了幾種實現鎖的方式以及linux底層futex的實現原理 ReentrantLock的實現網上有很多文章了,本篇文章會簡單介紹下其java層實現,重點放在分析競爭鎖失敗後如何阻塞執行緒。 因篇幅有限,synchronized的內容將會放到下篇文章。
更多文章見個人部落格:github.com/farmerjohng…
Java Lock的實現
ReentrantLock是jdk中常用的鎖實現,其實現邏輯主語基於AQS(juc包中的大多數同步類實現都是基於AQS);接下來會簡單介紹AQS的大致原理,關於其實現細節以及各種應用,之後會寫一篇文章具體分析。
AQS
AQS是類AbstractQueuedSynchronizer.java的簡稱,JUC包下的ReentrantLock、CyclicBarrier、CountdownLatch都使用到了AQS。
其大致原理如下:
- AQS維護一個叫做state的int型變數和一個雙向連結串列,state用來表示同步狀態,雙向連結串列儲存的是等待鎖的執行緒
- 加鎖時首先呼叫tryAcquire嘗試獲得鎖,如果獲得鎖失敗,則將執行緒插入到雙向連結串列中,並呼叫LockSupport.park()方法阻塞當前執行緒。
- 釋放鎖時呼叫LockSupport.unpark()喚起連結串列中的第一個節點的執行緒。被喚起的執行緒會重新走一遍競爭鎖的流程。
其中tryAcquire方法是抽象方法,具體實現取決於實現類,我們常說的公平鎖和非公平鎖的區別就在於該方法的實現。
ReentrantLock
ReentrantLock分為公平鎖和非公平鎖,我們只看公平鎖。 ReentrantLock.lock會呼叫到ReentrantLock#FairSync.lock中:
FairSync.java
static final class FairSync extends Sync { final void lock() { acquire(1); } /** * Fair version of tryAcquire.Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 複製程式碼
AbstractQueuedSynchronizer.java
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 複製程式碼
可以看到FairSync.lock呼叫了AQS的acquire
方法,而在acquire
中首先呼叫tryAcquire
嘗試獲得鎖,以下兩種情況返回true:
重入
如果tryAcquire
失敗則呼叫acquireQueued
阻塞當前執行緒。acquireQueued
最終會呼叫到LockSupport.park()
阻塞執行緒。
LockSupport.park
個人認為,要深入理解鎖機制,一個很重要的點是理解系統是如何阻塞執行緒的。
LockSupport.java
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); } 複製程式碼
park
方法的引數blocker是用於負責這次阻塞的同步物件,在AQS的呼叫中,這個物件就是AQS本身。我們知道synchronized關鍵字是需要指定一個物件的(如果作用於方法上則是當前物件或當前類),與之類似blocker就是LockSupport指定的物件。
park
方法呼叫了native方法UNSAFE.park
,第一個引數代表第二個引數是否是絕對時間,第二個引數代表最長阻塞時間。
其實現如下,只保留核心程式碼,完整程式碼看檢視unsafe.cpp
Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time){ ... thread->parker()->park(isAbsolute != 0, time); ... } 複製程式碼
park方法在os_linux.cpp中(其他作業系統的實現在os_xxx中)
void Parker::park(bool isAbsolute, jlong time) { ... //獲得當前執行緒 Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; //如果當前執行緒被設定了interrupted標記,則直接返回 if (Thread::is_interrupted(thread, false)) { return; } if (time > 0) { //unpacktime中根據isAbsolute的值來填充absTime結構體,isAbsolute為true時,time代表絕對時間且單位是毫秒,否則time是相對時間且單位是納秒 //absTime.tvsec代表了對於時間的秒 //absTime.tv_nsec代表對應時間的納秒 unpackTime(&absTime, isAbsolute, time); } //呼叫mutex trylock方法 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } //_counter是一個許可的數量,跟ReentrantLock裡定義的許可變數基本都是一個原理。 unpack方法呼叫時會將_counter賦值為1。 //_counter>0代表已經有人呼叫了unpark,所以不用阻塞 int status ; if (_counter > 0){ // no wait needed _counter = 0; //釋放mutex鎖 status = pthread_mutex_unlock(_mutex); return; } //設定執行緒狀態為CONDVAR_WAIT OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); ... //等待 _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; pthread_cond_timedwait(&_cond[_cur_index], _mutex,&absTime); ... //釋放mutex鎖 status = pthread_mutex_unlock(_mutex) ; } 複製程式碼
park
方法用POSIX的pthread_cond_timedwait
方法阻塞執行緒,呼叫pthread_cond_timedwait
前需要先獲得鎖,因此park
主要流程為:
pthread_mutex_trylock pthread_cond_timedwait pthread_mutex_unlock
另外,在阻塞當前執行緒前,會呼叫OSThreadWaitState
的構造方法將執行緒狀態設定為CONDVAR_WAIT
,在Jvm中Thread狀態列舉如下
enum ThreadState { ALLOCATED,// Memory has been allocated but not initialized INITIALIZED,// The thread has been initialized but yet started RUNNABLE,// Has been started and is runnable, but not necessarily running MONITOR_WAIT,// Waiting on a contended monitor lock CONDVAR_WAIT,// Waiting on a condition variable OBJECT_WAIT,// Waiting on an Object.wait() call BREAKPOINTED,// Suspended at breakpoint SLEEPING,// Thread.sleep() ZOMBIE// All done, but not reclaimed yet }; 複製程式碼
Linux的timedwait
由上文我們可以知道LockSupport.park方法最終是由POSIX的pthread_cond_timedwait
的方法實現的。
我們現在就進一步看看pthread_mutex_trylock
,pthread_cond_timedwait
,pthread_mutex_unlock
這幾個方法是如何實現的。
Linux系統中相關程式碼在glibc庫中。
pthread_mutex_trylock
先看trylock的實現,
程式碼在glibc的pthread_mutex_trylock.c
檔案中,該方法程式碼很多,我們只看主要程式碼
//pthread_mutex_t是posix中的互斥鎖結構體 int __pthread_mutex_trylock (mutex) pthread_mutex_t *mutex; { int oldval; pid_t id = THREAD_GETMEM (THREAD_SELF, tid); switch (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex), PTHREAD_MUTEX_TIMED_NP)) { case PTHREAD_MUTEX_ERRORCHECK_NP: case PTHREAD_MUTEX_TIMED_NP: case PTHREAD_MUTEX_ADAPTIVE_NP: /* Normal mutex.*/ if (lll_trylock (mutex->__data.__lock) != 0) break; /* Record the ownership.*/ mutex->__data.__owner = id; ++mutex->__data.__nusers; return 0; } } //以下程式碼在lowlevellock.h中 #define __lll_trylock(futex) \ (atomic_compare_and_exchange_val_acq (futex, 1, 0) != 0) #define lll_trylock(futex) __lll_trylock (&(futex)) 複製程式碼
mutex預設用的是PTHREAD_MUTEX_NORMAL
型別(與PTHREAD_MUTEX_TIMED_NP
相同);
因此會先呼叫lll_trylock
方法,lll_trylock
實際上是一個cas操作,如果mutex->__data.__lock==0則將其修改為1並返回0,否則返回1。
如果成功,則更改mutex中的owner為當前執行緒。
pthread_mutex_unlock
pthread_mutex_unlock.c
int internal_function attribute_hidden __pthread_mutex_unlock_usercnt (mutex, decr) pthread_mutex_t *mutex; int decr; { if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP) == PTHREAD_MUTEX_TIMED_NP) { /* Always reset the owner field.*/ normal: mutex->__data.__owner = 0; if (decr) /* One less user.*/ --mutex->__data.__nusers; /* Unlock.*/ lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex)); return 0; } } 複製程式碼
pthread_mutex_unlock
將mutex中的owner清空,並呼叫了lll_unlock
方法
lowlevellock.h
#define __lll_unlock(futex, private)\ ((void) ({\ int *__futex = (futex);\ int __val = atomic_exchange_rel (__futex, 0);\ \ if (__builtin_expect (__val > 1, 0))\ lll_futex_wake (__futex, 1, private);\ })) #define lll_unlock(futex, private) __lll_unlock(&(futex), private) #define lll_futex_wake(ftx, nr, private)\ ({\ DO_INLINE_SYSCALL(futex, 3, (long) (ftx),\ __lll_private_flag (FUTEX_WAKE, private),\ (int) (nr));\ _r10 == -1 ? -_retval : _retval;\ }) 複製程式碼
lll_unlock
分為兩個步驟:
- 將futex設定為0並拿到設定之前的值(使用者態操作)
-
如果futex之前的值>1,代表存在鎖衝突,也就是說有執行緒呼叫了
FUTEX_WAIT
在休眠,所以通過呼叫系統函式FUTEX_WAKE
喚醒休眠執行緒
FUTEX_WAKE
在上一篇文章有分析,futex機制的核心是當獲得鎖時,嘗試cas更改一個int型變數(使用者態操作),如果integer原始值是0,則修改成功,該執行緒獲得鎖,否則就將當期執行緒放入到 wait queue中,wait queue中的執行緒不會被系統排程(核心態操作)。
futex變數的值有3種:0代表當前鎖空閒,1代表有執行緒持有當前鎖,2代表存在鎖衝突。futex的值初始化時是0;當呼叫try_lock的時候會利用cas操作改為1(見上面的trylock函式);當呼叫lll_lock
時,如果不存在鎖衝突,則將其改為1,否則改為2。
#define __lll_lock(futex, private)\ ((void) ({\ int *__futex = (futex);\ if (__builtin_expect (atomic_compare_and_exchange_bool_acq (__futex,\ 1, 0), 0))\ {\ if (__builtin_constant_p (private) && (private) == LLL_PRIVATE)\ __lll_lock_wait_private (__futex);\ else\ __lll_lock_wait (__futex, private);\ }\ })) #define lll_lock(futex, private) __lll_lock (&(futex), private) void __lll_lock_wait_private (int *futex) { //第一次進來的時候futex==1,所以不會走這個if if (*futex == 2) lll_futex_wait (futex, 2, LLL_PRIVATE); //在這裡會把futex設定成2,並呼叫futex_wait讓當前執行緒等待 while (atomic_exchange_acq (futex, 2) != 0) lll_futex_wait (futex, 2, LLL_PRIVATE); } 複製程式碼
pthread_cond_timedwait
pthread_cond_timedwait
用於阻塞執行緒,實現執行緒等待,
程式碼在glibc的pthread_cond_timedwait.c
檔案中,程式碼較長,你可以先簡單過一遍,看完下面的分析再重新讀一遍程式碼
int int __pthread_cond_timedwait (cond, mutex, abstime) pthread_cond_t *cond; pthread_mutex_t *mutex; const struct timespec *abstime; { struct _pthread_cleanup_buffer buffer; struct _condvar_cleanup_buffer cbuffer; int result = 0; /* Catch invalid parameters.*/ if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) return EINVAL; int pshared = (cond->__data.__mutex == (void *) ~0l) ? LLL_SHARED : LLL_PRIVATE; //1.獲得cond鎖 lll_lock (cond->__data.__lock, pshared); //2.釋放mutex鎖 int err = __pthread_mutex_unlock_usercnt (mutex, 0); if (err) { lll_unlock (cond->__data.__lock, pshared); return err; } /* We have one new user of the condvar.*/ //每執行一次wait(pthread_cond_timedwait/pthread_cond_wait),__total_seq就會+1 ++cond->__data.__total_seq; //用來執行futex_wait的變數 ++cond->__data.__futex; //標識該cond還有多少執行緒在使用,pthread_cond_destroy需要等待所有的操作完成 cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT; /* Remember the mutex we are using here.If there is already a different address store this is a bad user bug.Do not store anything for pshared condvars.*/ //儲存mutex鎖 if (cond->__data.__mutex != (void *) ~0l) cond->__data.__mutex = mutex; /* Prepare structure passed to cancellation handler.*/ cbuffer.cond = cond; cbuffer.mutex = mutex; /* Before we block we enable cancellation.Therefore we have to install a cancellation handler.*/ __pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer); /* The current values of the wakeup counter.The "woken" counter must exceed this value.*/ //記錄futex_wait前的__wakeup_seq(為該cond上執行了多少次sign操作+timeout次數)和__broadcast_seq(代表在該cond上執行了多少次broadcast) unsigned long long int val; unsigned long long int seq; val = seq = cond->__data.__wakeup_seq; /* Remember the broadcast counter.*/ cbuffer.bc_seq = cond->__data.__broadcast_seq; while (1) { //3.計算要wait的相對時間 struct timespec rt; { #ifdef __NR_clock_gettime INTERNAL_SYSCALL_DECL (err); int ret; ret = INTERNAL_VSYSCALL (clock_gettime, err, 2, (cond->__data.__nwaiters & ((1 << COND_NWAITERS_SHIFT) - 1)), &rt); # ifndef __ASSUME_POSIX_TIMERS if (__builtin_expect (INTERNAL_SYSCALL_ERROR_P (ret, err), 0)) { struct timeval tv; (void) gettimeofday (&tv, NULL); /* Convert the absolute timeout value to a relative timeout.*/ rt.tv_sec = abstime->tv_sec - tv.tv_sec; rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000; } else # endif { /* Convert the absolute timeout value to a relative timeout.*/ rt.tv_sec = abstime->tv_sec - rt.tv_sec; rt.tv_nsec = abstime->tv_nsec - rt.tv_nsec; } #else /* Get the current time.So far we support only one clock.*/ struct timeval tv; (void) gettimeofday (&tv, NULL); /* Convert the absolute timeout value to a relative timeout.*/ rt.tv_sec = abstime->tv_sec - tv.tv_sec; rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000; #endif } if (rt.tv_nsec < 0) { rt.tv_nsec += 1000000000; --rt.tv_sec; } /*---計算要wait的相對時間 end---- */ //是否超時 /* Did we already time out?*/ if (__builtin_expect (rt.tv_sec < 0, 0)) { //被broadcast喚醒,這裡疑問的是,為什麼不需要判斷__wakeup_seq? if (cbuffer.bc_seq != cond->__data.__broadcast_seq) goto bc_out; goto timeout; } unsigned int futex_val = cond->__data.__futex; //4.釋放cond鎖,準備wait lll_unlock (cond->__data.__lock, pshared); /* Enable asynchronous cancellation.Required by the standard.*/ cbuffer.oldtype = __pthread_enable_asynccancel (); //5.呼叫futex_wait /* Wait until woken by signal or broadcast.*/ err = lll_futex_timed_wait (&cond->__data.__futex, futex_val, &rt, pshared); /* Disable asynchronous cancellation.*/ __pthread_disable_asynccancel (cbuffer.oldtype); //6.重新獲得cond鎖,因為又要訪問&修改cond的資料了 lll_lock (cond->__data.__lock, pshared); //__broadcast_seq值發生改變,代表發生了有執行緒呼叫了廣播 if (cbuffer.bc_seq != cond->__data.__broadcast_seq) goto bc_out; //判斷是否是被sign喚醒的,sign會增加__wakeup_seq //第二個條件cond->__data.__woken_seq != val的意義在於 //可能兩個執行緒A、B在wait,一個執行緒呼叫了sign導致A被喚醒,這時B因為超時被喚醒 //對於B執行緒來說,執行到這裡時第一個條件也是滿足的,從而導致上層拿到的result不是超時 //所以這裡需要判斷下__woken_seq(即該cond已經被喚醒的執行緒數)是否等於__wakeup_seq(sign執行次數+timeout次數) val = cond->__data.__wakeup_seq; if (val != seq && cond->__data.__woken_seq != val) break; /* Not woken yet.Maybe the time expired?*/ if (__builtin_expect (err == -ETIMEDOUT, 0)) { timeout: /* Yep.Adjust the counters.*/ ++cond->__data.__wakeup_seq; ++cond->__data.__futex; /* The error value.*/ result = ETIMEDOUT; break; } } //一個執行緒已經醒了所以這裡__woken_seq +1 ++cond->__data.__woken_seq; bc_out: // cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT; /* If pthread_cond_destroy was called on this variable already, notify the pthread_cond_destroy caller all waiters have left and it can be successfully destroyed.*/ if (cond->__data.__total_seq == -1ULL && cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT)) lll_futex_wake (&cond->__data.__nwaiters, 1, pshared); //9.cond資料修改完畢,釋放鎖 lll_unlock (cond->__data.__lock, pshared); /* The cancellation handling is back to normal, remove the handler.*/ __pthread_cleanup_pop (&buffer, 0); //10.重新獲得mutex鎖 err = __pthread_mutex_cond_lock (mutex); return err ?: result; } 複製程式碼
上面的程式碼雖然加了註釋,但相信大多數人第一次看都看不懂。
我們來簡單梳理下,上面程式碼有兩把鎖,一把是mutex鎖,一把cond鎖。另外,在呼叫pthread_cond_timedwait
前後必須呼叫pthread_mutex_lock(&mutex);
和pthread_mutex_unlock(&mutex);
加/解mutex鎖。
因此pthread_cond_timedwait
的使用大致分為幾個流程:
pthread_cond_timedwait pthread_cond_timedwait
看到這裡,你可能有幾點疑問:為什麼需要兩把鎖?mutex鎖和cond鎖的作用是什麼?
mutex鎖
說mutex鎖的作用之前,我們回顧一下java的Object.wait的使用。Object.wait必須是在synchronized同步塊中使用。試想下如果不加synchronized也能執行Object.wait的話會存在什麼問題?
Object condObj=new Object(); voilate int flag = 0; public void waitTest(){ if(flag == 0){ condObj.wait(); } } public void notifyTest(){ flag=1; condObj.notify(); } 複製程式碼
如上程式碼,A執行緒呼叫waitTest,這時flag==0,所以準備呼叫wait方法進行休眠,這時B執行緒開始執行,呼叫notifyTest將flag置為1,並呼叫notify方法,注意:此時A執行緒還沒呼叫wait,所以notfiy沒有喚醒任何執行緒。然後A執行緒繼續執行,呼叫wait方法進行休眠,而之後不會有人來喚醒A執行緒,A執行緒將永久wait下去!
Object condObj=new Object(); voilate int flag = 0; public void waitTest(){ synchronized(condObj){ if(flag == 0){ condObj.wait(); } } } public void notifyTest(){ synchronized(condObj){ flag=1; condObj.notify(); } } 複製程式碼
在有鎖保護下的情況下, 當呼叫condObj.wait時,flag一定是等於0的,不會存在一直wait的問題。
回到pthread_cond_timedwait
,其需要加mutex鎖的原因就呼之欲出了:保證wait和其wait條件的原子性
不管是glibc的pthread_cond_timedwait
/pthread_cond_signal
還是java層的Object.wait
/Object.notify
,Jdk AQS的Condition.await
/Condition.signal
,所有的Condition機制都需要在加鎖環境下才能使用,其根本原因就是要保證進行執行緒休眠時,條件變數是沒有被篡改的。
注意下mutex鎖釋放的時機,回顧上文中pthread_cond_timedwait
的流程,在第2步時就釋放了mutex鎖,之後呼叫futex_wait
進行休眠,為什麼要在休眠前就釋放mutex鎖呢?原因也很簡單:如果不釋放mutex鎖就開始休眠,那其他執行緒就永遠無法呼叫signal方法將休眠執行緒喚醒(因為呼叫signal方法前需要獲得mutex鎖)。
線上程被喚醒之後還要在第10步中重新獲得mutex鎖是為了保證鎖的語義(思考下如果不重新獲得mutex鎖會發生什麼)。
cond鎖
cond鎖的作用其實很簡單: 保證物件cond->data
的執行緒安全。
在pthread_cond_timedwait
時需要修改cond->data
的資料,如增加__total_seq(在這個cond上一共執行過多少次wait)增加__nwaiters(現在還有多少個執行緒在wait這個cond),所有在修改及訪問cond->data
時需要加cond鎖。
這裡我沒想明白的一點是,用mutex鎖也能保證cond->data
修改的執行緒安全,只要晚一點釋放mutex鎖就行了。為什麼要先釋放mutex,重新獲得cond來保證執行緒安全? 是為了避免mutex鎖住的範圍太大嗎?
如何喚醒休眠執行緒
喚醒休眠執行緒的程式碼比較簡單,主要就是呼叫lll_futex_wake。
int __pthread_cond_signal (cond) pthread_cond_t *cond; { int pshared = (cond->__data.__mutex == (void *) ~0l) ? LLL_SHARED : LLL_PRIVATE; //因為要操作cond的資料,所以要加鎖 lll_lock (cond->__data.__lock, pshared); /* Are there any waiters to be woken?*/ if (cond->__data.__total_seq > cond->__data.__wakeup_seq) { //__wakeup_seq為執行sign與timeout次數的和 ++cond->__data.__wakeup_seq; ++cond->__data.__futex; ... //喚醒wait的執行緒 lll_futex_wake (&cond->__data.__futex, 1, pshared); } /* We are done.*/ lll_unlock (cond->__data.__lock, pshared); return 0; } 複製程式碼
End
本文對Java簡單介紹了ReentrantLock實現原理,對LockSupport.park底層實現pthread_cond_timedwait
機制做了詳細分析。
看完這篇文章,你可能還會有疑問:Synchronized鎖的實現和ReentrantLock是一樣的嗎?Thread.sleep/Object.wait休眠執行緒的原理和LockSupport.park有什麼區別?linux核心層的futex的具體是如何實現的?
這些問題,之後的文章會一一解答,盡請期待~