1. 程式人生 > >java.util.concurrent包詳細分析

java.util.concurrent包詳細分析

概述

java.util.concurrent 包含許多執行緒安全、測試良好、高效能的併發構建塊。不客氣地說,建立java.util.concurrent 的目的就是要實現 Collection 框架對資料結構所執行的併發操作。通過提供一組可靠的、高效能併發構建塊,開發人員可以提高併發類的執行緒安全、可伸縮性、效能、可讀性和可靠性。

此包包含locks,concurrent,atomic 三個包。

Atomic:原子資料的構建。

Locks:基本的鎖的實現,最重要的AQS框架和lockSupport

Concurrent:構建的一些高階的工具,如執行緒池,併發佇列等。

其中都用到了CAS(compare-and-swap)操作。CAS 是一種低級別的、細粒度的技術,它允許多個執行緒更新一個記憶體位置,同時能夠檢測其他執行緒的衝突並進行恢復。它是許多高效能併發演算法的基礎。在 JDK 5.0 之前,Java 語言中用於協調執行緒之間的訪問的惟一原語是同步,同步是更重量級和粗粒度的。公開 CAS 可以開發高度可伸縮的併發 Java 類。這些更改主要由 JDK 庫類使用,而不是由開發人員使用。

CAS操作都封裝在java 不公開的類庫中,sun.misc.Unsafe。此類包含了對原子操作的封裝,具體用原生代碼實現。本地的C程式碼直接利用到了硬體上的原子操作。

Atomic原子資料

 這個包裡面提供了一組原子變數類。其基本的特性就是在多執行緒環境下,當有多個執行緒同時執行這些類的例項包含的方法時,具有排他性,即當某個執行緒進入方法,執行其中的指令時,不會被其他執行緒打斷,而別的執行緒就像自旋鎖一樣,一直等到該方法執行完成,才由JVM從等待佇列中選擇一個另一個執行緒進入,這只是一種邏輯上的理解。實際上是藉助硬體的相關指令來實現的,不會阻塞執行緒(或者說只是在硬體級別上阻塞了)。可以對基本資料、陣列中的基本資料、對類中的基本資料進行操作。原子變數類相當於一種泛化的volatile變數,能夠支援原子的和有條件的讀-改-寫操作。

 java.util.concurrent.atomic中的類可以分成4組:

標量類(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference

陣列類:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

更新器類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater

複合變數類:AtomicMarkableReference,AtomicStampedReference

標量類

第一組AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference這四種基本型別用來處理布林,整數,長整數,物件四種資料,其內部實現不是簡單的使用synchronized,而是一個更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷,執行效率大為提升。

他們的實現都是依靠 真正的值為volatile 型別,通過Unsafe 包中的原子操作實現。最基礎就是CAS,他是一切的基礎。如下 。其中offset是 在記憶體中 value相對於基地址的偏移量。(它的獲得也由Unsafe 原生代碼獲得)。關於加鎖的原理見附錄。

核心程式碼如下,其他都是在compareAndSet基礎上構建的。

1. private static final Unsafe unsafe = Unsafe.getUnsafe();

2. private volatile int value;  

3. public final int get() {  

4.         return value;  

5. }  

6. public final void set(int newValue) {  

7.         value = newValue;  

8. }  

9. public final boolean compareAndSet(int expect, int update) {  

10.    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);  

11.}

void set()和void lazySet():set設定為給定值,直接修改原始值;lazySet延時設定變數值,這個等價於set()方法,但是由於欄位是volatile型別的,因此次欄位的修改會比普通欄位(非volatile欄位)有稍微的效能延時(儘管可以忽略),所以如果不是想立即讀取設定的新值,允許在“後臺”修改值,那麼此方法就很有用。

getAndSet( )方法,利用compareAndSet迴圈自旋實現。

原子的將變數設定為新資料,同時返回先前的舊資料。

其本質是get( )操作,然後做set( )操作。儘管這2個操作都是atomic,但是他們合併在一起的時候,就不是atomic。在Java的源程式的級別上,如果不依賴synchronized的機制來完成這個工作,是不可能的。只有依靠native方法才可以。

Java程式碼  

1.  public final int getAndSet(int newValue) {  

2.      for (;;) {  

3.          int current = get();  

4.          if (compareAndSet(current, newValue))  

5.              return current;  

6.      }  

7. 

對於 AtomicInteger、AtomicLong還提供了一些特別的方法。貼別是如,

getAndAdd( ):以原子方式將給定值與當前值相加, 相當於執行緒安全的t=i;i+=delta;return t;操作。

以實現一些加法,減法原子操作。(注意 --i、++i不是原子操作,其中包含有3個操作步驟:第一步,讀取i;第二步,加1或減1;第三步:寫回記憶體)

陣列類

第二組AtomicIntegerArray,AtomicLongArray還有AtomicReferenceArray類進一步擴充套件了原子操作,對這些型別的陣列提供了支援。這些類在為其陣列元素提供 volatile 訪問語義方面也引人注目,這對於普通陣列來說是不受支援的。

他們內部並不是像AtomicInteger一樣維持一個valatile變數,而是全部由native方法實現,如下
AtomicIntegerArray的實現片斷:

Java程式碼  

1.  private static final Unsafe unsafe = Unsafe.getUnsafe();  

2.  private static final int base = unsafe.arrayBaseOffset(int[].class);  //陣列基地址

3.  private static final int scale = unsafe.arrayIndexScale(int[].class);  //陣列元素佔的大小精度

4.  private final int[] array;  

5.  public final int get(int i) {  

6.          return unsafe.getIntVolatile(array, rawIndex(i));  

7.  }  

8.  public final void set(int i, int newValue) {  

9.          unsafe.putIntVolatile(array, rawIndex(i), newValue);  

10. }

11.

12.  private longrawIndex(int i) {//獲取具體某個元素的偏移量

13.         if (i <0 || i >= array.length)

14.             thrownew IndexOutOfBoundsException("index " + i);

15.         return base+ (long) i * scale;

16. }

更新器類

第三組AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基於反射的實用工具,可以對指定類的指定 volatile 欄位進行原子更新。API非常簡單,但是也是有一些約束:

(1)欄位必須是volatile型別的

(2)欄位的描述型別(修飾符public/protected/default/private)是與呼叫者與操作物件欄位的關係一致。也就是說 呼叫者能夠直接操作物件欄位,那麼就可以反射進行原子操作。但是對於父類的欄位,子類是不能直接操作的,儘管子類可以訪問父類的欄位。

(3)只能是例項變數,不能是類變數,也就是說不能加static關鍵字。

(4)只能是可修改變數,不能使final變數,因為final的語義就是不可修改。實際上final的語義和volatile是有衝突的,這兩個關鍵字不能同時存在。

(5)對於AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater 只能修改int/long型別的欄位,不能修改其包裝型別(Integer/Long)。如果要修改包裝型別就需要使用AtomicReferenceFieldUpdater 。

複合變數類

防止ABA問題出現而構造的類。如什麼是ABA問題呢,當某些流程在處理過程中是順向的,也就是不允許重複處理的情況下,在某些情況下導致一個數據由A變成B,再中間可能經過0-N個環節後變成了A,此時A不允許再變成B了,因為此時的狀態已經發生了改變,他們都是對atomicReference的進一步包裝,AtomicMarkableReferenceAtomicStampedReference功能差不多,有點區別的是:它描述更加簡單的是與否的關係,通常ABA問題只有兩種狀態,而AtomicStampedReference是多種狀態,那麼為什麼還要有AtomicMarkableReference呢,因為它在處理是與否上面更加具有可讀性。

Lcoks 鎖

此包中實現的最基本的鎖,阻塞執行緒的LockSupport。核心是AQS框架(AbstractQueuedSynchronizer),是J U C(java util concurrent) 最複雜的一個類。

Lock 和Synchronized

J U C 中的Lock和synchronized具有同樣的語義和功能。不同的是,synchronized 鎖在退出塊時自動釋放。而Lock 需要手動釋放,且Lock更加靈活。Syschronizd 是 java 語言層面的,是系統關鍵字;Lock則是java 1.5以來提供的一個類。

Synchronized 具有以下缺陷,它無法中斷一個正在等候獲得鎖的執行緒;也無法通過投票得到鎖,如果不想等下去,也就沒法得到鎖;同步還要求鎖的釋放只能在與獲得鎖所在的堆疊幀相同的堆疊幀中進行。

而Lock(如ReentrantLock)除了與Synchronized 具有相同的語義外,還支援鎖投票定時鎖等候可中斷鎖等候(就是說在等待鎖的過程中,可以被中斷)的一些特性。

Lock. lockInterruptibly ,呼叫後,或者獲得鎖,或者被中斷後丟擲異常。優先響應異常。這點可以用 類似以下程式碼測試。

 Thread a = new Thread(task1, "aa");

       Thread b = new Thread(task1, "bb");

       a.start();

       b.start();

       b.interrupt();

LockSupport 和java內建鎖

   在LockSupport出現之前,如果要block/unblock某個Thread,除了使用Java語言內建的monitor機制之外,只能通過Thread.suspend()和Thread.resume()。然而Thread.suspend()和Thread.resume()基本上不可用,除了可能導致死鎖之外,它們還存在一個無法解決的競爭條件:如果在呼叫Thread.suspend()之前呼叫了Thread.resume(),那麼該Thread.resume()呼叫沒有任何效果。LockSupport最主要的作用,便是通過一個許可(permit)狀態,解決了這個問題。LockSupport 只能阻塞當前執行緒,但是可以喚醒任意執行緒。

     那麼LockSupport和Java語言內建的monitor機制有什麼區別呢?它們的語義是不同的。LockSupport是針對特定Thread來進行block/unblock操作的;wait()/notify()/notifyAll()是用來操作特定物件的等待集合的。為了防止知識生鏽,在這裡簡單介紹一下Java語言內建的monitor機制(詳見:http://whitesock.iteye.com/blog/162344 )。正如每個Object都有一個鎖,每個Object也有一個等待集合(wait set),它有wait、notify、notifyAll和Thread.interrupt方法來操作。同時擁有鎖和等待集合的實體,通常被成為監視器(monitor)。每個Object的等待集合是由JVM維護的。等待集合一直存放著那些因為呼叫物件的wait方法而被阻塞的執行緒。由於等待集合和鎖之間的互動機制,只有獲得目標物件的同步鎖時,才可以呼叫它的wait、notify和notifyAll方法。這種要求通常無法靠編譯來檢查,如果條件不能滿足,那麼在執行的時候呼叫以上方法就會導致其丟擲IllegalMonitorStateException。

    wait() 方法被呼叫後,會執行如下操作:

·        如果當前執行緒已經被中斷,那麼該方法立刻退出,然後丟擲一個InterruptedException異常。否則執行緒會被阻塞。

·        JVM把該執行緒放入目標物件內部且無法訪問的等待集合中。

·        目標物件的同步鎖被釋放,但是這個執行緒鎖擁有的其他鎖依然會被這個執行緒保留著。當執行緒重新恢復質執行時,它會重新獲得目標物件的同步鎖。

    notify()方法被呼叫後,會執行如下操作:

·        如果存在的話,JVM會從目標物件內部的等待集合中任意移除一個執行緒T。如果等待集合中的執行緒數大於1,那麼哪個執行緒被選中完全是隨機的。

·        T必須重新獲得目標物件的同步鎖,這必然導致它將會被阻塞到呼叫Thead.notify()的執行緒釋放該同步鎖。如果其他執行緒在T獲得此鎖之前就獲得它,那麼T就要一直被阻塞下去。

·        T從執行wait()的那點恢復執行。

    notifyAll()方法被呼叫後的操作和notify()類似,不同的只是等待集合中所有的執行緒(同時)都要執行那些操作。然而等待集合中的執行緒必須要在競爭到目標物件的同步鎖之後,才能繼續執行。

 在標準的Sun jdk 中,Locksupport的實現基於Unsafe,都是原生代碼,android的實現不全是原生代碼。

一個執行緒呼叫park阻塞之後,如果被其他執行緒呼叫interrupt(),那麼他它會響應中斷,解除阻塞,但是不會丟擲interruption 異常。這點在構造可中斷獲取鎖的時候用到了。

AbstractQueuedSynchronizer

AQS框架是 J U C包的核心。是構建同步、鎖、訊號量和自定義鎖的基礎。也是構建高階工具的基礎。

從上圖可以看到,鎖,訊號量的實現內部都有兩個內部類,都繼承AQS。

由於AQS的構建上採用模板模式(Template mode),即 AQS定義一些框架,而它的實現延遲到子類。如tryAcquire()方法。由於這個模式,我們如果直接看AQS原始碼會比較抽象。所以從某個具體的實現切入簡單易懂。這裡選澤ReentrantLock ,它和Synchronized具有同樣的語義。

簡單說來,AbstractQueuedSynchronizer會把所有的請求執行緒構成一個CLH佇列,當一個執行緒執行完畢(lock.unlock())時會啟用自己的後繼節點,但正在執行的執行緒並不在佇列中,而那些等待執行的執行緒全 部處於阻塞狀態,經過調查執行緒的顯式阻塞是通過呼叫LockSupport.park()完成,而LockSupport.park()則呼叫 sun.misc.Unsafe.park()本地方法,再進一步,HotSpot在Linux中中通過呼叫pthread_mutex_lock函式把 執行緒交給系統核心進行阻塞。

ReentrantLock

從ReentrantLock(可重入鎖)開始,分析AQS。首先需要知道這個鎖和java 內建的同步Synchronized具有同樣的語義。如下程式碼解釋重入的意思

Lock lock = new ReentrantLock();

    public void test() {

       lock.lock();

       System.out.print("I am test1");

       test(); // 遞迴呼叫 ……………………………1 遞迴呼叫不會阻塞,因為已經獲得了鎖,這就是重入的含義

       // test2();// 呼叫test2 ………………………2

       lock.unlock();// 這裡應該放在finally 塊中,這裡簡單省略,以後一樣。

    }

    public void test2() {

       lock.lock();

       System.out.println("I am test1");

       test2();//

       lock.unlock();

    }

重入的意思就是,如果已經獲得了鎖,如果執行期間還需要獲得這個鎖的話,會直接獲得所,不會被阻塞,獲得鎖的次數加1;每執行一次unlock,持有鎖的次數減1,當為0時釋放鎖。這點,Synchronized 具有同樣語義。

檢視原始碼,可以看到ReentrantLock 對Lock介面的實現,把所有的操作都委派給一個叫Sync的類,如下原始碼:

其中Sync的定義如右圖

所以這個Syc類是關鍵。而Sync 基礎AQS。Sync又有兩個子類,

final static class NonfairSync extends Sync 

final static class FairSync extends Sync 

顯然是為了支援公平鎖和非公平鎖而定義,預設情況下為非公平鎖。

先理一下Reentrant.lock()方法的呼叫過程(預設非公平鎖):

這 些討厭的Template模式導致很難直觀的看到整個呼叫過程,其實通過上面呼叫過程及AbstractQueuedSynchronizer的註釋可以發現,AbstractQueuedSynchronizer中抽象了絕大多數Lock的功能,而只把tryAcquire方法延遲到子類中實現。 tryAcquire方法的語義在於用具體子類判斷請求執行緒是否可以獲得鎖,無論成功與否AbstractQueuedSynchronizer都將處理後面的流程。

NonfairSync 和 FairSync 不同的是執行lock時做的操作,如下為 NonfairSync 的操作,其中compareAndSetState(intexpect, int des) 為AQS的方法,設定同步狀態,NonfairSync 通過修改同步狀態獲得鎖,鎖定不成功才執行acquire(1),此方法也在AQS中定義。而 FairSync.lock 直接執行acquire(1)。

final void lock() {

            if (compareAndSetState(0, 1))

                setExclusiveOwnerThread(Thread.currentThread());

            else

                acquire(1);

}

AQS中的Acquire(int)方法呼叫子類中的tryAcquire(int)實現,這裡正是模板模式。如下面的原始碼。自此已經進入到了AQS的實現。

public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

}

其他方法的呼叫順序類似,如unlock 呼叫AQS的release ,release 呼叫Sync的tryRelease()。

下面看NonfairSync.tryAcquire,它呼叫Sync.nonfairTryAcquire。以下為實現,首先獲取同步狀態c,o代表鎖沒有執行緒正在競爭鎖。如果c=0,那麼嘗試用CAS操作獲得鎖;或者c!=0,但是鎖被當前執行緒擁有,那麼獲得鎖的次數增加 acquires 次,這就是重入的概念。以上兩種情況都成功獲得鎖,返回真。如果不是以上兩種情況,就沒有獲得鎖,返回假。

final boolean nonfairTryAcquire(int acquires) {

            final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) {

                if (compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {

                int nextc = c + acquires;

                if (nextc < 0) // overflow

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

如果沒有獲得鎖,即NonfairSync.tryAcqiuer()返回假,那麼可以看出 AQS.acquire 將執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg);將此執行緒追加到等待佇列的隊尾。其中Node是AQS的一個內部類,他是等待佇列節點的抽象。

private Node addWaiter(Node mode) {

        Node node = new Node(Thread.currentThread(), mode);

        // Try the fast path of enq; backup to full enq on failure

        Node pred = tail;

        if (pred != null) {

            node.prev = pred;

            if (compareAndSetTail(pred, node)) {

                pred.next = node;

                return node;

            }

        }

        enq(node);

        return node;

}

其中mode指的是模式,NULL 為獨佔,否則為共享鎖。RetranLock為獨佔鎖。首先把執行緒包裝為一個節點。然後獲取等待佇列的尾,如果不為NULL的話(這說明有其他執行緒在待佇列中行),就把初始化node的前驅為pred.( node.prev = pred) 然後通過CAS操作把node 設定為新的隊尾,如果成功則設定pred的後繼為 node.至此 快速進隊完成。

但是如果pred為null(此時沒有執行緒在等待,一開始tail 就是null) ,或者CAS設定隊尾失敗。則需要執行下面的入隊流程。 這裡可能是整個阻塞佇列的初始化過程。Tail 為null

private Node enq(final Node node) {

        for (;;) {

            Node t = tail;

            if (t == null) { // Must initialize

                Node h = new Node(); // Dummy header

                h.next = node;

                node.prev = h;

                if (compareAndSetHead(h)) {

                    tail = node;

                    return h;

                }

            }

            else {

                node.prev = t;

                if (compareAndSetTail(t, node)) {

                    t.next = node;

                    return t;

                }

            }

       }

    }

該方法就是迴圈呼叫CAS,即使有高併發的場景,無限迴圈將會最終成功把當前執行緒追加到隊尾(或設定隊頭)。總而言之,addWaiter的目的就是通過CAS把當前現在追加到隊尾,並返回包裝後的Node例項。

把執行緒要包裝為Node物件的主要原因,除了用Node構造供虛擬佇列外,還用Node包裝了各種執行緒狀態,這些狀態被精心設計為一些數字值:

SIGNAL(-1) :執行緒的後繼執行緒正/已被阻塞,當該執行緒release或cancel時要重新這個後繼執行緒(unpark)

CANCELLED(1):因為超時或中斷,該執行緒已經被取消

CONDITION(-2):表明該執行緒被處於條件佇列,就是因為呼叫了Condition.await而被阻塞。

PROPAGATE(-3):傳播共享鎖

0:0代表無狀態

接下來執行acquireQueued(Node)方法。acquireQueued的主要作用是把已經追加到佇列的執行緒節點(addWaiter方法返回值)進行阻塞,但阻塞前又通過tryAccquire重試是否能獲得鎖,如果重試成功能則無需阻塞,直接返回。

final boolean acquireQueued(final Node node, int arg) {

        try {

            boolean interrupted = false;

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    return interrupted;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } catch (RuntimeException ex) {

            cancelAcquire(node);

            throw ex;

        }

    }

以上的迴圈不會無限進行,因為接下來執行緒會被阻塞。這由parkAndCheckInterrupt()方法實現,但是它只有在shouldParkAfterFailedAcquire 方法返回 true 的時候後才會繼續執行進而阻塞。所以看 shouldParkAfterFailedAcquire方法,從方法的名字看 意思是,當獲取鎖失敗的時候是否應該阻塞。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;

        if (ws == Node.SIGNAL)

            /*

             * This node has already set status asking a release

             * to signal it, so it can safely park

             */

            return true;

        if (ws > 0) {

            /*

             * Predecessor was cancelled. Skip over predecessors and

             * indicate retry.

             */

        do {

        node.prev = pred = pred.prev;

        } while (pred.waitStatus > 0);

        pred.next = node;

        } else {

            /*

             * waitStatus must be 0 or PROPAGATE. Indicate that we

             * need a signal, but don't park yet. Caller will need to

             * retry to make sure it cannot acquire before parking.

             */

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

        }

        return false;

    }

此方法的作用是根據它的前驅節點決定本節點做什麼樣的操作。前面已經說過Node的節點的waitState 表示它個後繼節點 需要做什麼操作。這裡就是對執行緒狀態的檢查,所有這個方法引數中有前驅節點。

檢查原則在於:

規則1:如果前繼的節點狀態為SIGNAL,表明當前節點需要unpark,則返回成功,此時acquireQueued方法的第12行(parkAndCheckInterrupt)將導致執行緒阻塞

規則2:如果前繼節點狀態為CANCELLED(ws>0),說明前置節點已經被放棄,則回溯到一個非取消的前繼節點,返回false,acquireQueued方法的無限迴圈將遞迴呼叫該方法,直至規則1返回true,導致執行緒阻塞

規則3:如果前繼節點狀態為非SIGNAL、非CANCELLED,則設定前繼的狀態為SIGNAL,返回false後進入acquireQueued的無限迴圈,與規則2同

總體看來,shouldParkAfterFailedAcquire就是靠前繼節點判斷當前執行緒是否應該被阻塞,如果前繼節點處於CANCELLED狀態,則順便刪除這些節點重新構造佇列。

至此,獲取鎖完畢。

請求鎖不成功的執行緒會被掛起在acquireQueued方法的第12行,12行以後的程式碼必須等執行緒被解鎖鎖才能執行,假如被阻塞的執行緒得到解鎖,則執行第13行,即設定interrupted = true,之後又進入無限迴圈。

解鎖的過程相對簡單一些。

呼叫關係如下順序 ReentrantLock.unlock()    AQS.release()  --Synx.tryRealse()

無限迴圈的程式碼可以看出,並不是得到解鎖的執行緒一定能獲得鎖,必須在第6行中呼叫tryAccquire重新競爭,因為鎖是非公平的,有可能被新加入的線程獲得,從而導致剛被喚醒的執行緒再次被阻塞,這個細節充分體現了非公平的精髓。此可以看到,把tryAcquire方法延遲到子類中實現的做法非常精妙並具有極強的可擴充套件性,令人歎為觀止!當然精妙的不是這個Templae設計模式,而是Doug Lea對鎖結構的精心佈局。

public void unlock() {

        sync.release(1);

}

release的語義在於:如果可以釋放鎖,則喚醒佇列第一個執行緒(Head.next)。release先呼叫tryRelease呼叫是否解鎖成功,解鎖成長才進行下一步操作。

public final boolean release(int arg) {

        if (tryRelease(arg)) {

            Node h = head;

            if (h != null && h.waitStatus != 0)

                unparkSuccessor(h);

            return true;

        }

        return false;

    }

tryReleasetryAcquire語義相同,把如何釋放的邏輯延遲到子類中。tryRelease語義很明確:如果執行緒多次鎖定,則進行多次釋放,直至status==0則真正釋放鎖,所謂釋放鎖即設定status0,因為無競爭所以沒有使用CAS。如下原始碼

protected final boolean tryRelease(int releases) {

            int c = getState() - releases;

            if (Thread.currentThread() != getExclusiveOwnerThread())

                throw new IllegalMonitorStateException();

            boolean free = false;

            if (c == 0) {

                free = true;

                setExclusiveOwnerThread(null);

            }

            setState(c);

            return free;

        }

下面的原始碼是喚醒佇列的第一個執行緒。但是其可能被取消,當被取消的時候,從隊尾往前找執行緒。(不從對頭開始的原因是,隊尾一直在變化,不容易判斷)

private void unparkSuccessor(Node node) {

        /*

         * If status is negative (i.e., possibly needing signal) try

         * to clear in anticipation of signalling. It is OK if this

         * fails or if status is changed by waiting thread.

         */

        int ws = node.waitStatus;

        if (ws < 0)

            compareAndSetWaitStatus(node, ws, 0);

        /*

         * Thread to unpark is held in successor, which is normally

         * just the next node.  But if cancelled or apparently null,

         * traverse backwards from tail to find the actual

         * non-cancelled successor.

         */

        Node s = node.next;

        if (s == null || s.waitStatus > 0) {

            s = null;

            for (Node t = tail; t != null && t != node; t = t.prev)

                if (t.waitStatus <= 0)

                    s = t;

        }

        if (s != null)

            LockSupport.unpark(s.thread);

    }

可中斷鎖的實現:本質是呼叫 AQS. 他在響應中斷後直接跳出迴圈,丟擲異常,而正常額Lock 忽略這個中斷,只是簡單的記錄下,然後繼續迴圈。

private void doAcquireInterruptibly(int arg)

        throws InterruptedException {

        final Node node = addWaiter(Node.EXCLUSIVE);

        try {

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    return;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    break;

            }

        } catch (RuntimeException ex) {

            cancelAcquire(node);

            throw ex;

        }

        // Arrive here only if interrupted

        cancelAcquire(node);

        throw new InterruptedException();

    }

超時鎖的實現基本類似,就是阻塞一段時間後自己恢復,如果有中斷則丟擲異常。

private boolean doAcquireNanos(int arg, long nanosTimeout)

        throws InterruptedException {

        long lastTime = System.nanoTime();

        final Node node = addWaiter(Node.EXCLUSIVE);

        try {

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    return true;

                }

                if (nanosTimeout <= 0) {

                    cancelAcquire(node);

                    return false;

                }

                if (nanosTimeout > spinForTimeoutThreshold &&

                    shouldParkAfterFailedAcquire(p, node))

                    LockSupport.parkNanos(this, nanosTimeout);

                long now = System.nanoTime();

                nanosTimeout -= now - lastTime;

                lastTime = now;

                if (Thread.interrupted())

                    break;

            }

        } catch (RuntimeException ex) {

            cancelAcquire(node);

            throw ex;

        }

        // Arrive here only if interrupted

        cancelAcquire(node);

        throw new InterruptedException();

    }

Condition

Condition 實現了與java內容monitor 類似的功能。提供 await,signal,signalall 等操作,與object . wait等一系列操作對應。不同的是一個condition 可以有多個條件佇列。這點內建monitor 是做不到的。另外還支援 超時、取消等更加靈活的方式。

和內建的Monitor一樣,呼叫 Condition。aWait 等操作,需要獲得鎖,也就是 Condition 是和一個鎖繫結在一起的。它的實現 是在AQS中,基本思想如下:一下內容抄自部落格:http://www.nbtarena.com/Html/soft/201308/2429.html

public final void await() throws InterruptedException {

    // 1.如果當前執行緒被中斷,則丟擲中斷異常

    if (Thread.interrupted())

        throw newInterruptedException();

    // 2.將節點加入到Condition佇列中去,這裡如果lastWaiter是cancel狀態,那麼會把它踢出Condition佇列。

    Node node = addConditionWaiter();

    // 3.呼叫tryRelease,釋放當前執行緒的鎖

    long savedState =fullyRelease(node);

    int interruptMode = 0;

    // 4.為什麼會有在AQS的等待佇列的判斷?

    // 解答:signal*作會將Node從Condition佇列中拿出並且放入到等待佇列中去,在不在AQS等待佇列就看signal是否執行了

    // 如果不在AQS等待佇列中,就park當前執行緒,如果在,就退出迴圈,這個時候如果被中斷,那麼就退出迴圈

    while (!isOnSyncQueue(node)) {

        LockSupport.park(this);

        if ((interruptMode =checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    // 5.這個時候執行緒已經被signal()或者signalAll()*作給喚醒了,退出了4中的while迴圈

    // 自旋等待嘗試再次獲取鎖,呼叫acquireQueued方法

    if (acquireQueued(node,savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

    if (node.nextWaiter != null)

        unlinkCancelledWaiters();

    if (interruptMode != 0)

        reportInterruptAfterWait(interruptMode);

}

整個await的過程如下:

  1.將當前執行緒加入Condition鎖佇列。特別說明的是,這裡不同於AQS的佇列,這裡進入的是Condition的FIFO佇列。進行2。

  2.釋放鎖。這裡可以看到將鎖釋放了,否則別的執行緒就無法拿到鎖而發生死鎖。進行3。

  3.自旋(while)掛起,直到被喚醒或者超時或者CACELLED等。進行4。

  4.獲取鎖(acquireQueued)。並將自己從Condition的FIFO佇列中釋放,表明自己不再需要鎖(我已經拿到鎖了)。

可以看到,這個await的*作過程和Object.wait()方法是一樣,只不過await()採用了Condition佇列的方式實現了Object.wait()的功能。

signal和signalAll方法

await*()清楚了,現在再來看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要將Condition.await*()中FIFO佇列中第一個Node喚醒(或者全部Node)喚醒。儘管所有Node可能都被喚醒,但是要知道的是仍然只有一個執行緒能夠拿到鎖,其它沒有拿到鎖的執行緒仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。

 Java Code

public final void signal() {

    if (!isHeldExclusively())

        throw newIllegalMonitorStateException();

    Node first = firstWaiter;

    if (first != null)

        doSignal(first);

}

這裡先判斷當前執行緒是否持有鎖,如果沒有持有,則丟擲異常,然後判斷整個condition佇列是否為空,不為空則呼叫doSignal方法來喚醒執行緒,看看doSignal方法都幹了一些什麼:

 Java Code

private void doSignal(Node first) {

    do {

        if ( (firstWaiter =first.nextWaiter) == null)

            lastWaiter = null;

        first.nextWaiter = null;

    } while(!transferForSignal(first) &&

             (first = firstWaiter)!= null);

}

上面的代*很容易看出來,signal就是喚醒Condition佇列中的第一個非CANCELLED節點執行緒,而signalAll就是喚醒所有非CANCELLED節點執行緒。當然了遇到CANCELLED執行緒就需要將其從FIFO佇列中剔除。

 Java Code

final boolean transferForSignal(Node node) {

    /*

     * 設定node的waitStatus:Condition->0

     */

    if(!compareAndSetWaitStatus(node, Node.CONDITION, 0))

        return false;

    /*

     * 加入到AQS的等待佇列,讓節點繼續獲取鎖

     * 設定前置節點狀態為SIGNAL

     */

    Node p = enq(node);

    int c = p.waitStatus;

    if (c > 0 ||!compareAndSetWaitStatus(p, c, Node.SIGNAL))

       LockSupport.unpark(node.thread);

    return true;

}

上面就是喚醒一個await*()執行緒的過程,根據前面的介紹,如果要unpark執行緒,並使執行緒拿到鎖,那麼就需要執行緒節點進入AQS的佇列。所以可以看到在LockSupport.unpark之前呼叫了enq(node)*作,將當前節點加入到AQS佇列。

signalAll和signal方法類似,主要的不同在於它不是呼叫doSignal方法,而是呼叫doSignalAll方法:

 Java Code

private void doSignalAll(Node first) {

    lastWaiter = firstWaite