1. 程式人生 > >ReentrantLock原始碼分析(公平鎖)

ReentrantLock原始碼分析(公平鎖)

ReentranLock通過lock來獲取鎖,下面就通過lock來分析公平鎖的過程

1、lock()方法

 final void lock() {
            acquire(1);
        }

lock()方法通過acquire()方法來獲取鎖,acquire()方法中的引數是用來設定鎖的狀態。對於獨佔鎖而言,可被獲取的狀態為0,如果是初次獲取,狀態就被設定成1,而ReentranLock又是可重入鎖,如果鎖被重複獲取,狀態在原來的基礎上加1。

2、acquire()方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  • 首先通過tryAcquire()去獲取鎖,如果獲取成功,直接返回,否則則將該執行緒加入到等待佇列
  • 在嘗試獲取鎖失敗後,呼叫addWaiter方法,將該執行緒加入到佇列末尾,等待獲取鎖。
  • 執行緒加入到佇列後,會呼叫acquireQueued()方法,根據公平原則獲取鎖。
  • 如果執行緒在沉睡期間被中斷過,會進行自我中斷一次。

3、tryAcquire()方法:執行緒嘗試獲取鎖.

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();//獲取當前執行緒
            int c = getState();//獲取鎖的狀態
            if (c == 0) {//如果鎖狀態為0,表示可以獲取鎖
                if (!hasQueuedPredecessors() &&//佇列中沒有等待的執行緒,或者“當前執行緒”是佇列中的第一個執行緒,
                    compareAndSetState(0, acquires)) {//通過cas函式設定鎖的狀態
                    setExclusiveOwnerThread(current);//將鎖的擁有者設定為"當前執行緒"
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//如果鎖已經被獲取,則判斷擁有鎖的執行緒是否是"當前執行緒"
                int nextc = c + acquires;//如果是兩個執行緒相等,根據可重入的性質,鎖的狀態加1
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);//設定鎖的狀態
                return true;
            }
            return false;
        }

:上面的"當前執行緒"指嘗試獲取鎖的執行緒,不是擁有鎖的執行緒。

  • hasQueuedPredecessors()方法:
public final boolean hasQueuedPredecessors() {
        
        Node t = tail;
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

佇列為空時,鎖的狀態為0,當前執行緒可以嘗試去獲取鎖。當 h != head 時說明佇列不為空,這時需要判斷當前當前執行緒是不是佇列的第一個執行緒,如果是,則返回false,否則返回true。(佇列不為空時,(s = h.next)  != null ,所以接著判斷後面一個條件)

  • compareAndSetState()方法:
protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

compareAndSetState是一個原子操作,將期望值與原來的值比較,如果想等,則將原來的值更新為傳入的值。如:如果當前鎖的狀態為expect,則將鎖的狀態設定為update。整個過程是原子性的。

  • setExclusiveOwnerThread()方法:將擁有鎖的執行緒設定為當前執行緒
 protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

4、addWaiter():如果嘗試獲取鎖失敗,則將執行緒加入到等待佇列。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//將當前執行緒包裝成node節點
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;//將node節點的上一個節點引用指向尾節點
            if (compareAndSetTail(pred, node)) {//通過cas函式將node設定成尾節點,如果設定失敗,則通過enq來新增。
                pred.next = node;
                return node;
            }
        }
        enq(node);//初始化佇列或者將節點新增進等待佇列
        return node;
    }
  • Node 節點結構
static final class Node {
       //共享鎖
        static final Node SHARED = new Node();
        //獨佔鎖
        static final Node EXCLUSIVE = null;
        //執行緒被取消標誌
        static final int CANCELLED =  1;
       //節點的後繼節點需要park,並且節點在release後,需要喚醒它的後繼節點
        static final int SIGNAL    = -1;
        //執行緒被阻塞,等待condition喚醒
        static final int CONDITION = -2;
        
        static final int PROPAGATE = -3;

        //新建立的節點狀態為0。
        volatile int waitStatus;

        //上一個節點
        volatile Node prev;

        //下一個節點
        volatile Node next;
        //節點持有的執行緒
        volatile Thread thread;

        //區別佇列是“獨佔鎖”佇列,還是“共享鎖”佇列
        Node nextWaiter;
        
       //如果是共享鎖,返回true
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        //返回上一個節點
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
  • enq()方法:初始化佇列或者新增執行緒到佇列
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize//如果尾節點為null,則說明佇列還沒有初始化,初始化佇列
                if (compareAndSetHead(new Node()))//將首節點設定為一個空執行緒的node節點
                    tail = head;//初始化尾節點
            } else {
                node.prev = t;//將node節點的上一個節點引用指向尾節點
                if (compareAndSetTail(t, node)) {//通過cas函式將尾節點設定為node節點
                    t.next = node;//上一個節點的下一個節點引用指向node
                    return t;
                }
            }
        }
    }

5、acquireQueued:嘗試去獲取鎖,如果獲取失敗,則執行緒進入睡眠狀態,直到被喚醒。

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;//執行緒在睡眠期間是否有過中斷
            for (;;) {
                final Node p = node.predecessor();//“當前節點”的上一個節點("當前節點"是指在佇列中等待的執行緒)
                if (p == head && tryAcquire(arg)) {如果"當前節點"的上一個節點是頭節點,說明"當前節點"是佇列中的第一個節點,嘗試獲取鎖
                    setHead(node);//如果成功獲取到了鎖,則當前節點節點設定成頭節點
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;//返回中斷標誌
                }
                if (shouldParkAfterFailedAcquire(p, node) &&//獲取鎖失敗,將執行緒進入睡眠,等待它的上一個節點喚醒
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//當前節點的上一個節點的狀態
        if (ws == Node.SIGNAL)//如果前繼節點是SIGNAL,後繼節點進入休眠狀態
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {//小於0說明前繼節點已經取消,迴圈前繼節點,直到前繼節點狀態不是取消狀態
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;將“當前節點”的前繼節點指向它的前繼節點的前繼節點
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//通過cas函式,將前繼節點的狀態設定成SIGNAL
        }
        return false;
    }
  • parkAndCheckInterrup
 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//阻塞當前執行緒
        return Thread.interrupted();//放回執行緒的中斷標誌,因為interrupted方法會清除執行緒的中斷標誌,所以需要自我中斷一次。
    }

6、selfInterrupt()

static void selfInterrupt() {
        Thread.currentThread().interrupt();//設定中斷標誌
    }

總結:

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

1、執行緒首先會呼叫tryAcquire()去嘗試獲取鎖,如果獲取失敗,執行緒會通過addWaiter方法,被新增進入等待佇列。

2、進入等待佇列後,通過acquireQueued方法,再嘗試獲取鎖,如果獲取失敗,這時執行緒會進入休眠狀態,直到執行緒被它的前繼節點喚醒,然後繼續獲取鎖。