1. 程式人生 > >java併發程式設計實戰之理解顯示鎖ReentrantLock

java併發程式設計實戰之理解顯示鎖ReentrantLock

前面兩篇部落格分別介紹了通過synchronized關鍵字(內建鎖機制)和volatile關鍵字實現同步機制。由於volatile實現的同步不能保證操作的原子性,因此一般常用內建鎖實現同步機制,但java5.0版本的內建鎖在功能上有很多缺陷:如無法中斷一個正在等待獲取鎖的執行緒、無法在請求獲取一個鎖時無限地等待下去等,基於這些原因,ReentrantLock孕育而生。

1.ReentrantLock如何實現同步機制?

ReentrantLock類實現了Lock介面,介面把加鎖操作抽象為一組方法。

public interface Lock{
	void lock();    // 獲得鎖,如果鎖被佔用,則被阻塞
// 獲得鎖,如果鎖被佔用,則可以中斷被阻塞的鎖 void lockInterruptibly() throws InterruptedException; boolean tryLock(); // 嘗試獲得鎖,如果成功則為true,否則為false // 在time時間內嘗試獲得鎖 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); // 釋放鎖 Condition newCondition(); }

從介面的方法可知,不單單提供簡單的獲得鎖和釋放鎖的操作,還提供其他一些功能,這也是內建鎖所不具備的特點。ReentrantLock類完全實現了Lock介面,並提供了與synchronized相同的互斥性和記憶體可見性。

要理解ReentrantLock類實現同步的原理,就需要看一下原始碼的實現。

1.1ReentrantLock類的建構函式有兩種方式:

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

ReentrantLock不帶引數的構造方法生成一個(非公平的)sync物件; ReentrantLock帶引數構造方法根據傳入的true或者false分別構造公平的、不公平的sync物件。

NonfairSync類和FairSync類都繼承自抽象類Sync,它們都實現父類Sync的抽象方法:lock()和tryAcquire()。這裡的lock()實現則是ReentrantLock物件實際呼叫的方法。

1.2 ReentrantLock類的lock(),即獲得鎖的過程:

  1. NonfairSync類的lock()方法
    final void lock() {
	      // 如果鎖未被佔有則設定鎖的狀態為1,並把當前執行緒置為鎖的擁有者
	      if (compareAndSetState(0, 1))
	          setExclusiveOwnerThread(Thread.currentThread());
	      else
	           acquire(1);
    }

compareAndSetState()是抽象類AbstractQueuedSynchronizer類的方法,通過unsafe物件的本地方法compareAndSwapInt()來實現(比較並交換操作即CAS操作);AQS類內部有一個state的全域性變數,該變數在ReentrantLock類中的含義是:鎖獲取操作的次數。因此compareAndSetState(0,1)的意思是:如果state值與方法的第一個引數相同,則設定state值為方法的第二個引數,這裡表示state值是否為0,若為0則設定為1並返回true,表示該執行緒獲得鎖,否則返回false。返回true則執行方法setExclusiveOwnerThread(thread)把獲得鎖的執行緒設定為鎖的擁有者。返回false則執行acquire(1)。即獲取鎖失敗後的操作:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這個方法可以分三步來理解: 第一步:通過tryAcquire(arg)嘗試獲取鎖,該方法是NonfairSync類從父類Sync類繼承過來並自己實現的方法。【它和lock()方法組成了NonfairSync類】tryAcquire(arg)方法實則呼叫Sync類的nonfairTryAcquire(arg)方法:

    final boolean nonfairTryAcquire(int acquires) {
        // 獲取當前請求鎖的執行緒
        final Thread current = Thread.currentThread();
        // 獲取當前鎖的狀態 state變數是AQS的全域性變數
        int c = getState();
        // 若當前鎖狀態為0 表明鎖處於空閒狀態 則
        if (c == 0) {
            // 則通過CAS操作設定鎖狀態為acquires引數的值(預設傳進來值為1) 表明鎖已經被佔有
            if (compareAndSetState(0, acquires)) {
                // 鎖狀態設定成功後 並把當前執行緒置為鎖的擁有者
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 若鎖狀態不為0 表明鎖已經被佔有,則判斷當前執行緒是否是該鎖的擁有者,若不是則返回false,否則返回true 表明同一個執行緒請求鎖 即重入性
        else if (current == getExclusiveOwnerThread()) {
            // state狀態值加1 表明鎖被重複請求,釋放時需要多次釋放
            int nextc = c + acquires;
            // 若鎖狀態小於0,則拋異常
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

如果當前鎖未被佔有,或者請求的執行緒是鎖的擁有者執行緒(重入),則返回true;否則,返回false; 如果是重入則tryAcquire()返回true,acqure()方法直接結束; 如果不是重入,則tryAcquire()返回false,繼續執行acquireQueued(addWaiter(Node.EXCLUSIVE),arg)。

第二步:既然無法獲得鎖,又不是重入,表明該執行緒要被阻塞,放入獲取鎖的等待佇列中,通過呼叫addWaiter(Node mode)方法放入佇列中:

    private Node addWaiter(Node mode) {
        // 把當前執行緒構造成一個node節點 引數mode表示Node節點型別
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 若tail節點不為空 tail節點為尾節點 enq方法的head節點為頭結點
        if (pred != null) {
            // 則把tail節點設定為node節點的前節點
            node.prev = pred;
            // 設定node節點為tail節點  node節點的下節點則為tail節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 若tail節點為空,則執行enq
        enq(node);
        return node;
    }

enq():

    private Node enq(final Node node) {
        // 內旋迴圈
        for (;;) {
            Node t = tail;
            // 若tail節點為空,則把node節點初始化為head節點
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    // head節點同時設定為tail節點
                    tail = head;
            } else {
                // 若tail節點不為空,則把tail節點設定為node節點的前節點
                node.prev = t;
                // 設定node節點為tail節點 並把tail節點的下一節點指標指向node
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
          }
       }

addWaiter(Node. EXCLUSIVE)方法返回含有當前執行緒的node節點;

第三步:通過acquireQueued()方法阻塞請求鎖失敗後的執行緒:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 獲取node節點的前驅節點p
                final Node p = node.predecessor();
                // 如果前驅節點p為head節點,則node節點的執行緒嘗試獲取鎖
                if (p == head && tryAcquire(arg)) {
                    // 獲取成功則把node節點設定為head節點,前驅節點p的後節點斷開,便於gc
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 若前驅節點狀態為SIGNAL,即為-1,則返回true
        if (ws == Node.SIGNAL)
            return true;
        // 如果前驅節點狀態大於0,即前驅節點被取消了,且去掉被取消的前驅結點
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 如果前驅節點不大於0,則通過CAS設定前驅節點狀態為SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

如果前驅節點狀態為SIGNAL,則返回true,並執行parkAndCheckInterrupt();其他狀態返回false,繼續迴圈直到返回true;

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

最後返回Thread類的interrupted()方法,即執行緒被阻塞

至此執行緒獲取鎖成功和失敗後的流程分析完畢。總結一句話就是:某執行緒請求鎖,如果鎖沒有被佔有或者執行緒是鎖的所有者則可以獲得鎖,即請求成功;否則請求鎖的執行緒通過步驟二放入等待佇列,通過步驟三實現執行緒阻塞,即請求失敗。

  1. FairSync類的lock()方法:
 final void lock() {
     acquire(1);
 }

直接呼叫acquire(1)方法:

 public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }

獲取鎖同樣有三個步驟:

第一步:通過tryAcquire(1)重試獲取鎖;

   protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

在非公平鎖第一步中,當鎖的狀態為0時,請求的執行緒直接獲取鎖;在公平鎖中請求執行緒是否獲得鎖需要通過hasQueuedPredeceddors()方法進行判斷:

   public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

結果1:若等待佇列中head節點等於tail節點,即空佇列,則返回fasle,從而請求執行緒獲得鎖; 結果2:若head節點的後驅節點不為空,且後驅節點的執行緒等於當前請求鎖的執行緒,即重入性,則返回false,從而請求執行緒獲得鎖; 結果3:其他情況返回true,執行緒無法獲得鎖。’

第二步和第三步與非公平鎖呼叫的方法相同,都是AQS類的方法。

公平鎖的lock()和非公平鎖的lock()的區別在於:請求的執行緒是否是直接獲取鎖。

1.3 ReentrantLock的unlock()過程,即釋放鎖

	public void unlock() {
	    sync.release(1);
	}

直接呼叫sync類的release(1)方法:

   public final boolean release(int arg) {
	    // tryRelease方法返回true則釋放鎖,否則不釋放鎖
	    if (tryRelease(arg)) {
	        Node h = head;
	        // 若head節點不為空 且head節點的狀態值不為0,則
	        if (h != null && h.waitStatus != 0)
	            // 喚醒下一個node
	            unparkSuccessor(h);
	        return true;
	    }
	    return false;
	}

tryRelease()的返回值決定是否釋放鎖:

     protected final boolean tryRelease(int releases) {
            // 獲取當前鎖的狀態值,並減去1
            int c = getState() - releases;
            // 若當前執行緒不是該鎖的所有者則丟擲異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 若鎖的狀態值為0,則表明鎖可以被釋放,並清空鎖的所有者
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
      }

主要根據鎖的狀態值來判斷: >若為0則可以釋放並清空鎖的所有者,並喚醒下一個等待的執行緒; >若不為0則不釋放鎖,且修改當前鎖的狀態值。 ReentrantLock類關係繼承圖 注:更多關於AQS的內容可以參考部落格(https://blog.csdn.net/pfnie/article/details/53191892)

ReentrantLock類是否可以替代Synchronized?

答案當然是不可以。ReentrantLock加鎖和釋放鎖都是顯示的,所以如果在加鎖後的程式碼中發生異常,若沒有正確的釋放鎖則導致該鎖永遠得不到釋放,並且無法查詢最初發生錯誤的位置。這一個巨大的隱患使得ReentrantLock永遠無法替代Synchronized。

ReentrantLock效能 VS Synchronized效能?

java5中的ReentrantLock比內建鎖提供更好的競爭效能;java6改進了內建鎖的演算法,使得兩者效能差不多。使用只要考慮適合的場景即可,即內建鎖無法滿足需求時,才會考慮使用ReentrantLock。

總結

本篇博文首先分析了為什麼會有ReentrantLock類,然後從原始碼的角度解釋ReentrantLock加鎖和釋放鎖的原理,從公平鎖和非公平鎖的角度分別闡述;最後簡單的刪除ReentrantLock和synchronized的效能區別。 注:原始碼分析過程中,有些術語可能不太好理解,具體可參考下面兩篇部落格: https://blog.csdn.net/pfnie/article/details/53191892 https://blog.csdn.net/linxdcn/article/details/72844011 個人感覺這兩篇部落格都很清晰,從實際例子的角度闡述了程式碼的執行過程。

參考書籍:

《java併發程式設計實戰》 《實戰java高併發程式設計》