1. 程式人生 > >Reentrantlock原始碼解析(一):lock方法

Reentrantlock原始碼解析(一):lock方法

    對於java中實現併發加鎖的方式可以分為兩種,一種是重量級的synchronized,一種是concurrent包下的lock介面。本系列文章將對這兩種鎖和依賴於cpu嗅探技術的volatile進行詳細說明。

    在Lock接口出現前,Java程式是靠synchronized的關鍵字實現鎖功能的,而Java SE 5 下之後,併發包中新增了Lock介面及其相關實現類用來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能,知識在使用時需要顯示的獲取和釋放鎖。雖然它缺少了隱式獲取釋放鎖的便捷性,但是擁有了鎖獲取與釋放的可操作性,可中斷的獲取鎖以及超時獲取鎖等synchronized關鍵字所不具備的同步特性。

    使用範例:這裡使用的是lock的實現類Reentrantlock

private static Lock lock = new ReentrantLock();

public static void critical() {

lock.lock();

    try {

        System.out.println("start " + Thread.currentThread().getName());

Thread.sleep(1000);

System.out.println("end " + Thread.currentThread().getName())

;

} catch (InterruptedException e) {

        e.printStackTrace();

} finally {

lock.unlock();

}

}

     注意:不能在try塊中進行lock,如果程式碼發生異常,會導致鎖的無故釋放

    在介紹Reentrantlock前首先需要介紹一下AbstractQueuedSynchronizer,下面簡稱AQS。它是一個非常重要的同步佇列,concurrent包下許多工具都用到了這個佇列同步器,比如著名的CountDownLatch。通過檢視原始碼可以發現,AQS在內部實現了一個鏈式FIFO的雙向佇列,擁有Head和Tail指標,並且實現了setHead(),setTail()等佇列方法。既然是佇列,那勢必有結點,AQS就是通過對一個個結點的管理實現同步作用的,噹噹前執行緒獲取鎖失敗的時候,AQS將當前執行緒的狀態包裝為一個結點,放入內部的queue中並阻塞當前執行緒。當可以再次獲取鎖的時候,AQS將首結點對應的執行緒喚醒並再次嘗試獲取鎖。

    下面用一張表說明結點內的屬性。

屬    性 定    義
Node SHARED = new Node() 表示Node處於共享模式
Node EXCLUSIVE = null 表示Node處於獨佔模式
int CANCELLED = 1 因為超時或者中斷,Node被設定為取消狀態,被取消的Node不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態,處於這種狀態的Node會被踢出佇列,被GC回收
int SIGNAL = -1 表示這個Node的繼任Node被阻塞了,到時需要通知它
 int CONDITION = -2 表示這個Node在條件佇列中,因為等待某個條件而被阻塞 
int PROPAGATE = -3 使用在共享模式頭Node有可能處於這種狀態, 表示鎖的下一次獲取可以無條件傳播
 int waitStatus 0,新Node會處於這種狀態 
 Node prev 佇列中某個Node的前驅Node 
 Node next 佇列中某個Node的後繼Node 
Thread thread 這個Node持有的執行緒,表示等待鎖的執行緒
Node nextWaiter 表示下一個等待condition的Node    

    再用一張表說明AQS的屬性和方法

屬性/方法 含    義
Thread exclusiveOwnerThread 這個是AQS父類AbstractOwnableSynchronizer的屬性,表示獨佔模式同步器的當前擁有者
Node 上面已經介紹過了,FIFO佇列的基本單位
Node head FIFO佇列中的頭Node
Node tail FIFO佇列中的尾Node
int state 同步狀態,0表示未鎖
int getState() 獲取同步狀態
setState(int newState) 設定同步狀態
boolean compareAndSetState(int expect, int update)  利用CAS進行State的設定 
 long spinForTimeoutThreshold = 1000L 執行緒自旋等待的時間 
Node enq(final Node node)  插入一個Node到FIFO佇列中 
Node addWaiter(Node mode) 為當前執行緒和指定模式建立並擴充一個等待佇列
void setHead(Node node) 設定佇列的頭Node
void unparkSuccessor(Node node) 如果存在的話,喚起Node持有的執行緒
void doReleaseShared() 共享模式下做釋放鎖的動作
void cancelAcquire(Node node) 取消正在進行的Node獲取鎖的嘗試
boolean shouldParkAfterFailedAcquire(Node pred, Node node) 在嘗試獲取鎖失敗後是否應該禁用當前執行緒並等待
void selfInterrupt() 中斷當前執行緒本身
boolean parkAndCheckInterrupt() 禁用當前執行緒進入等待狀態並中斷執行緒本身
boolean acquireQueued(final Node node, int arg) 佇列中的執行緒獲取鎖
tryAcquire(int arg) 嘗試獲得鎖(由AQS的子類實現它
tryRelease(int arg) 嘗試釋放鎖(由AQS的子類實現它
isHeldExclusively() 是否獨自持有鎖
acquire(int arg) 獲取鎖
release(int arg) 釋放鎖
compareAndSetHead(Node update) 利用CAS設定頭Node
compareAndSetTail(Node expect, Node update) 利用CAS設定尾Node
compareAndSetWaitStatus(Node node, int expect, int update) 利用CAS設定某個Node中的等待狀態

    上表中有很多的CAS操作,這裡解釋一下CAS,全稱為CompareAndSwap,即比較並交換。顧名思義,當需要更新值的時候,先用當前值和舊值進行比較,如果相同則更新,不同則返回一個錯誤碼。它是concurrent存在的基礎,CAS的實現並不是通過程式碼,而是通過CPU對其的支援,不同的CPU對CAS的實現方式不同,但是最後的效果是相同的,當需要觸發CAS的時候,通過呼叫Java的Unsafe類對硬體級別進行操作,從而實現比較並交換這一操作,Unsafe類是Java自己留的後門,從而實現不依賴native方法就可以實現硬體級別的原子操作。

    Reentrantlock中的鎖分為兩種,公平鎖和非公平鎖,通過new Reentrantlock(true)可以獲得公平鎖,預設是非公平鎖。對於公平鎖,每次有執行緒到達時,都會先通過hasQueuedPredecessors()方法檢視等待佇列是否為空,不為空則加入等待佇列。而非公平鎖則不然,可以通過檢視下面我的原始碼分析看出,非公平鎖回多次嘗試對鎖的所有權進行搶佔,全部失敗之後,才會按公平鎖的方式完全加入等待佇列。至於為什麼要用非公平鎖,廢話當然是因為它快啊,這一點在原始碼的註釋中也寫了。Reentrantlock是一種可重入鎖,也就是一個執行緒可以獲取鎖多次。作業系統在切換執行緒的時候,是需要儲存並切換上下文的,這也就是為什麼在單核的cpu上進行多執行緒操作時反而更加費時的原因。如果是公平鎖,當前執行緒讓出鎖之後,必須交給等待佇列的隊首元素對應的執行緒佔有鎖。而非公平鎖則可以直接進行重入,這樣就不會存在上下文切換的時間,從而提高了效率。下文介紹的lock和unlock都是針對非公平鎖的。

1.lock

    首先,我們假設執行緒1先佔有了鎖,這個過程是非常簡單的

    (1)首先,呼叫

private final Sync sync;

public ReentrantLock(boolean fair) {

    sync = fair ? new FairSync() : new NonfairSync();

}

例項化出一個非公平鎖,其中Sync是一個繼承了AQS的類。

    (2)呼叫sync的lock方法

final void lock() {

    if (compareAndSetState(0, 1))

        setExclusiveOwnerThread(Thread.currentThread());

    else

        acquire(1);

}

    (3)呼叫AQS的compareAndSetState方法,將state屬性設為1,表示執行緒1獨佔鎖

protected final boolean compareAndSetState(int expect, int update) {

    // See below for intrinsics setup to support this

    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

}

    (4)呼叫AQS父類AOS的setExclusiveOwnerThread方法將佔有鎖的執行緒設為當前執行緒

protected final void setExclusiveOwnerThread(Thread thread) {

    exclusiveOwnerThread = thread;

}

    此時執行緒1成功佔有鎖,此時AQS中的state=1,exclusiveOwnerThread = thread1

    執行緒1佔有鎖之後,執行緒2到來,由於是非公平鎖,執行緒2從到來直到加入佇列的過程中,它會多次嘗試搶佔鎖,如果均不成功,最後會阻塞並被加入到AQS的等待佇列中,下圖是呼叫鏈。

(1)呼叫lock()方法

final void lock() {

    if (compareAndSetState(0, 1))

        setExclusiveOwnerThread(Thread.currentThread());

    else

        acquire(1);

}

    在呼叫lock時,首先會嘗試使用CAS判斷state是不是0,如果是0就設為1。這裡是非公平的一個體現,新到執行緒可以去搶佔隊首節點的鎖的所有權。但是現在這裡是失敗的,因為執行緒1已經把state設定成了1。所以跳到第二步

(2)呼叫AQS的acquire方法

public final void acquire(int arg) {

    if (!tryAcquire(arg) &&

        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

        selfInterrupt();

}

    可以看到這個if語句,之前說過,非公平鎖從開始到放入等待佇列結束期間,會多次嘗試獲得鎖,這裡的tryAcquire()就是嘗試獲得鎖,非公平鎖中它最後會走nonfairTryAcquire方法

protected final boolean tryAcquire(int acquires) {

    return nonfairTryAcquire(acquires);

}

final boolean nonfairTryAcquire(int acquires) {

    final Thread current = Thread.currentThread();

    int c = getState();

    if (c == 0) {

        if (compareAndSetState(0, acquires)) {

            setExclusiveOwnerThread(current);

            return true;

        }

    }

    else if (current == getExclusiveOwnerThread()) {

        int nextc = c + acquires;

        if (nextc < 0) // overflow

            throw new Error("Maximum lock count exceeded");

        setState(nextc);

        return true;

    }

    return false;

}

    首先獲取到當前執行緒,獲取當前佔有鎖的執行緒數,如果state等於0說明鎖沒有被佔用。此時嘗試用CAS將state設定為1,如果失敗,再判斷佔有鎖的是不是當前執行緒。如果是,走else if裡的程式碼,則開始重入,這裡是對鎖的重入次數做了一個限制,我們知道,整型的數如果超過範圍會溢位為負數,所以當鎖被當前執行緒重入時,每次都會對state加一,如果state變成負數,那麼說明重入次數已經超過2147483647次,此時丟擲異常。很簡單的就實現了一個偏向鎖。

    如果acquire失敗,走第二個條件嘗試將當前執行緒加入等待佇列中。首先走addWaiter方法將當前執行緒的資訊包裝成node並加入隊尾

private Node addWaiter(Node mode) {

    Node node = new Node(Thread.currentThread(), mode);

    // Try the fast path of enq; backup to full enq on failure

    Node pred = tail;

    if (pred != null) {

        node.prev = pred;

        if (compareAndSetTail(pred, node)) {

            pred.next = node;

            return node;

        }

    }

    enq(node);

    return node;

}

    首先先建立一個節點,因為是獨佔鎖,所以實際上傳進來的mode是個NULL,此時new Node會建立一個獨佔模式的節點。此時先獲取一下尾節點,如果尾節點都是null那麼說明這個佇列就是空,此時走enq方法新建一個佇列。如果存在,用CAS把它設為佇列的頭節點,用CAS的原因是cpu時間片是切換的,可能有其他執行緒把佇列已經建立好了,此時CAS失敗,由於tail是volatile的,所以對當前執行緒可見,此時看見tail不為null再次迴圈後將會將當前節點放到隊尾。

private Node enq(final Node node) {

    for (;;) {

        Node t = tail;

        if (t == null) { // Must initialize

            if (compareAndSetHead(new Node()))

                tail = head;

        } else {

            node.prev = t;

            if (compareAndSetTail(t, node)) {

                t.next = node;

                return t;

            }

        }

    }

}

    在enq中這裡用了一個for的死迴圈保證佇列是一定新建成功的,它會先再次判斷一下佇列是不是不存在,然後新建頭節點,然後把當前節點連在頭節點後面。並將當前節點設為尾節點。這裡其實是這樣的,

    到這裡算是把一個就緒的節點放入AQS的等待佇列中了,開始走acquireQueued(addWaiter(Node.EXCLUSIVE), arg))最外層的acquireQueued方法。這個方法主要是對新加入等待佇列的執行緒進行阻塞。

final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;

    try {

        boolean interrupted = false;

        for (;;) {

            final Node p = node.predecessor();

            if (p == head && tryAcquire(arg)) {

                setHead(node);

                p.next = null; // help GC

                failed = false;

                return interrupted;

            }

            if (shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

    這裡第一個if語句是對當前節點能否獲得鎖進行判斷,因為此時可能鎖已經被釋放。如果當前節點是佇列中的第一個節點而且能夠獲得鎖,那麼就把當前節點設定為頭節點。如果還是不能獲取鎖,那麼需要對這條執行緒進行阻塞。但因為並不是所有執行緒都需要阻塞,比如取消狀態。這裡開始走shouldParkAfterFailedAcquire(p, node)方法判斷是否需要阻塞。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

    int ws = pred.waitStatus;

    if (ws == Node.SIGNAL)

        /*

         * This node has already set status asking a release

         * to signal it, so it can safely park.

         */

        return true;

    if (ws > 0) {

        /*

         * Predecessor was cancelled. Skip over predecessors and

         * indicate retry.

         */

        do {

            node.prev = pred = pred.prev;

        } while (pred.waitStatus > 0);

        pred.next = node;

    } else {

        /*

         * waitStatus must be 0 or PROPAGATE.  Indicate that we

         * need a signal, but don't park yet.  Caller will need to

         * retry to make sure it cannot acquire before parking.

         */

        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

    }

    return false;

}

    因為是多執行緒環境,當前節點的位置是有可能變得,所以這裡需要再判斷一下status,這裡根據status設定三個規則

    規則1:如果前繼的節點狀態為SIGNAL,表明當前節點需要unpark,則返回成功,此時acquireQueued方法的第12行(parkAndCheckInterrupt)將導致執行緒阻塞

    規則2:如果前繼節點狀態為CANCELLED(ws>0),說明前置節點已經被放棄,則回溯到一個非取消的前繼節點,返回false,acquireQueued方法的無限迴圈將遞迴呼叫該方法,直至規則1返回true,導致執行緒阻塞

    規則3:如果前繼節點狀態為非SIGNAL、非CANCELLED,則設定前繼的狀態為SIGNAL,返回false後進入acquireQueued的無限迴圈,與規則2同

    這裡如果返回true,說明執行緒需要被阻塞,那麼通過parkAndCheckInterrupt呼叫native方法進行阻塞。

private final boolean parkAndCheckInterrupt() {

    LockSupport.park(this);

    return Thread.interrupted();

}

public static void park(Object blocker) {

    Thread t = Thread.currentThread();

    setBlocker(t, blocker);

    UNSAFE.park(false, 0L);

    setBlocker(t, null);

}

    至此,lock過程結束。