ReentrantLock原始碼解讀
寫這篇文章之前,還是先安利一本書:《java併發程式設計的藝術》。這本書對鎖的實現的很多細節都解釋的還是很清楚的,加上自己配合原始碼進行理解,讀懂ReentrantLock這個類的實現應該不是那麼困難。本文只對獨佔模式 進行分析。
一行行分析ReentrantLock原始碼
直接步入正題,先貼一段程式碼看看如何使用ReentrantLock:
public class ReentrantLockTest { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(true);//1 lock.lock();//2 try { //do something } finally { lock.unlock(); //3 } } } 複製程式碼
ReentrantLock的構造
上面程式碼的步驟1是呼叫ReentrantLock構造方法進行初始化,這裡ReentrantLock給我們提供了兩種鎖的實現,一個是公平鎖,一個是非公平鎖。這兩種鎖顧名思義,一個排隊幹活,一個搶著幹~~
//預設建構函式,得到的是非公平鎖的實現 public ReentrantLock() { sync = new NonfairSync(); } //傳入true得到公平鎖的實現,傳入false則得到公平鎖的實現 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } 複製程式碼
lock方法的解析
ReentrantLock鎖的使用的入口在lock方法,下面咱們針對公平鎖 lock方法的實現進行分析一波(能看懂這個相信對非公平鎖的lock的實現的理解也就不會有什麼難度了)。
這裡我把所有的方法都放在一起,方便大家閱讀:
//這裡在併發情況下會有競爭 final void lock() { acquire(1); } //來至於父類AQS中 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //公平鎖自身提供的實現方法,來保證鎖的獲取是按照FIFO原則.也就是佇列模型,先入先出。 protected final boolean tryAcquire(int acquires) { //獲取當前執行緒 final Thread current = Thread.currentThread(); //拿到鎖標記的狀態值,為0則代表這把鎖沒人佔用 int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //將幹活的人的身份標記一下 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //這裡是重入鎖的關鍵程式碼,只要是獲取鎖的執行緒再次去拿這把鎖,則可以直接獲取成功, //並將state的值+1後重新設定,供後面釋放鎖的時候進行多次釋放使用。 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); //這裡有個優雅的小細節:咱們發現設定狀態時並沒有使用compareAndSetState這種方法, //而是直接設定。那是因為在這種條件下不會有競爭,只可能是獲取鎖的執行緒才能去改變這個值。 setState(nextc); return true; } return false; } //用來判斷是否在它之前已經有人排在隊列當中了,如果有,則返回true public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; //這裡返回時的判斷條件可能有點難理解。假設當前是A執行緒。 //1.第一種情況發生在有一個B執行緒進度比A快,已經準備開始排隊了。可以看下面addWaiter方法 //的呼叫,在進行compareAndSetTail交換後,有可能還沒來得及將pred.next指向這個新節點node, //這個時候說明已經有人在A執行緒前面去排隊拿鎖了。 //2.第二種情況簡單明瞭。A執行緒不是排在佇列的第一個的,也證明了有人排在他前面了。 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } //用來新增新的節點到佇列的尾部。 private Node addWaiter(Node mode) { //根據傳進來的引數mode=Node.EXCLUSIVE,表示將要構造一個獨佔鎖。 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //tail為空的情況下直接呼叫enq方法去進行head和tail的初始化。 if (pred != null) { //tail不為空的情況下,將新構造節點的前驅設定為原尾部節點。 node.prev = pred; //使用CAS進行交換,如果成功,則將原尾部節點的後繼節點設定為新節點,做雙向列表關聯; //(這裡要注意一點,交換成功的同時有其他執行緒讀取該列表,有可能讀取不到新節點。例如A執行緒 //執行完下方步驟1後,還未執行步驟2,遍歷的時候將會獲取不到新節點,這也是 //hasQueuedPredecessors方法中的第一種情況) //如果不成功,則代表有競爭,有其他執行緒修改了尾部,則去呼叫下方enq方法 if (compareAndSetTail(pred, node)) {//1 pred.next = node;//2 return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //初始化head和tail,初始化完成後,會繼續執行外面的死迴圈,進行compareAndSetTail將 //新節點設定到尾部,和上述執行流程一樣,這裡就不詳述了。 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } //再進行一次嘗試和進入堵塞 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //獲取當前node的前驅 final Node p = node.predecessor(); //如果前驅是head的話就再進行一次嘗試,這種設計會節約很多的資源。 //這裡嘗試成功後該執行緒就不會有後續的park和unpark之說了。 if (p == head && tryAcquire(arg)) { //如果獲取成功就將head設定成當前node,並將儲存的thread和prev都清空 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //來判斷進行嘗試獲取失敗後是否進行park private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //node的waitStatus初始化都是0 int ws = pred.waitStatus; if (ws == Node.SIGNAL) //第一次進來肯定不是-1狀態的,需要compareAndSetWaitStatus方法進行設定後才會是-1 return true; if (ws > 0) { //這裡的作用是用來剔除被cancel後的節點,只要是cancel後的節點waitStatus 都會被標記成1。 //用該狀態來過濾掉這些節點。 //由於節點的喚醒是由它的prev節點來進行喚醒的,我們必須要保證它的prev是處於活著的狀態 //所以這裡一直遍歷往上找,總會找到一個正常的prev來幫助其unpark。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //設定prev為-1狀態,(該狀態下能夠喚醒它的下一個去幹活)。 //這裡結束後會跳到acquireQueued的死迴圈再次迴圈一次。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //要執行這個方法的前提是shouldParkAfterFailedAcquire這個方法必須返回true private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } //阻塞執行緒的方法 public static void park(Object blocker) { Thread t = Thread.currentThread(); // 設定Blocker,設定為當前lock。 setBlocker(t, blocker); // 等待獲取許可,這裡會進行堵塞,直到有人幫忙呼叫該執行緒的unpark方法才會獲取到許可, //並繼續走下面的流程。 UNSAFE.park(false, 0L); // 設定Blocker,將該執行緒的parkBlocker欄位設定為null,這個是線上程被喚醒後執行的。 setBlocker(t, null); } 複製程式碼
unlock方法的解析
//呼叫該方法進行解鎖 public void unlock() { sync.release(1); } //改變state的值並喚醒佇列中的下一個執行緒來幹活 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //這裡會判斷頭部是不是null,並看其waitStatus 狀態是否有喚醒它的後繼節點的資格。 //這裡的頭部其實也就是當前執行緒所代表的節點。 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //嘗試著釋放鎖 protected final boolean tryRelease(int releases) { //將鎖標記state的值-1 int c = getState() - releases; //如果幹活的人和自己的身份不一致,則拋異常出去 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //這裡判斷狀態-1後是不是等於0。 //如果不是,則代表重入了很多次,鎖暫時不釋放。 //如果是,則將free置為true,釋放鎖,將身份標記置為null。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //去喚醒後繼節點中的thread來幹活 private void unparkSuccessor(Node node) { int ws = node.waitStatus; //如果head中的waitStatus<0,則置為0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //這裡會檢查head的下一個節點是不是null以及是否是cancel狀態 Node s = node.next; if (s == null || s.waitStatus > 0) { //如果next是cancel狀態,則將s置為空,並重佇列尾部進行往前遍歷,直到找到最後 //一個waitStatus <=0的node來做為next節點去喚醒 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //去喚醒s指向的next節點,呼叫這裡可以讓UNSAFE.park(false, 0L);處的執行緒獲取到許可。 //到這裡解鎖的功能就執行完畢了~ LockSupport.unpark(s.thread); } 複製程式碼