ReentrantLock原始碼分析(公平鎖)
阿新 • • 發佈:2019-01-09
ReentranLock通過lock來獲取鎖,下面就通過lock來分析公平鎖的過程
1、lock()方法
final void lock() {
acquire(1);
}
lock()方法通過acquire()方法來獲取鎖,acquire()方法中的引數是用來設定鎖的狀態。對於獨佔鎖而言,可被獲取的狀態為0,如果是初次獲取,狀態就被設定成1,而ReentranLock又是可重入鎖,如果鎖被重複獲取,狀態在原來的基礎上加1。
2、acquire()方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
- 首先通過tryAcquire()去獲取鎖,如果獲取成功,直接返回,否則則將該執行緒加入到等待佇列
- 在嘗試獲取鎖失敗後,呼叫addWaiter方法,將該執行緒加入到佇列末尾,等待獲取鎖。
- 執行緒加入到佇列後,會呼叫acquireQueued()方法,根據公平原則獲取鎖。
- 如果執行緒在沉睡期間被中斷過,會進行自我中斷一次。
3、tryAcquire()方法:執行緒嘗試獲取鎖.
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread();//獲取當前執行緒 int c = getState();//獲取鎖的狀態 if (c == 0) {//如果鎖狀態為0,表示可以獲取鎖 if (!hasQueuedPredecessors() &&//佇列中沒有等待的執行緒,或者“當前執行緒”是佇列中的第一個執行緒, compareAndSetState(0, acquires)) {//通過cas函式設定鎖的狀態 setExclusiveOwnerThread(current);//將鎖的擁有者設定為"當前執行緒" return true; } } else if (current == getExclusiveOwnerThread()) {//如果鎖已經被獲取,則判斷擁有鎖的執行緒是否是"當前執行緒" int nextc = c + acquires;//如果是兩個執行緒相等,根據可重入的性質,鎖的狀態加1 if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc);//設定鎖的狀態 return true; } return false; }
注:上面的"當前執行緒"指嘗試獲取鎖的執行緒,不是擁有鎖的執行緒。
- hasQueuedPredecessors()方法:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
佇列為空時,鎖的狀態為0,當前執行緒可以嘗試去獲取鎖。當 h != head 時說明佇列不為空,這時需要判斷當前當前執行緒是不是佇列的第一個執行緒,如果是,則返回false,否則返回true。(佇列不為空時,(s = h.next) != null ,所以接著判斷後面一個條件)
- compareAndSetState()方法:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
compareAndSetState是一個原子操作,將期望值與原來的值比較,如果想等,則將原來的值更新為傳入的值。如:如果當前鎖的狀態為expect,則將鎖的狀態設定為update。整個過程是原子性的。
- setExclusiveOwnerThread()方法:將擁有鎖的執行緒設定為當前執行緒
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
4、addWaiter():如果嘗試獲取鎖失敗,則將執行緒加入到等待佇列。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//將當前執行緒包裝成node節點
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;//將node節點的上一個節點引用指向尾節點
if (compareAndSetTail(pred, node)) {//通過cas函式將node設定成尾節點,如果設定失敗,則通過enq來新增。
pred.next = node;
return node;
}
}
enq(node);//初始化佇列或者將節點新增進等待佇列
return node;
}
- Node 節點結構
static final class Node {
//共享鎖
static final Node SHARED = new Node();
//獨佔鎖
static final Node EXCLUSIVE = null;
//執行緒被取消標誌
static final int CANCELLED = 1;
//節點的後繼節點需要park,並且節點在release後,需要喚醒它的後繼節點
static final int SIGNAL = -1;
//執行緒被阻塞,等待condition喚醒
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//新建立的節點狀態為0。
volatile int waitStatus;
//上一個節點
volatile Node prev;
//下一個節點
volatile Node next;
//節點持有的執行緒
volatile Thread thread;
//區別佇列是“獨佔鎖”佇列,還是“共享鎖”佇列
Node nextWaiter;
//如果是共享鎖,返回true
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回上一個節點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
- enq()方法:初始化佇列或者新增執行緒到佇列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize//如果尾節點為null,則說明佇列還沒有初始化,初始化佇列
if (compareAndSetHead(new Node()))//將首節點設定為一個空執行緒的node節點
tail = head;//初始化尾節點
} else {
node.prev = t;//將node節點的上一個節點引用指向尾節點
if (compareAndSetTail(t, node)) {//通過cas函式將尾節點設定為node節點
t.next = node;//上一個節點的下一個節點引用指向node
return t;
}
}
}
}
5、acquireQueued:嘗試去獲取鎖,如果獲取失敗,則執行緒進入睡眠狀態,直到被喚醒。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//執行緒在睡眠期間是否有過中斷
for (;;) {
final Node p = node.predecessor();//“當前節點”的上一個節點("當前節點"是指在佇列中等待的執行緒)
if (p == head && tryAcquire(arg)) {如果"當前節點"的上一個節點是頭節點,說明"當前節點"是佇列中的第一個節點,嘗試獲取鎖
setHead(node);//如果成功獲取到了鎖,則當前節點節點設定成頭節點
p.next = null; // help GC
failed = false;
return interrupted;//返回中斷標誌
}
if (shouldParkAfterFailedAcquire(p, node) &&//獲取鎖失敗,將執行緒進入睡眠,等待它的上一個節點喚醒
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//當前節點的上一個節點的狀態
if (ws == Node.SIGNAL)//如果前繼節點是SIGNAL,後繼節點進入休眠狀態
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//小於0說明前繼節點已經取消,迴圈前繼節點,直到前繼節點狀態不是取消狀態
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;將“當前節點”的前繼節點指向它的前繼節點的前繼節點
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//通過cas函式,將前繼節點的狀態設定成SIGNAL
}
return false;
}
- parkAndCheckInterrup
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//阻塞當前執行緒
return Thread.interrupted();//放回執行緒的中斷標誌,因為interrupted方法會清除執行緒的中斷標誌,所以需要自我中斷一次。
}
6、selfInterrupt()
static void selfInterrupt() {
Thread.currentThread().interrupt();//設定中斷標誌
}
總結:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1、執行緒首先會呼叫tryAcquire()去嘗試獲取鎖,如果獲取失敗,執行緒會通過addWaiter方法,被新增進入等待佇列。
2、進入等待佇列後,通過acquireQueued方法,再嘗試獲取鎖,如果獲取失敗,這時執行緒會進入休眠狀態,直到執行緒被它的前繼節點喚醒,然後繼續獲取鎖。