1. 程式人生 > >Java中的ReentrantLock鎖

Java中的ReentrantLock鎖

# ReentrantLock鎖 ReentrantLock通過原子操作和阻塞實現鎖原理,一般使用lock獲取鎖,unlock釋放鎖 lock的時候可能被其他執行緒獲得所,那麼此執行緒會阻塞自己,關鍵原理底層用到Unsafe類的API: CAS和park ## 使用方式 lock unlock對應 ### lock 拿到鎖,開始執行程式碼邏輯 ### unlock 執行完程式碼後,釋放鎖,讓其他執行緒去獲取,需要注意的是,多個執行緒使用的鎖物件必須是同一個 ```java //建立鎖物件 ReentrantLock lock = new ReentrantLock(); lock.lock(); //獲取鎖(鎖定) // 中間執行程式碼,保證同一時間只有一個執行緒能執行此處的程式碼 lock.unlock(); //鎖釋放 ``` ## 示例 為了體現鎖的作用,這裡sleep睡眠0.1秒,增加哪個執行緒獲取鎖的隨機性 因為執行緒喚醒後,會開始嘗試獲取鎖,多個執行緒下競爭一把鎖是隨機的 ```java package javabasis.threads; import java.util.concurrent.locks.ReentrantLock; public class LockTest implements Runnable { public static ReentrantLock lock = new ReentrantLock();//鎖 private int thold; public LockTest(int h) { this.thold = h; } public static void main(String[] args) { for (int i = 10; i < 15; i++) { new Thread(new LockTest(i),"name-" + i).start(); } } @Override public void run() { try { Thread.sleep(100); lock.lock(); //獲取鎖 System.out.println("lock threadName:" + Thread.currentThread().getName()); { System.out.print(" writeStart "); for (int i = 0; i < 15; i++) { Thread.sleep(100); System.out.print(thold+","); } System.out.println(" writeEnd"); } System.out.println("unlock threadName:" + Thread.currentThread().getName() + "\r\n"); lock.unlock(); //鎖釋放 } catch (InterruptedException e) { } } } ``` 執行main方法輸出結果: ```java lock threadName:name-10 writeStart 10,10,10,10,10,10,10,10,10,10,10,10,10,10,10, writeEnd unlock threadName:name-10 lock threadName:name-14 writeStart 14,14,14,14,14,14,14,14,14,14,14,14,14,14,14, writeEnd unlock threadName:name-14 lock threadName:name-13 writeStart 13,13,13,13,13,13,13,13,13,13,13,13,13,13,13, writeEnd unlock threadName:name-13 lock threadName:name-11 writeStart 11,11,11,11,11,11,11,11,11,11,11,11,11,11,11, writeEnd unlock threadName:name-11 lock threadName:name-12 writeStart 12,12,12,12,12,12,12,12,12,12,12,12,12,12,12, writeEnd unlock threadName:name-12 ``` 這體現在多執行緒情況下,鎖能做到讓執行緒之間有序執行, 如果沒有鎖,情況可能是 12,13,13,10,10,10,12,沒有鎖其他執行緒可能插隊執行`System.out.print` ## 原理 ReentrantLock主要用到unsafe的CAS和park兩個功能實現鎖(CAS + park ) > 多個執行緒同時操作一個數N,使用原子(CAS)操作,原子操作能保證同一時間只能被一個執行緒修改,而修改數N成功後,返回true,其他執行緒修改失敗,返回false, > 這個原子操作可以定義執行緒是否拿到鎖,返回true代表獲取鎖,返回false代表為沒有拿到鎖。 > > 拿到鎖的執行緒,自然是繼續執行後續邏輯程式碼,而沒有拿到鎖的執行緒,則呼叫park,將執行緒(自己)阻塞。 > > 執行緒阻塞需要其他執行緒喚醒,ReentrantLock中用到了連結串列用於存放等待或者阻塞的執行緒,每次執行緒阻塞,先將自己的執行緒資訊放入連結串列尾部,再阻塞自己;之後需要拿到鎖的執行緒,在呼叫unlock 釋放鎖時,從連結串列中獲取阻塞執行緒,呼叫unpark 喚醒指定執行緒 ### Unsafe sun.misc.Unsafe是關鍵類,提供大量偏底層的API 包括CAS park `sun.misc.Unsafe` 此類在openjdk中可以檢視 ### CAS 原子操作 compare and swapz(CAS)比較並交換,是原子性操作, 原理:當修改一個(記憶體中的)變數o的值N的時候,首先有個期望值expected,和一個更新值x,先比較N是否等於expected,等於,那麼更新記憶體中的值為x值,否則不更新。 ```java public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x); ``` 這裡offset據瞭解,是物件的成員變數在記憶體中的偏移地址, 即底層一個物件object存放在記憶體中,讀取的地址是0x2110,此物件的一個成員變數state的值也在記憶體中,但記憶體地址肯定不是0x2110 #### java中的CAS使用 `java.util.concurrent.locks.AbstractQueuedSynchronizer` 類 ```java protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } ``` 在Java中,這個操作如果更新成功,返回true,失敗返回false,通過這個機制,可以定義鎖(樂觀鎖)。 如三個執行緒A,B,C,在目標值為0的情況下,同時執行`compareAndSetState(0,1) ` 去修改它 期望值是0,更新值是1,因為是原子操作,在第一個執行緒操作成功之後目標值變為1,返回true 所以另外兩個執行緒就因為期望值為0不等於1,返回false。 我們可以理解為,返回true的執行緒拿到了鎖。 最終呼叫的Java類是`sun.misc.Unsafe` ### park 阻塞 Java中可以通過unsafe.park()去阻塞(停止)一個執行緒,也可以通過unsafe.unpark()讓一個阻塞執行緒恢復繼續執行 #### unsafe.park() 阻塞(停止)當前執行緒 ```java public native void park(boolean isAbsolute, long time); ``` 根據debug測試,此方法能停止執行緒自己,最後通過其他執行緒喚醒 #### unsafe.unpark() 取消阻塞(喚醒)執行緒 ```java public native void unpark(Object thread); ``` 根據debug測試,此方法可以喚醒其他被park呼叫阻塞的執行緒 #### park與interrupt的區別 interrupt是Thread類的的API,park是Unsafe類的API,兩者是有區別的。 測試瞭解,Thread.currentThread().interrupt(),執行緒會繼續執行,而Unsafe.park(Thread.currentThread())就是直接阻塞執行緒,不繼續執行程式碼。 ### 獲取鎖 執行緒cas操作失敗,可以park阻塞自己,讓其他擁有鎖的執行緒在unlock的時候釋放自己,達到鎖的效果 java.util.concurrent.locks.ReentrantLock的lock方法是 ```java public void lock() { sync.lock(); } ``` 而sync的實現類其中一個是java.util.concurrent.locks.ReentrantLock.NonfairSync 不公平鎖,它的邏輯比較直接 ```java /** NonfairSync */ final void lock() { if (compareAndSetState(0, 1))//cas操作,如果true 則表示操作成功,獲取鎖 setExclusiveOwnerThread(Thread.currentThread()); //設定獲取鎖擁有者為當前執行緒 else acquire(1);//獲取鎖失敗,鎖住執行緒(自己) } ``` #### 獲取失敗後阻塞執行緒 如果獲取鎖失敗,會再嘗試一次,失敗後,將執行緒(自己)阻塞 ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //如果期望值為0,記憶體值也為0,再次嘗試獲取鎖(此時其他執行緒也可能嘗試獲取鎖) if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); //第二次獲取成功,放回true return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; //沒有獲取到鎖,返回false,則 !tryAcquire(arg) 為true,執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) } ``` 獲取鎖失敗,執行緒會進入迴圈,acquireQueued 方法中for是個無限迴圈,除非獲取鎖成功後,才會return。 ```java //獲取鎖失敗後,準備阻塞執行緒(自己) //阻塞之前,新增節點存放到連結串列,其他執行緒可以通過這個連結串列喚醒此執行緒 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) {//cas操作 pred.next = node; return node; } } enq(node); return node; } // 在此方法直到獲取鎖成功才會跳出迴圈 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; //獲取鎖成功之後才會return跳出此方法 } if (shouldParkAfterFailedAcquire(p, node) && //如果滿足阻塞條件 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//停止執行緒(自己) return Thread.interrupted(); } ``` ### 釋放鎖 一個執行緒拿到鎖之後,執行完關鍵程式碼,必須unlock釋放鎖的,否則其他執行緒永遠拿不到鎖 ```java public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //java.util.concurrent.locks.ReentrantLock.Sync 的tryRelease protected final boolean tryRelease(int releases) { int c = getState() - releases; //這裡一般是 1 - 1 = 0 if (Thread.currentThread() != getExclusiveOwnerThread()) //只能是鎖的擁有者釋放鎖 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); //設定state為0,相當於釋放鎖,讓其他執行緒compareAndSetState(0, 1)可能成功 return free; } protected final void setState(int newState) { state = newState; //沒有cas操作 } ``` setState不做cas操作是因為,只有擁有鎖的執行緒才呼叫unlock,不存才併發混亂問題 其他執行緒沒拿到鎖不會設值成功,其他執行緒在此執行緒設定state為0之前,compareAndSetState(0, 1)都會失敗,拿不到鎖,此執行緒設定state為0之後,其他執行緒compareAndSetState(0, 1)才有可能成功,返回true從而拿到鎖 #### 釋放執行緒 執行緒在獲取鎖失敗後,有可能阻塞執行緒(自己),在阻塞之前把阻塞執行緒資訊放入連結串列的 釋放鎖之後,執行緒會嘗試通過連結串列釋放其他執行緒(一個),讓一個阻塞執行緒恢復執行 ```java private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus >
0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; //迴圈,找到連結串列最前面需要被喚醒的執行緒 } if (s != null) LockSupport.unpark(s.thread); //喚醒(釋放)被阻塞的執行緒 } ``` ### 阻塞執行緒被取消阻塞後如何拿到鎖(ReentrantLock中) 有時候執行緒被中斷後,喚醒繼續執行後面的程式碼, 執行緒沒有拿到鎖之後主動阻塞自己的,但所還沒拿到,被喚醒之後怎麼去嘗試重新獲取鎖呢? 裡面有一個for迴圈 ```java final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread());//拿到鎖 else acquire(1); //沒有拿到鎖 } // 上鎖失敗,會新增一個節點,節點包含執行緒資訊,將此節點放入佇列 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 存好節點後,將執行緒(自己)中斷,等其他執行緒喚醒(自己) final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//迴圈 被喚醒後執行緒還是在此處迴圈 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//嘗試獲取鎖 setHead(node); p.next = null; // help GC failed = false; return interrupted; //如果拿到鎖了,才會return } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //沒拿到鎖時,主動中斷Thread.currentThread() interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ``` 被喚醒後繼續執行`compareAndSetState(0, 1)`返回false沒拿到鎖,則繼續迴圈或阻塞 `compareAndSetState(0, 1)` 這個操作是獲取鎖