1. 程式人生 > >原始碼剖析AQS在幾個同步工具類中的使用

原始碼剖析AQS在幾個同步工具類中的使用

感謝網友【張超盟】的投稿

1. 前言

 abstract static class Sync extends AbstractQueuedSynchronizer 

AQS_hierachy

同時每個類內部都包含有這樣一個屬性,連屬性名都一樣!註釋已經暗示了,該類的同步機制正是通過這個AQS的子類來完成的。不得不感嘆:“每個強大的同步工具類,內心都有一把同樣的鎖!

     /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync;

幾種同步類提供的功能其實都是委託sync來完成。有些是部分功能,有些則是全部功能。 本文中就是想嘗試比較分析下在幾個同步工具類下面定義的AQS的子類如何來實現工具類要求的功能。當然包括兩部分,一部分是這些工具類如何使用其Sync這種型別的同步器,也就是工具類向外提供的方法中,如何使用sync這個控制代碼;第二部分,就是工具類中自己定義的內部類Sync繼承自AQS,那到底override了哪些方法來做到以父類AQS為基礎,提供受委託工具類的功能要求。

關於第一部分,sync如何被其工具類使用,請允許我無恥的在一個文章中把一個類所有程式碼貼出來。

      public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void release() {
        sync.releaseShared(1);
    }
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
    public int availablePermits() {
        return sync.getPermits();
    }
    public int drainPermits() {
        return sync.drainPermits();
    }
    protected void reducePermits(int reduction) {
	if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

所幸方法很多,總的程式碼行不多,因為每個方法都是一個風格,就是換個名直接呼叫sync的對應方法。這是Semaphore中對sync的使用。是不是覺得寫這個程式碼的作者比寫這個文章的作者還要無恥?在其他幾個工具類中,沒有這麼誇張,b但基本上也是這個風格,即以一個helper的方式向外面的封裝類提供功能支援。所以第一個問題,在文章中說到這裡,後面涉及到也只會簡單描述。 主要是求索第二個問題,即每個工具類中自己定義的Sync到底是什麼樣子,有哪些不同的特徵,其實也就是程式碼上看這些Sync類對父類AQS做了哪些修改。

2. AQS簡介

要介紹子類的特徵,父類總得大致介紹下。AQS的原理、設計等比較系統的東西,在這裡就不想涉及了。可以參照

《深入淺出 Java Concurrency》系列的深入淺出 Java Concurrency (7): 鎖機制 part 2 AQS一節,謝謝這個系列,作者講的確實非常的深入淺出!要想了解更多,可以參考Doug Lea大師的原著The java.util.concurrent Synchronizer Framework。最簡單的辦法其實就是的耐心把AbstractQueuedSynchronizer原始碼前面註釋的javadoc完整的讀一遍就可以了。筆者反正有這樣的習慣。扎著腦袋看程式碼,看註釋,然後自己看看是否能把一個package有個系統的檢視,如果需要再看相關的參考文件來確認這個系統的檢視。

看一個物件有什麼本事,看他的構成是什麼樣,遠比看他由哪些行為來的要深遠。其實在OOP這種以class方式承載功能的程式設計中,即看一個類包含的屬性,比他的方法也更容易理解物件的作用。看AQS類,暫時拋開outline檢視下需要兩屏才能看完的重要方法(還未展開ConditionObject和Node兩個重要的內部類),只看該類包含的三個重要屬性的定義就能看出端倪。

    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;

註釋其實已經告訴我們了,Node型別的head和tail是一個FIFO的wait queue;一個int型別的狀態位state。到這裡也能猜到AQS對外呈現(或者說宣告)的主要行為就是由一個狀態位和一個有序佇列來配合完成。 最簡單的讀一下主要的四個方法:

     //釋放排他鎖
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
   //釋放排他鎖
public final boolean release(int arg) {
 if (tryRelease(arg)) {
  Node h = head;
 if (h != null && h.waitStatus != 0)
 unparkSuccessor(h);
 return true;
 }
return false;
 }
   //獲取共享鎖
 public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
   //釋放共享鎖
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

分別對應鎖的獲取和釋放,只是**shared字尾的表示一組表示共享鎖,而另外一組沒有後綴的表示排他鎖。只用關注每個方法的第一行,都是這種try字型的風格:

  if (try*****(arg)) {
}

即做一個判斷,然後做獲取或者釋放鎖。 其實AQS主要的工作思路正是如此:在獲取鎖時候,先判斷當前狀態是否允許獲取鎖,若是可以則獲取鎖,否則獲取不成功。獲取不成功則會阻塞,進入阻塞佇列。而釋放鎖時,一般會修改狀態位,喚醒佇列中的阻塞執行緒。 跟蹤這幾個try字型的方法定義,發現一個驚人的巧合,這幾個方法在AQS中居然都是一樣的定義:

   protected boolean tr***(int arg) {
        throw new UnsupportedOperationException();
    }

即都是父類中只有定義,在子類中實現。子類根據功能需要的不同,有選擇的對需要的方法進行實現。父類中提供一個執行模板,但是具體步驟留給子類來定義,不同的子類有不同的實現。

3. AQS的重要方法定義

下圖表示compareAndSetState(int, int)的呼叫,可以看的更清楚看到,說明幾個同步工具類內定義的Sync類,即自定義子類中其實都涉及到對state的操作。

AQS_state_reference

而同時不小心觀察到AQS中有一大組final的方法,就是子類不能覆蓋的,大致看下方法內的定義,大部分都是直接或間接涉及對head和tail的操作,即對等待佇列的維護。

final_aqs_method

那在AQS的子類中有沒有對有序佇列的操作呢?檢索下對head和tail的引用即可找到結論。

AQS_head_reference

對head的操作僅限於在AQS類內部,觀察方法的修飾,除了final就是private,即表示這些方法不可能被子類override,或者不可能在子類中直接被呼叫。看下圖對於tail的呼叫也是同樣的風格,即對等待佇列的操作全部不超過AQS類內部。

AQS_tail_reference

於是幾乎可以有這樣的結論:在AQS的設計中,在父類AQS中實現了對等待佇列的預設實現,無論是對共享鎖還是對排他鎖。子類中幾乎不用修改該部分功能,而state在子類中根據需要被賦予了不同的意義,子類通過對state的不同操作來提供不同的同步器功能,進而對封裝的工具類提供不同的功能。 在下面嘗試對以上觀點在AQS各個子類在各個工具類中的使用進行驗證。

4. AQS在子類中的使用

對每個考察會從如下幾個方面來進行

  • 工具類的主要作用
  • 主要獲取鎖方法(其他的類似方法如對應的可以更好的處理中斷和超時或者非同步等特性)
  • 主要釋放鎖方法(其他的類似方法如對應的可以更好的處理中斷和超時或者非同步等特性)
  • 工具類的構造方法(構造方法能告訴我們一個類最在意,最根本的屬性)
  • Sync構造方法
  • Sync介面方法
  • Sync對AQS方法的override
  • state的作用
  • state維護重要邏輯

我們的問題就是這些AQS的子類如何配合父類AQS的框架方法來完成各個工具類不同的鎖需求。分析思路是這樣:

  • 這個工具類是幹什麼用的?可以理解為是功能需求。
  • 這個工具類是通過哪些方法來實現這些功能的?可以理解為分解的需求
  • AQS的子類Sync是如何支援這些方法的?可以理解為需求的實現。

按照如下的思路對每個工具類嘗試進行解析,只是注意以上觀點,可能並沒有覆蓋這個工具類的所有內容(其實就是方法)和對應Sync的所有內容。為了表達清楚些,把重點方法的程式碼引用在文章中,並對重點語句做了標記。因為五鍾同步工具類在一起說明,看上去引用的程式碼有點多。

1) Semaphore

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire blocks if necessary until a permit is available, and then takes it. Each release adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

訊號量Semaphore的主要作用是來控制同時訪問某個特定資源的運算元量,或者同時執行某個指定操作的數量。 Semaphore只是計數,不包括許可物件,並且Semaphore也不會把許可與執行緒物件關聯起來,因此一個執行緒中獲得的許可可以在另外一個執行緒中釋放。關於這點的理解可以參照What is mutex and semaphore in Java ? What is the main difference ?的說明。 Semphore對外的兩個方法是 acquire()和release()方法。在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每呼叫 release() 新增一個許可,釋放一個正在阻塞的獲取者。

   public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

 public void release() {
        sync.releaseShared(1);
    }

達到這樣的操作是通過同步器Sync來操作,可以是FairSync,也可以是NonfairSync。 從Sync的構造方法中,就可以看出Semphore中所謂的permit其實就是AQS中的state。

     public Semaphore(int permits, boolean fair) {
        sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
    }
    Sync(int permits) {
            setState(permits);
        }

工具類是通過Sync的acquireSharedInterruptibly和ReleaseShared的方法提供功能。AQS中定義的這兩個final方法呼叫的是子類對應的try*方法。在這裡覆蓋了tryAcquireShared和tryReleaseShared方法。每一次請求acquire()一個許可都會導致計數器減少1,同樣每次釋放一個許可release()都會導致計數器增加1,一旦達到了0,新的許可請求執行緒將被掛起。

         protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int p = getState();
                //釋放鎖時,許可遞加
                if (compareAndSetState(p, p + releases))
                    return true;
            }
        }

每次釋放鎖時先呼叫該方法時,作用就修改state值為state+release,即表示增加新釋放的許可數。 而tryAcquireShared對應於FairSync,NonfairSync有兩種不同的實現。 FairSync中,總是判斷當前執行緒是等待佇列的第一個執行緒時,獲得鎖,且修改state值為state-acquires。

         protected int tryAcquireShared(int acquires) {
            Thread current = Thread.currentThread();
            for (;;) {
                // FairSync中,總是判斷當前執行緒是等待佇列的第一個執行緒時,獲得鎖
                Thread first = getFirstQueuedThread();
                if (first != null && first != current)
                    return -1;
                int available = getState();
                //獲得鎖,則計數遞減
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

對NonfairSync,不用考慮等待佇列,直接修改state許可數。

          protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //對NonfairSync,不用考慮等待佇列,直接修改state許可數
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

即不管是公平還是非公平,acquire方法總是會判斷是否還有許可可用,如果有,並且當前執行緒可以獲得,則獲得鎖,許可數相應減少。state在此的作用就是許可數。

總結:Semaphore中使用AQS的子類Sync,初始化state表示許可數,在每一次請求acquire()一個許可都會導致計數器減少1,同樣每次釋放一個許可release()都會導致計數器增加1。一旦達到了0,新的許可請求執行緒將被掛起。

2) CountDownLatch

要求完成的功能是: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown method, after which all waiting threads are released and any subsequent invocations of await return immediately.

就像名字Latch所表達的一樣,把一組執行緒全部關在外面,在某個狀態時候放開。即一種同步機制來保證一個或多個執行緒等待其他執行緒完成。初始化了一個count計數,當count未遞減到0時候,每次呼叫await方法都會阻塞。每次呼叫來是的的count遞減。 這是CountDownLatch 中“規定”的該工具類應該滿足的功能,詳細的使用的例子不再此介紹。只是分析如何藉助Sync同步器來達到以上功能的。 從建構函式中可以看到該類也維護了一個計數count。這個計數其實也是通過AQS的state來完成的,

   public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);}

CountDownLatch的兩個重要方法是await和方法。定義分別如下。定義await方法的作用是在計數器不為0時候阻塞呼叫執行緒,為0時候立即返回;countDown方法的作用是計數遞減。

      public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public void countDown() {
        sync.releaseShared(1);
    }

看到這兩個方法最終的執行還是同步器中的對應方法。在CountDownLatch中也定義了一個繼承於AQS的Sync。在前面的分析中知道父類的acquireSharedInterruptibly方法和releaseShared其實是分別呼叫到了子類中定義的tryAcquireShared和tryReleaseShared方法。 在CountDownLatch的Sync類中也就僅僅實現了這兩個方法。

其中tryAcquireShared方法內容非常簡單,只是一個三元表示式,但是這個state值為0賦值1,不為0卻賦值-1。看著不太符合我們一般的用法,這主要是為了配合父類AQS中的邏輯。當state為0表示計數遞減完成,則返回值為-1,在父類中滿足條件則執行後續的阻塞操作;當state不為0表示計算器遞減未完成,則返回值為1,在父類呼叫中直接方法結束,不阻塞。

    public int tryAcquireShared(int acquires) {
 //當state為0表示計數遞減完成,則返回值為-1,在父類中滿足條件則執行後續的阻塞操作
            return getState() == 0? 1 : -1;
        }

tryReleaseShared方法主要是對state值的維護,當已經為0,則返回false,父類releaseShared方法直接返回;當state不為0(其實就是大於0,因為count初始化是一個正數),則遞減,並通過cas的方式更新state的值。

    public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                //當已經為0,則返回false,父類releaseShared方法直接返回
                    return false;
                //當state不為0(其實就是大於0,因為count初始化是一個正數),則遞減,並通過cas的方式更新state的值。
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

總結:CountDownLatch 委託自定義的Sync中的,await()和()方法來完成阻塞執行緒到計數器為0的功能和計數器遞減功能。而該這兩個方法委託給自定義的Sync的()和(int arg)方法。真正實現對state(count)維護的是父類AQS中呼叫子類定義的tryReleaseShared(int)來維護計數count。計數count使用的是AQS的狀態位state。每次呼叫方法計數遞減,在計數遞減到0之前,呼叫await的執行緒都會阻塞。

3)ReentrantLock

名字翻譯很好,可重入鎖。功能需求如下 A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities. A ReentrantLock is owned by the thread last successfully locking, but not yet unlocking it. A thread invoking lock will return, successfully acquiring the lock, when the lock is not owned by another thread. The method will return immediately if the current thread already owns the lock. This can be checked using methods , and .

可重入鎖應該是幾種同步工具裡面被用的對多的一個。標準的互斥操作,也就是一次只能有一個執行緒持有鎖,可能是AQS中最重要的一個類。基本功能就關鍵字Synchronize所支援的功能。關於ReentrantLock和Synchronize的差別比較等文章很多,可以參照Java 理論與實踐: JDK 5.0 中更靈活、更具可伸縮性的鎖定機制和《Java Concurrency in Practice》的對應章節。 ReentrantLock對外的主要方法是lock(),()和()方法,當然還有其他變種的()、(long timeout, TimeUnit unit)等。

lock的功能是獲取鎖。如果沒有執行緒使用則立即返回,並設定state為1;如果當前執行緒已經佔有鎖,則state加1;如果其他執行緒佔有鎖,則當前執行緒不可用,等待。

      public void lock() {
        sync.lock();
    }

如果鎖可用,則獲取鎖,並立即返回值 true。如果鎖不可用,立即返回值 false。

  public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

嘗試釋放鎖,如果當前執行緒佔有鎖則count減一,如果count為0則釋放鎖。若佔有執行緒不是當前執行緒,則拋異常。

      public void unlock() {
        sync.release(1);
    }

可以看到也是藉助Sync來完成,我們下面詳細看下Sync是如何實現這些”規定”的需求的。ReentrantLock的建構函式告訴我們,其支援公平和非公平兩種鎖機制。

      public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
    }

在該類中對應定了兩種FairSync和NonfairSync兩種同步器,都繼承者AQS。可以看到對應執行的是lock、release、和Sync的nonfairTryAcquire。從前面AQS原始碼知道release是在父類AQS中定義的方法,lock和nonfairTryAcquire是這個Sync中特定的方法,不是對父類對應方法的覆蓋。 lock方法有對於FairSync和NoFairSync有兩種不同的實現,對於非公平鎖只要當前沒有執行緒持有鎖,就將鎖給當前執行緒;而公平鎖不能這麼做,總是呼叫acquire方法來和其他執行緒一樣公平的嘗試獲取鎖。

        /**NoFairSync**/
     final void lock() {
            if (compareAndSetState(0, 1))
                //對於非公平鎖只要當前沒有執行緒持有鎖,就將鎖給當前執行緒
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        /**FairSync**/
          final void lock() {
            acquire(1);
        }

(int arg)方法是在父類AQS中定義,在其實現中先會呼叫子類的(int arg)方法。 對於非公平鎖,通過state是否為0判斷,當前是否有執行緒持有鎖,如果沒有則把鎖分配給當前執行緒;否則如果state不為0,說明當前有執行緒持有鎖,則判斷持有鎖的執行緒是否就是當前執行緒,如果是增加state計數,表示持有鎖的執行緒的重入次數增加。當然增加重入數也會檢查是否超過最大值。

      protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            //通過state是否為0判斷,當前是否有執行緒持有鎖,如果沒有則把鎖分配給當前執行緒
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
            //否則如果state不為0,說明當前有執行緒持有鎖,則判斷持有鎖的執行緒是否就是當前執行緒,如果是增加state計數,表示持有鎖的執行緒的重入次數增加
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

對於公平鎖,其(int arg)方法中,如果state為0表示沒有執行緒持有鎖,會檢查當前執行緒是否是等待佇列的第一個執行緒,如果是則分配鎖給當前執行緒;否則如果state不為0,說明當前有執行緒持有鎖,則判斷持有鎖的執行緒釋放就是當前執行緒,如果是增加state計數,表示持有鎖的執行緒的重入次數增加。

          protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (isFirst(current) &&
                    compareAndSetState(0, acquires)) {
               //如果state為0表示沒有執行緒持有鎖,會檢查當前執行緒是否是等待佇列的第一個執行緒,如果是則分配鎖給當前執行緒
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
           //如果state不為0,說明當前有執行緒持有鎖,則判斷持有鎖的執行緒釋放就是當前執行緒,如果是增加state計數,表示持有鎖的執行緒的重入次數增加
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

比較公平鎖機制和非公平鎖機制的差別僅僅在於如果當前沒有執行緒持有鎖,是優先把鎖分配給當前執行緒,還是優先分配給等待佇列中隊首的執行緒。 釋放鎖時候呼叫AQS的(int arg)方法,前面定義知道父類的該方法會先呼叫子類的(int arg)方法。在該方法中主要作用是state狀態位減少release個,表示釋放鎖,如果更新後的state為0;表示當前執行緒釋放鎖,如果不為0,表示持有鎖的當前執行緒重入數減少。

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases; //state狀態位減少release個
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
           //如果更新後的state為0,表示當前執行緒釋放鎖
                free = true;
                setExclusiveOwnerThread(null);
            }//如果不為0,表示持有鎖的當前執行緒重入數減少。
            setState(c);
            return free;
        }

總結: ReentrantLock中定義的同步器分為公平的同步器和非公平的同步器。在該同步器中state狀態位表示當前持有鎖的執行緒的重入次數。在獲取鎖時,通過覆蓋AQS的(int arg)方法,如果沒有執行緒持有則立即返回,並設定state為1;如果當前執行緒已經佔有鎖,則state加1;如果其他執行緒佔有鎖,則當前執行緒不可用。釋放鎖時,覆蓋了AQS的(int arg),在該方法中主要作用是state狀態位減少release個,表示釋放鎖,如果更新後的state為0,表示當前執行緒釋放鎖,如果不為0,表示持有鎖的當前執行緒重入數減少。

4)ReentrantReadWriteLock可重入讀寫鎖

讀寫鎖的要求是: A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The may be held simultaneously by multiple reader threads, so long as there are no writers. The is exclusive. All ReadWriteLock implementations must guarantee that the memory synchronization effects of writeLock operations (as specified in the Lock interface) also hold with respect to the associated readLock. That is, a thread successfully acquiring the read lock will see all updates made upon previous release of the write lock.

即讀和讀之間是相容的,寫和任何操作都是排他的。這種鎖機制在資料庫系統理論中應用的其實更為普遍。 允許多個讀執行緒同時持有鎖,但是隻有一個寫執行緒可以持有鎖。讀寫鎖允許讀執行緒和寫執行緒按照請求鎖的順序重新獲取讀取鎖或者寫入鎖。當然了只有寫執行緒釋放了鎖,讀執行緒才能獲取重入鎖。寫執行緒獲取寫入鎖後可以再次獲取讀取鎖,但是讀執行緒獲取讀取鎖後卻不能獲取寫入鎖。 ReentrantReadWriteLock鎖從其要求的功能上來看,是對前面的ReentrantLock的擴充套件,因此功能複雜度上來說也提高了,看看該類下面定義的內部類,除了支援公平非公平的Sync外,還有兩種不同的鎖,ReadLock和WriteLock。

readwritelock

在向下進行之前,有必要回答這樣一個問題,WriteLock和ReadLock好像完成的功能不一樣,看上去似乎是兩把鎖。ReentrantReadWriteLock中分別通過兩個public的方法()和()獲得讀鎖和寫鎖。

      private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    private final Sync sync;

   public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
   public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

但是如果是兩把鎖,可以實現前面功能要求的讀鎖和讀鎖直接的相容,寫鎖和寫鎖直接的互斥,這本身共享鎖和排他鎖就能滿足要求,但是如何實現對同一個物件上讀和寫的控制?明顯,只有一把鎖才能做到。 看上面程式碼片段時候,不小心看到了一個熟悉的欄位Sync,前面的幾個同步工具我們知道了,這些工具類的所有操作最終都是委託給AQS的對應子類Sync來完成,這裡只有一個同步器Sync,那是不是就是隻有一把鎖呢。看看後面的建構函式會驗證我們的猜想。

   public ReentrantReadWriteLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
        //使用了同一個this,即統一this裡面的同一個sync來構造讀鎖和寫鎖
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

沒錯,ReadLockWriteLock使用的其實是一個private的同步器Sync。 下面看下可重入讀寫鎖提供哪些鎖的方法來滿足上面的需求的。

看到ReadLock提供了lock()、()、()、(long timeout, TimeUnit unit)和()方法。我們看下主要的幾個方法的實現如下:lock()方法的作用是獲取讀鎖;()的作用是嘗試當前沒有其他執行緒當前持有寫鎖時獲取讀鎖;方法的作用是釋放讀鎖。

   public void lock() {
            sync.acquireShared(1);
        }
       public  boolean tryLock() {
            return sync.tryReadLock();
        }
       public  void unlock() {
            sync.releaseShared(1);
        }

分別呼叫到Sync的三個方法(int arg) 、(int arg)和 tryReadLock()方法,其中前兩個是AQS父類中定義的,後一個是該Sync中根據自己需要實現的方法。 前面AQS父類的介紹中知道,(int arg) 和(int arg)方法是在父類中定義的,呼叫子類的對應try字型的方法,我們看下在子類Sync中定義對應的try*字型的方法怎麼滿足功能的。先看acquireShared中定義的tryAcquireShared

   protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
               //如果有排他鎖,且持有排他鎖的執行緒不是當前執行緒,則獲取失敗。
                return -1;
            if (sharedCount(c) == MAX_COUNT)
              //否則如果已經加讀鎖的個數超過允許的最大值,丟擲異常
                throw new Error("Maximum lock count exceeded");
            if (!readerShouldBlock(current) &&
                compareAndSetState(c, c + SHARED_UNIT)) {
             //否則檢查是否需要阻塞當前執行緒,如果不阻塞,則使用CAS的方式給更新狀態位state。其中readerShouldBlock在Sync的兩個子類中實現,根據公平非公平的策略有不同的判斷條件
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    cachedHoldCounter = rh = readHolds.get();
                rh.count++;
                return 1;
            }
            return fullTryAcquireShared(current);
        }

嘗試獲取讀鎖的方法是這樣的:如果有排他鎖,但是持有排他鎖的執行緒不是當前執行緒,則獲取失敗;否則如果已經加讀鎖的個數超過允許的最大值,丟擲異常;否則檢查是否需要阻塞當前執行緒,如果不阻塞,則使用CAS的方式給更新狀態位state。其中readerShouldBlock在Sync的兩個子類中實現,根據公平非公平的策略有不同的判斷條件。

對應的releaseShared中呼叫的tryReleaseShared定義如下

   protected final boolean tryReleaseShared(int unused) {
            HoldCounter rh = cachedHoldCounter;
            Thread current = Thread.currentThread();
            if (rh == null || rh.tid != current.getId())
                rh = readHolds.get();
            if (rh.tryDecrement() <= 0)
                throw new IllegalMonitorStateException();
            for (;;) {
                int c = getState();
              // 釋放讀鎖時更新狀態位的值
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

可以看到主要的作用在準備釋放讀鎖時更新狀態位的值。 Sync中提供給ReadLock用的tryReadLock方法和tryAcquireShared內容和邏輯差不多,而且本文想著重分析的Sync對父類AQS的方法如何改變來達到需要的功能,所以這個方法這裡不分析了。 可以看到加鎖時候state增加了一個SHARED_UNIT,在釋放鎖時state減少了一個SHARED_UNIT。為什麼是SHARED_UNIT,而不是1呢?這個看了下面兩個方法的定義就比較清楚了。

         /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
       static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

原來為了只用一個state狀態位來表示兩種鎖的資訊,高位16位表示共享鎖的狀態位,低位16位表示獨佔鎖的狀態位。至於讀鎖和寫鎖的狀態位的意思,隨著後面分析會逐步更清楚。 在看到這裡的時候,讀鎖的狀態位的意思應該是比較清楚,表示當前持有共享鎖的執行緒數。有一個新的執行緒過了想使用共享鎖,如果其他執行緒也只是加了共享鎖,則當前執行緒就可以加共享鎖,每加一次,狀態位遞加一些,因為儲存在高16位,所以遞加時是加一個SHARED_UNIT。

      public void lock() {
            sync.acquire(1);
        }
   public boolean tryLock( ) {
            return sync.tryWriteLock();
        }
   public void unlock() {
            sync.release(1);
        }

看到分別呼叫了Sync的acquire() release() 和tryWriteLock方法,其中前兩個都是定義在父類AQS的方法。呼叫了子類定義的對應try字型的方法。tryAcquire和tryRelease方法。這裡我們就看下子類的這兩個try*字型的方法做了哪些事情。

   protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                 //通過state的判斷,當有讀鎖時獲取不成功
                 //當有寫鎖,如果持有寫鎖的執行緒不是當前執行緒,則獲取不成功
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }//如果可以獲取,則CAS的方式更新state,並設定當前執行緒排他的獲取鎖
            if ((w == 0 && writerShouldBlock(current)) ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

tryAcquire中嘗試獲取排他鎖。結合排他鎖的語義和程式碼邏輯不難看到:通過state的判斷,當有讀鎖時獲取不成功,當有寫鎖,如果持有寫鎖的執行緒不是當前執行緒,則獲取不成功。如果可以獲取,則CAS的方式更新state,並設定當前執行緒排他的獲取鎖。writerShouldBlock定義在Sync的子類中,對於FaireSync和UnFairSync有不同的判斷。 接下來看tryRelease方法,主要作用是在釋放排他鎖時候更新state,減去releases的數目。看到這裡發現寫鎖中用到的Sync和可重入鎖ReentrantLock整個邏輯都對應的差不多。

           protected final boolean tryRelease(int releases) {
            //釋放排他鎖時候更新state,減去releases的數目
            int nextc = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            if (exclusiveCount(nextc) == 0) {
                setExclusiveOwnerThread(null);
                setState(nextc);
                return true;
            } else {
                setState(nextc);
                return false;
            }
        }

只是觀察到寫鎖state更新加和減和前面的幾種比較類似,直接操作的就是傳入的整形引數,這在讀鎖的時候講過了,因為排他鎖的狀態位是儲存在state的低16位。

總結ReentrantReadWriteLock中提供了兩個Lock:ReentrantReadWriteLock.ReadLockReentrantReadWriteLock.WriteLock。對外提供功能的是兩個lock,但是內部封裝的是一個同步器Sync,有公平和不公平兩個版本。借用了AQS的state狀態位來儲存鎖的計數資訊。高16位表示共享鎖的數量,低16位表示獨佔鎖的重入次數。在AQS子類的對應try字型方法中實現對state的維護。

5)FutureTask

先看需求 A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the get method will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled.

理解其核心需求是,一個執行任務,開始執行後可以被取消,可以檢視執行結果,如果執行結果未完成則阻塞。 一般表示一個輸入待執行任務。線上程池中FutureTask中一般的用法就是構造一個FutureTask,然後提交execute,返回的型別還是FutureTask,呼叫其get方法即可得到執行結果。 run方法定義的就是任務執行的內容,在工作執行緒中被呼叫。通過建構函式可以看到FutureTask封裝的了一個Runnable的物件,另外一個泛型引數result。猜也可以猜到前者就是執行的任務內容,後者是來接收執行結果的。可以看到功能還是委託給Sync物件,構造的引數是一個有執行結果的呼叫Callable,也可以直接使用一個Callable引數。

      public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

FutureTask實現了RunnableFuture介面,也即實現了Runnable和Future介面。作業執行緒執行的內容是FutureTask的的run方法內定義的任務內容。如執行緒池 ThreadPoolExecutor.Worker.runTask(Runnable task)方法可以看到線上程池的Worker執行緒中呼叫到執行任務的run方法。這裡使用Sync的作用,就是在任務執行執行緒和提交任務(同時也是獲取任務執行結果)的執行緒之間維持一個鎖的關係,保證只有執行結束後才能獲取到結果。

FutureTask的任務執行方法是

  public void run() {
        sync.innerRun();
}

獲取執行結果的方法是

      public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

設定執行結果的方法是

      protected void set(V v) {
        sync.innerSet(v);
    }

能看到,都是調到對應的Sync的對應方法。最主要的是innerRun方法,通過CAS的方式設定任務執行狀態位RUNNING,執行傳入的回撥,並把執行結果呼叫innerSet進行賦值。

   void innerRun() {
              //設定任務執行狀態位RUNNING
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // recheck after setting thread
                    //獲取和設定回撥的結果
                    innerSet(callable.call());
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }

在innerSet方法中設定執行狀態位為執行結束,並把執行結果賦值給result。

       void innerSet(V v) {
	    for (;;) {
		int s = getState();
		if (s == RAN)
		    return;
                if (s == CANCELLED) {
		   releaseShared(0);
                    return;
                }
                   //設定執行狀態位為執行結束,並把執行結果賦值給result
		if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();
		    return;
                }
            }
        }

前面方法把執行結果放在result中,我們知道future介面定義的get方法來獲取執行結果,那如何來判斷另外一個執行緒已經執行完畢呢?看到FutureTask的get方法還是呼叫到Sync的innerGet方法。 innerGet方法根據判斷執行狀態來獲取執行結果。acquireSharedInterruptibly方法其實呼叫的是子類中定義的tryAcquireShared來判斷任務釋放執行完畢或者取消。如果未完畢或取消,則掛起當前執行緒。

           V innerGet() throws InterruptedException, ExecutionException {
            //acquireSharedInterruptibly方法其實呼叫的是子類中定義的tryAcquireShared來判斷任務釋放執行完畢或者取消。如果未完畢或取消,則掛起當前執行緒
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }

tryAcquireShared方法的定義如下,呼叫innerIsDone方法,根據state的狀態值做出判斷,如果結束則返回1,未結束返回-1。當tryAcquireShared返回-1,則在父類AQS中獲取共享鎖的執行緒會阻塞。即實現“任務未完成呼叫get方法的執行緒會阻塞”這樣的功能。

     protected int tryAcquireShared(int ignore) {
    //呼叫innerIsDone方法,根據state的狀態值做出判斷,如果結束則返回1,未結束返回-1。當tryAcquireShared返回-1,則在父類AQS中獲取共享鎖的執行緒會阻塞。
            return innerIsDone()? 1 : -1;
        }
    boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }
   private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }

tryReleaseShared沒有做什麼事情,因為不像前面四種其實都有鎖的意味,需要釋放鎖。在FutureTask中state表示任務的執行狀態,在幾乎每個方法的開始都會判讀和設定狀態。

      protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }

總結:在FutureTask實現了非同步的執行和提交,作為可以被Executor提交的物件。通過Sync來維護任務的執行狀態,從而保證只有工作執行緒任務執行完後,其他執行緒才能獲取到執行結果。AQS的子類Sync在這裡主要是借用state狀態位來儲存執行狀態,來完成對對各種狀態以及加鎖、阻塞的實現。

最後終於理解了這早前就算了解的類,名字為什麼叫FutureTask,實現了Future介面(滿足在future的某個時間獲取執行結果,這是Future介面的取名的意義吧),另外在執行中作為對執行任務的封裝,封裝了執行的任務內容,同時也封裝了執行結果,可以安全的把這個任務交給另外的執行緒去執行,只要執行get方法能得到結果,則一定是你想要的結果,真的是很精妙。

5. 總結對照

本文主要側重AQS的子類在各個同步工具類中的使用情況,其實也基本涵蓋了這幾個同步工具類的主要邏輯,但目標並不是對這幾個同步工具類的程式碼進行詳細解析。另外AQS本身的幾個final方法,才是同步器的公共基礎,也不是本文的主題,也未詳細展開。其實寫這篇文章的一個初始目的真的只是想列出如下表格,對比下AQS中的各個子類是怎麼使用state的,居然囉嗦了這麼多。

工具類 工具類作用 工具類加鎖方法 工具類釋放鎖方法 Sync覆蓋的方法 Sync非覆蓋的重要方法 state的作用 鎖型別 鎖維護
Semaphore 控制同時訪問某個特定資源的運算元量 acquire:每次請求一個許可都會導致計數器減少1,,一旦達到了0,新的許可請求執行緒將被掛起 release:每呼叫 新增一個許可,釋放一個正在阻塞的獲取者 tryAcquireShared tryReleaseShared 表示初始化的許可數 共享鎖 每一次請求acquire()一個許可都會導致計數器減少1,同樣每次釋放一個許可release()都會導致計數器增加1,一旦達到了0,新的許可請求執行緒將被掛起。
CountDownLatch 把一組執行緒全部關在外面,在某個狀態時候放開。一種同步機制來保證一個或多個執行緒等待其他執行緒完成。 await:在計數器不為0時候阻塞呼叫執行緒,為0時候立即返回 countDown :計數遞減 tryAcquireShared tryReleaseShared 維護一個計數器 共享鎖 初始化一個計數,每次呼叫countDown方法計數遞減,在計數遞減到0之前,呼叫await的執行緒都會阻塞
ReentrantLock 標準的互斥操作,也就是一次只能有一個執行緒持有鎖 lock:如果沒有執行緒使用則立即返回,並設定state為1;如果當前執行緒已經佔有鎖,則state加1;如果其他執行緒佔有鎖,則當前執行緒不可用,等待 tryLock:如果鎖可用,則獲取鎖,並立即返回值 true。如果鎖不可用,則此方法將立即返回值 false unlock:嘗試釋放鎖,如果當前執行緒佔有鎖則count減一,如果count為0則釋放鎖。如果佔有執行緒不是當前執行緒,則拋異常 tryAcquire tryRelease nonfairTryAcquir state表示獲得鎖的執行緒對鎖的重入次數。 排他鎖。 獲取鎖時,如果沒有執行緒使用則立即返回,並設定state為1;如果當前執行緒已經佔有鎖,則state加1;如果其他執行緒佔有鎖,則當前執行緒不可用。釋放鎖時,在該方法中主要作用是state狀態位減少release個,表示釋放鎖,如果更新後的state為0,表示當前執行緒釋放鎖,如果不為0,表示持有鎖的當前執行緒重入數減少
ReentrantReadWriteLock 讀寫鎖。允許多個讀執行緒同時持有鎖,但是隻有一個寫執行緒可以持有鎖。寫執行緒獲取寫入鎖後可以再次獲取讀取鎖,但是讀執行緒獲取讀取鎖後卻不能獲取寫入鎖 ReadLock#lock :獲取讀鎖 ReadLock#tryLock:嘗試當前沒有其他執行緒當前持有寫鎖時獲取讀鎖 WriteLock#lock:獲取寫鎖 WriteLock#tryLock:嘗試當前沒有其他執行緒持有寫鎖時,呼氣寫鎖。 ReadLock#unlock:釋放讀鎖 WriteLock#unlock:釋放寫鎖 acquireShared releaseShared tryAcquire tryRelease tryReadLock tryWriteLock 高16位表示共享鎖的數量,低16位表示獨佔鎖的重入次數 讀鎖:共享 寫鎖:排他 對於共享鎖,state是計數器的概念。一個共享鎖就相對於一次計數器操作,一次獲取共享鎖相當於計數器加1,釋放一個共享鎖就相當於計數器減1;排他鎖維護類似於可重入鎖。
FutureTask 封裝一個執行任務交給其他執行緒去執行,開始執行後可以被取消,可以檢視執行結果,如果執行結果未完成則阻塞。 V get() run() set(V) cancel(boolean) tryAcquireShared tryReleaseShared innerGet innerRun() innerSet innerIsCancelled state狀態位來儲存執行狀態RUNNING、RUN、CANCELLED 共享鎖 獲取執行結果的執行緒(可以有多個)一直阻塞,直到執行任務的執行緒執行完畢,或者執行任務被取消。

完。

張超盟 an ExTrender,CS資料管理方向工學碩士。與妻兒蝸居於錢江畔,就職一初創安全公司任資料服務團隊負責人,做資料(儲存、挖掘、服務)方面研發。愛資料,愛程式碼,愛技術,愛豆吧