深入理解 Java 鎖與執行緒阻塞
相信大家對執行緒鎖和執行緒阻塞都很瞭解,無非就是 synchronized, wait/notify 等, 但是你有仔細想過 Java 虛擬機器是如何實現鎖和阻塞的呢?它們之間又有哪些聯絡呢?如果感興趣的話請接著往下看。
為保障多執行緒下處理共享資料的安全性,Java 語言給我們提供了執行緒鎖,保證同一時刻只有一個執行緒能處理共享資料。當一個鎖被某個執行緒持有的時候,另一個執行緒嘗試去獲取這個鎖將產生執行緒阻塞,直到持有鎖的執行緒釋放了該鎖。
除了搶佔鎖的時候會出現執行緒阻塞,另外還有一些方法也會產生執行緒阻塞,比如: Object.wait(), Thread.sleep(), ArrayBlockingQueue.put() 等等,他們都有一個共同特點:不消耗 CPU 時間片。另外值得指出的是 Object.wait() 會釋放持有的鎖,而 Thread.sleep() 不會,相信這點大家都清楚。 當然 while(true){ } 也能產生阻塞執行緒的效果,自旋鎖就是使用迴圈,配合 CAS (compareAndSet) 實現的,這個不在我們的討論之列。
相信大家對執行緒鎖都很熟悉,目前有兩種方法,準確來說是三種,synchronized 方法,synchronized 區塊,ReentrantLock。先說 synchronized,程式碼如下:
public class Lock { public static void synchronized print() { System.out.println("method synchronized"); } public static void print2() { synchronized(Lock.class) { System.out.println("synchronized"); } } public static void main(String[] args) { Lock.print(); Lock.print2(); } }
編譯後通過如下命令檢視其位元組碼
javap -c -v Lock
其中節選方法一(Lock.print)的位元組碼如下:
public static synchronized void print(); descriptor: ()V flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED Code: stack=2, locals=0, args_size=0 0: getstatic#2// Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc#3// String method synchronized 5: invokevirtual #4// Method java/io/PrintStream.println:(Ljava/lang/String;)V 8: return }
可以看到方法表的訪問標誌位 (flags) 中多了個 ACC_SYNCHRONIZED,然後看位元組碼指令區域 (Code) ,和普通方法沒任何差別, 猜測 Java 虛擬機器通過檢查方法表中是否存在標誌位 ACC_SYNCHRONIZED 來決定是否需要獲取鎖,至於獲取鎖的原理後文會提到。
然後看第二個使用 synchronized 區塊的方法(Lock.print2)位元組碼:
public static void print2(); descriptor: ()V flags: ACC_PUBLIC, ACC_STATIC Code: stack=2, locals=2, args_size=0 0: ldc #5// 將鎖物件 Lock.class 入棧 2: dup // 複製一份,此時棧中有兩個 Lock.class 3: astore_0 // 出棧一個 Lock.class 物件儲存到區域性變量表 Slot 1 中 4: monitorenter // 以棧頂元素 Lock.class 作為鎖,開始同步 5: getstatic #2// 5-10 呼叫 System.out.println("synchronized"); 8: ldc #6 10: invokevirtual #4 13: aload_0 // 將區域性變量表 Slot 1 中的資料入棧,即 Lock.class 14: monitorexit // 使用棧頂資料退出同步 15: goto 23 // 方法結束,跳轉到 23 返回 18: astore_1 // 從這裡開始是異常路徑,將異常資訊儲存至區域性變量表 Slot 2 中,檢視異常表 19: aload_0 // 將區域性變量表 Slot 1 中的 Lock.class 入棧 20: monitorexit // 使用棧頂資料退出同步 21: aload_1 // 將區域性變量表 Slot 2 中的異常資訊入棧 22: athrow // 把異常物件重新丟擲給方法的呼叫者 23: return // 方法正常返回 Exception table: // 異常表 fromtotarget type 51518any // 5-15 出現任何(any)異常跳轉到 18 182118any // 18-21 出現任何(any)異常跳轉到 18
synchronized 區塊的位元組碼相比較 synchronized 方法複雜了許多。每一行位元組碼的含義我都作了詳細註釋,可以看到此時是通過位元組碼指令 monitorenter,monitorexit 來進入和退出同步的。特別值得注意的是,我們並沒有寫 try.catch 捕獲異常,但是位元組碼指令中存在異常處理的程式碼,其實為了保證在方法異常完成時 monitorenter 和 monitorexit 指令依然可以正確配對執行,編譯器會自動產生一個異常處理器,這個異常處理器宣告可處理所有的異常,它的目的就是用來執行 monitorexit 指令。這個機制確保在 synchronized 區塊中產生任何異常都可以正常退出同步,釋放鎖資源。
不管是檢查標誌位中的 ACC_SYNCHRONIZED,還是位元組碼指令 monitorenter,monitorexit,鎖機制的實現最終肯定存在於 JVM 中,後面我們會再提到這點。
接下來繼續看 ReentrantLock 的實現,鑑於篇幅有限,ReentrantLock 的原理不會講的很詳細,感興趣的可以自行研究。ReentrantLock 是基於併發基礎元件 AbstractQueuedSynchronizer 實現的,內部有一個 int 型別的 state 變數來控制同步狀態,為 0 時表示無執行緒佔用鎖資源,等於 1 時表示則說明有執行緒佔用,由於 ReentrantLock 是可重入鎖,state 也可能大於 1 表示該執行緒有多次獲取鎖。AQS 內部還有一個由內部類 Node 構成的佇列用來完成執行緒獲取鎖的排隊。本文只是簡單的介紹一下 lock 和 unLock 方法。
下面先看 ReentrantLock.lock 方法:
// ReentrantLock.java public void lock() { this.sync.lock(); } // ReentrantLock.NonfairSync.class final void lock() { // 使用 cas 設定 state,如果設定成功表示當前無其他執行緒競爭鎖,優先獲取鎖資源 if (this.compareAndSetState(0, 1)) { // 儲存當前執行緒由於後續重入鎖的判斷 this.setExclusiveOwnerThread(Thread.currentThread()); } else { this.acquire(1); } } // AbstractQueuedSynchronizer.java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); // 如果阻塞被中斷,重新設定中斷通知呼叫者 } // 判斷是否是重入 protected final boolean tryAcquire(int var1) { return this.nonfairTryAcquire(var1); } // 處理等待佇列 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 阻塞執行緒 return Thread.interrupted(); }
對於鎖競爭的情況,最終會呼叫 LockSupport.park(this) 阻塞當前執行緒,同樣的 ReentrantLock.unlock 方法會呼叫 LockSupport.unpark(thread) 來恢復阻塞的執行緒。繼續看 LockSupport 的實現:
public static void unpark(Thread thread) { if (var0 != null) { UNSAFE.unpark(thread); } } public static void park(Object obj) { Thread thread = Thread.currentThread(); setBlocker(thread, obj); UNSAFE.park(false, 0L); setBlocker(thread, (Object)null); }
LockSupport 內部呼叫了 UnSafe 類的 park 和 unpark, 是 native 程式碼,該類由虛擬機器實現,以 Hotspot 虛擬機器為例,檢視 park 方法:
// unsafe.cpp UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) UnsafeWrapper("Unsafe_Park"); #ifndef USDT2 HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time); #else /* USDT2 */ HOTSPOT_THREAD_PARK_BEGIN( (uintptr_t) thread->parker(), (int) isAbsolute, time); #endif /* USDT2 */ JavaThreadParkedState jtps(thread, time != 0); thread->parker()->park(isAbsolute != 0, time); #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker()); #else /* USDT2 */ HOTSPOT_THREAD_PARK_END( (uintptr_t) thread->parker()); #endif /* USDT2 */ UNSAFE_END
呼叫了: thread->parker()->park(isAbsolute != 0, time); 我們可以猜測是這句程式碼阻塞了當前執行緒。HotSpot 虛擬機器裡的 Thread 類對應著一個 OS 的 Thread, JavaThread 類繼承於 Thread, JavaThread 例項對應著一個 Java 層的 Thread.
簡而言之,Java 層的 Thread 對應著一個 OS 的 Thread。使用如下程式碼建立執行緒:
//linux_os.cpp pthread_t tid; int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
回到 Thread 類中的 Park,我們檢視 HotSpot 的 thread.hpp, 找到了如下三個 Park:
public: ParkEvent * _ParkEvent ;// for synchronized() ParkEvent * _SleepEvent ;// for Thread.sleep // JSR166 per-thread parker private: Parker*_parker;
從註釋上可以看出分別是用於 synchronized 的阻塞,Thread.sleep 的阻塞還有用於 UnSafe 的執行緒阻塞,繼續檢視 park.hpp 節選:
// A word of caution: The JVM uses 2 very similar constructs: // 1. ParkEvent are used for Java-level "monitor" synchronization. // 2. Parkers are used by JSR166-JUC park-unpark. class Parker : public os::PlatformParker { // 略 } class ParkEvent : public os::PlatformEvent { // 略 }
註釋上更近一步解釋了兩種 Parker 的區別,他們的實現非常相似,那為什麼會存在兩個呢?網路上有解釋說是隻是沒重構而已。下面只看 Parker 的實現,發現 park.cpp 中並沒有實現 park 方法,猜測應該是父類中實現了,因為這是和系統相關的操作,以 Linux 系統為例,檢視 linux_os.cpp 找到了 park 的實現,截取了主要部分:
void Parker::park(bool isAbsolute, jlong time) { // 省略了前置判斷 // 獲取鎖 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed // 呼叫 pthread_cond_wait 阻塞執行緒 status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init(&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; // 已從 block 中恢復,釋放鎖 _counter = 0 ; status = pthread_mutex_unlock(_mutex) ; // 略 }
總共分三步走,先獲取鎖,再呼叫 pthread_cond_wait 阻塞執行緒,最後阻塞恢復了之後釋放鎖,是不是和我們使用 Object.wait 十分類似,事實上 Object.wait 底層也是這種方式實現的。為了更清楚的瞭解底層的實現,寫了一段 c 程式碼看一下執行緒的建立和鎖的使用:
int counter = 0; // 互斥鎖物件 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void* add() { for(int i = 0;i < 2;++i) { // 獲取鎖 pthread_mutex_lock( &mutex ); ++counter; sleep(1); // 釋放鎖 pthread_mutex_unlock( &mutex ); printf("counter = %d\n", counter); } pthread_exit(NULL); } int main() { pthread_t thread_1, thread_2; // 建立執行緒 pthread_create(&thread_1, NULL, add, NULL); pthread_create(&thread_2, NULL, add, NULL); pthread_join(thread_1, NULL); pthread_join(thread_2, NULL); return 0; }
使用 pthread_create 建立執行緒,使用 pthread_mutex_lock 獲取鎖,使用 pthread_mutex_unlock 釋放鎖。那既然 pthread_mutex_lock 和 pthread_mutex_unlock 就能實現鎖了,那為什麼鎖實現的時候還要使用 pthread_cond_wait 來阻塞執行緒呢?回過頭看 PlatformParker :
//os_linux.hpp class PlatformParker { pthread_mutex_t _mutex[1]; //一個是給park用, 另一個是給parkUntil用 pthread_cond_t_cond[2]; // one for relative times and one for abs. //略... };
每個 JavaThread 例項都有自己的 mutex,在上述自己寫的例子中是多個執行緒競爭同一個 mutex,阻塞執行緒佇列管理的邏輯直接由 mutex 實現,而此處的 mutex 執行緒私有,不存在直接競爭關係,事實上,JVM 為了提升平臺通用性(?),只提供了執行緒阻塞和恢復操作,阻塞執行緒佇列的管理工作交給了 Java 層,也就是前面提到的 AQS。對於 Java 層來說 JVM 只需要提供 「阻塞」 和 「喚醒」 的操作即可。
在 Java 中講解 Object.wait, Object.notify 的時候通常會用生產者-消費者作為例子,這裡我也簡單的寫了一個 c 的例子,讓大家瞭解底層執行緒阻塞的原理:
#define TRUE 1 #define FALSE 0 #define BUFFER_SIZE 10 pthread_cond_t msg_cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; char* msgBuffer[BUFFER_SIZE] = {0}; int bufferIndex = -1; int counter = 0; void* readMsg() { while (TRUE) { // 獲取鎖 pthread_mutex_lock( &mutex ); if (bufferIndex < 0) { printf("wait for message\n"); // 訊息佇列如果為空則阻塞等待 pthread_cond_wait( &msg_cond, &mutex); } for(; bufferIndex >= 0; --bufferIndex){ char* msg = msgBuffer[bufferIndex]; msgBuffer[bufferIndex] = 0; printf("read message = %s, %d\n", msg, counter++); // 通知生產者執行緒 pthread_cond_signal(&msg_cond); } sleep(1); // 釋放鎖 pthread_mutex_unlock( &mutex ); } return 0; } void* writeMsg() { // 獲取鎖 pthread_mutex_lock( &mutex ); if (bufferIndex < BUFFER_SIZE - 1) { char* msg = "haha!"; msgBuffer[++bufferIndex] = msg; // 通知消費者執行緒 pthread_cond_signal(&msg_cond); // notify(); // pthread_cond_broadcast(&msg_cond); // notifyAll(); } else { printf("message buffer is full!\n"); // 緩衝佇列已滿阻塞等待 pthread_cond_wait( &msg_cond, &mutex); } // 釋放鎖 pthread_mutex_unlock( &mutex ); return 0; } int main(int argc, char const *argv[]) { pthread_t thread_r; // 建立後臺消費者執行緒 pthread_create(&thread_r, NULL, readMsg, NULL); for(int i = 0; i < 50; i++){ printf("send message %d \n", i); // 生產訊息 writeMsg(); } pthread_join(thread_r, NULL); return 0; }
其中消費者執行緒是一個迴圈,在迴圈中先獲取鎖,然後判斷佇列是否為空,如果為空則呼叫 pthread_cond_wait 阻塞執行緒,這個阻塞操作會自動釋放持有的鎖並出讓 cpu 時間片,恢復的時候自動獲取鎖,消費完佇列之後會呼叫 pthread_cond_signal 通知生產者執行緒,另外還有一個通知所有執行緒恢復的 pthread_cond_broadcast,與 notifyAll 類似。
最後再簡單談一下阻塞中斷,Java 層 Thread 中有個 interrupt 方法,它的作用是線上程收到阻塞的時候丟擲一箇中斷訊號,這樣執行緒就會退出阻塞狀態,但是並不是我們遇到的所有阻塞都會中斷,要看是否會響應中斷訊號,Object.wait, Thread.join,Thread.sleep,ReentrantLock.lockInterruptibly 這些會丟擲受檢異常 InterruptedException 的都會被中斷。synchronized,ReentrantLock.lock 的鎖競爭阻塞是不會被中斷的,interrupt 並不會強制終止執行緒,而是會將執行緒設定成 interrupted 狀態,我們可以通過判斷 isInterrupted 或 interrupted 來獲取中斷狀態,區別在於後者會重置中斷狀態為 false。看一下底層執行緒中斷的程式碼:
// os_linux.cpp void os::interrupt(Thread* thread) { OSThread* osthread = thread->osthread(); if (!osthread->interrupted()) { osthread->set_interrupted(true); OrderAccess::fence(); ParkEvent * const slp = thread->_SleepEvent ; if (slp != NULL) slp->unpark() ; } // For JSR166. Unpark even if interrupt status already was set if (thread->is_Java_thread()) ((JavaThread*)thread)->parker()->unpark(); ParkEvent * ev = thread->_ParkEvent ; if (ev != NULL) ev->unpark() ; }
可以看到,執行緒中斷也是由 unpark 實現的, 即恢復了阻塞的執行緒。並且對之前提到的三個 Parker (_ParkEvent,_SleepEvent,_parker) 都進行了 unpark。
說到這裡相信大家對 Java 執行緒鎖與執行緒阻塞有個大體的瞭解了吧,由於本人水平實在有限,有些地方講的不好或者有錯誤的地方請多包涵,如果發現任何問題,請提出討論,我會及時修改。
>> 轉載請註明來源: 深入理解 Java 鎖與執行緒阻塞●非常感謝您的閱讀,歡迎訂閱 微信公眾號 (右邊掃一掃)以表達對我的認可與支援,我會在第一時間同步文章到公眾號上。當然也可點選下方打賞按鈕為我打賞。
免費分享,隨意打賞
感謝打賞!

微信

支付寶