1. 程式人生 > >【並發編程】CAS與synchronized

【並發編程】CAS與synchronized

ews 算法 正式 ima PE 理解 cnblogs inf sch

線程安全

眾所周知,Java是多線程的。但是,Java對多線程的支持其實是一把雙刃劍。一旦涉及到多個線程操作共享資源的情況時,處理不好就可能產生線程安全問題。線程安全性可能是非常復雜的,在沒有充足的同步的情況下,多個線程中的操作執行順序是不可預測的。

Java裏面進行多線程通信的主要方式就是共享內存的方式,共享內存主要的關註點有兩個:可見性和有序性。加上復合操作的原子性,我們可以認為Java的線程安全性問題主要關註點有3個:可見性、有序性和原子性。

Java內存模型(JMM)解決了可見性和有序性的問題,而鎖解決了原子性的問題。

樂觀鎖與悲觀鎖

悲觀鎖:總是假設最壞的情況,每次去拿數據的時候都認為別人會修改,所以每次在拿數據的時候都會上鎖,這樣別人想拿這個數據就會阻塞直到它拿到鎖。傳統的關系型數據庫裏邊就用到了很多這種鎖機制,比如行鎖,表鎖等,讀鎖,寫鎖等,都是在做操作之前先上鎖。再比如Java裏面的同步原語synchronized關鍵字的實現也是悲觀鎖。

樂觀鎖:顧名思義,就是很樂觀,每次去拿數據的時候都認為別人不會修改,所以不會上鎖,但是在更新的時候會判斷一下在此期間別人有沒有去更新這個數據,可以使用版本號等機制。樂觀鎖適用於多讀的應用類型,這樣可以提高吞吐量,像數據庫提供的類似於write_condition機制,其實都是提供的樂觀鎖。在Java中java.util.concurrent.atomic包下面的原子變量類就是使用了樂觀鎖的一種實現方式CAS實現的。

悲觀鎖的一種實現synchronized

Java在JDK1.5之前都是靠synchronized關鍵字保證同步的,這種通過使用一致的鎖定協議來協調對共享狀態的訪問,可以確保無論哪個線程持有共享變量的鎖,都采用獨占的方式來訪問這些變量。獨占鎖其實就是一種悲觀鎖,所以可以說synchronized是悲觀鎖。

悲觀鎖機制存在以下問題:

1、在多線程競爭下,加鎖、釋放鎖會導致比較多的上下文切換和調度延時,引起性能問題。

2、一個線程持有鎖會導致其它所有需要此鎖的線程掛起。

3、如果一個優先級高的線程等待一個優先級低的線程釋放鎖會導致優先級倒置,引起性能風險。

而另一個更加有效的鎖就是樂觀鎖。所謂樂觀鎖就是,每次不加鎖而是假設沒有沖突而去完成某項操作,如果因為沖突失敗就重試,直到成功為止。

與鎖相比,volatile變量是一個更輕量級的同步機制,因為在使用這些變量時不會發生上下文切換和線程調度等操作,但是volatile不能解決原子性問題,因此當一個變量依賴舊值時就不能使用volatile變量。因此對於同步最終還是要回到鎖機制上來。

樂觀鎖的一種實現CAS

樂觀鎖( Optimistic Locking)在上文已經說過了,其實就是一種思想。相對悲觀鎖而言,樂觀鎖假設認為數據一般情況下不會產生並發沖突,所以在數據進行提交更新的時候,才會正式對數據是否產生並發沖突進行檢測,如果發現並發沖突了,則讓返回用戶錯誤的信息,讓用戶決定如何去做。

上面提到的樂觀鎖的概念中其實已經闡述了它的具體實現細節:主要就是兩個步驟:沖突檢測數據更新。其實現方式有一種比較典型的就是 Compare and Swap ( CAS )。

CAS(Compare And Swap),即比較並交換。是解決多線程並行情況下使用鎖造成性能損耗的一種機制,CAS操作包含三個操作數——內存位置(V)、預期原值(A)和新值(B)。如果內存位置的值與預期原值相匹配,那麽處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在CAS指令之前返回該位置的值。CAS有效地說明了“我認為位置V應該包含值A;如果包含該值,則將B放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可”。這其實和樂觀鎖的沖突檢查+數據更新的原理是一樣的。

Java對CAS的支持

在JDK1.5 中新增 java.util.concurrent (J.U.C)就是建立在CAS之上的。相對於對於 synchronized 這種阻塞算法,CAS是非阻塞算法的一種常見實現。所以J.U.C在性能上有了很大的提升。

非阻塞算法 (nonblocking algorithms)
一個線程的失敗或者掛起不應該影響其他線程的失敗或掛起的算法。

以 java.util.concurrent 中的 AtomicInteger 為例,看一下在不使用鎖的情況下是如何保證線程安全的。主要理解 getAndIncrement 方法,該方法的作用相當於 ++i 操作。

public class AtomicInteger extends Number implements java.io.Serializable {  
    private volatile int value; 

    public final int get() {  
        return value;  
    }  

    public final int getAndIncrement() {  
        for (;;) {  
            int current = get();  
            int next = current + 1;  
            if (compareAndSet(current, next))  
                return current;  
        }  
    }  

    public final boolean compareAndSet(int expect, int update) {  
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);  
    }  
}

在沒有鎖的機制下,字段value要借助volatile原語,保證線程間的數據是可見性。這樣在獲取變量的值的時候才能直接讀取。然後來看看 ++i 是怎麽做到的。

getAndIncrement 采用了CAS操作,每次從內存中讀取數據然後將此數據和 +1 後的結果進行CAS操作,如果成功就返回結果,否則重試直到成功為止。

而 compareAndSet 利用JNI(Java Native Interface)來完成CPU指令的操作:

public final boolean compareAndSet(int expect, int update) {   
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
} 

其中unsafe.compareAndSwapInt(this, valueOffset, expect, update);類似如下邏輯:

if (this == expect) {
     this = update
     return true;
 } else {
     return false;
 }

那麽比較this == expect,替換this = update,compareAndSwapInt實現這兩個步驟的原子性呢? 參考CAS的原理

CAS原理:

CAS通過調用JNI的代碼實現的。而compareAndSwapInt就是借助C來調用CPU底層指令實現的。

下面從分析比較常用的CPU(intel x86)來解釋CAS的實現原理。

下面是sun.misc.Unsafe類的compareAndSwapInt()方法的源代碼:

public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

可以看到這是個本地方法調用。這個本地方法在JDK中依次調用的C++代碼為:

#define LOCK_IF_MP(mp) __asm cmp mp, 0                         __asm je L0                             __asm _emit 0xF0                        __asm L0:

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

如上面源代碼所示,程序會根據當前處理器的類型來決定是否為cmpxchg指令添加lock前綴。如果程序是在多處理器上運行,就為cmpxchg指令加上lock前綴(lock cmpxchg)。反之,如果程序是在單處理器上運行,就省略lock前綴(單處理器自身會維護單處理器內的順序一致性,不需要lock前綴提供的內存屏障效果)。

CAS缺點

1、ABA問題

比如說一個線程one從內存位置V中取出A,這時候另一個線程two也從內存中取出A,並且two進行了一些操作變成了B,然後two又將V位置的數據變成A,這時候線程one進行CAS操作發現內存中仍然是A,然後one操作成功。盡管線程one的CAS操作成功,但可能存在潛藏的問題。

比如說一個線程one從內存位置V中取出A,這時候另一個線程two也從內存中取出A,並且two進行了一些操作變成了B,然後two又將V位置的數據變成A,這時候線程one進行CAS操作發現內存中仍然是A,然後one操作成功。盡管線程one的CAS操作成功,但可能存在潛藏的問題。如下所示:

技術分享圖片

 

現有一個用單向鏈表實現的堆棧,棧頂為A,這時線程T1已經知道A.next為B,然後希望用CAS將棧頂替換為B:

head.compareAndSet(A,B);

在T1執行上面這條指令之前,線程T2介入,將A、B出棧,再pushD、C、A,此時堆棧結構如下圖,而對象B此時處於遊離狀態:

技術分享圖片

此時輪到線程T1執行CAS操作,檢測發現棧頂仍為A,所以CAS成功,棧頂變為B,但實際上B.next為null,所以此時的情況變為:

技術分享圖片

其中堆棧中只有B一個元素,C和D組成的鏈表不再存在於堆棧中,平白無故就把C、D丟掉了。

以上就是由於ABA問題帶來的隱患,各種樂觀鎖的實現中通常都會用版本戳version來對記錄或對象標記,避免並發操作帶來的問題。

因此AtomicStampedReference/AtomicMarkableReference就很有用了。可以用來避免ABA問題。

AtomicMarkableReference
類描述的一個的對,可以原子的修改Object或者Boolean的值,這種數據結構在一些緩存或者狀態描述中比較有用。這種結構在單個或者同時修改Object/Boolean的時候能夠有效的提高吞吐量。

AtomicStampedReference
類維護帶有整數“標誌”的對象引用,可以用原子方式對其進行更新。對比AtomicMarkableReference 類的,AtomicStampedReference維護的是一種類似的數據結構,其實就是對對象(引用)的一個並發計數(標記版本戳stamp)。但是與AtomicInteger 不同的是,此數據結構可以攜帶一個對象引用(Object),並且能夠對此對象和計數同時進行原子操作。

以AtomicStampedReference為例。從Java1.5開始JDK的atomic包裏提供了一個類AtomicStampedReference,它通過包裝[E,Integer]的元組來對對象標記版本戳stamp,從而避免ABA問題。這個類的compareAndSet方法是首先檢查當前引用是否等於預期引用,並且當前標誌是否等於預期標誌,如果全部相等,則以原子方式將該引用和該標誌的值設置為給定的更新值。

public boolean compareAndSet(
               V      expectedReference,//預期引用

               V      newReference,//更新後的引用

              int    expectedStamp, //預期標誌

              int    newStamp //更新後的標誌
)

例如下面的代碼分別用AtomicInteger和AtomicStampedReference來對初始值為100的原子整型變量進行更新,AtomicInteger會成功執行CAS操作,而加上版本戳的AtomicStampedReference對於ABA問題會執行CAS失敗:

package concur.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

public class ABA {
    
    private static AtomicInteger atomicInt = new AtomicInteger(100);
    private static AtomicStampedReference<Integer> atomicStampedRef = 
            new AtomicStampedReference<Integer>(100, 0);
    
    public static void main(String[] args) throws InterruptedException {
        Thread intT1 = new Thread(new Runnable() {
            @Override
            public void run() {
                atomicInt.compareAndSet(100, 101);
                atomicInt.compareAndSet(101, 100);
            }
        });
        
        Thread intT2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean c3 = atomicInt.compareAndSet(100, 101);
                System.out.println(c3);        //true
            }
        });
        
        intT1.start();
        intT2.start();
        intT1.join();
        intT2.join();
        
        Thread refT1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                atomicStampedRef.compareAndSet(100, 101, 
                        atomicStampedRef.getStamp(), atomicStampedRef.getStamp()+1);
                atomicStampedRef.compareAndSet(101, 100, 
                        atomicStampedRef.getStamp(), atomicStampedRef.getStamp()+1);
            }
        });
        
        Thread refT2 = new Thread(new Runnable() {
            @Override
            public void run() {
                int stamp = atomicStampedRef.getStamp();
                System.out.println("before sleep : stamp = " + stamp);    // stamp = 0
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("after sleep : stamp = " + atomicStampedRef.getStamp());//stamp = 1
                boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp+1);
                System.out.println(c3);        //false
            }
        });
        
        refT1.start();
        refT2.start();
    }

}

2、循環時間長開銷大:

自旋CAS(不成功,就一直循環執行,直到成功)如果長時間不成功,會給CPU帶來非常大的執行開銷。如果JVM能支持處理器提供的pause指令那麽效率會有一定的提升,pause指令有兩個作用,第一它可以延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它可以避免在退出循環的時候因內存順序沖突(memory order violation)而引起CPU流水線被清空(CPU pipeline flush),從而提高CPU的執行效率。

3、只能保證一個共享變量的原子操作:

當對一個共享變量執行操作時,我們可以使用循環CAS的方式來保證原子操作,但是對多個共享變量操作時,循環CAS就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變量合並成一個共享變量來操作。比如有兩個共享變量i=2,j=a,合並一下ij=2a,然後用CAS來操作ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,你可以把多個變量放在一個對象裏來進行CAS操作。

CAS與Synchronized的使用情景:   

1、對於資源競爭較少(線程沖突較輕)的情況,使用synchronized同步鎖進行線程阻塞和喚醒切換以及用戶態內核態間的切換操作額外浪費消耗cpu資源;而CAS基於硬件實現,不需要進入內核,不需要切換線程,操作自旋幾率較少,因此可以獲得更高的性能。

2、對於資源競爭嚴重(線程沖突嚴重)的情況,CAS自旋的概率會比較大,從而浪費更多的CPU資源,效率低於synchronized。

補充:synchronized在jdk1.6之後,已經改進優化。synchronized的底層實現主要依靠Lock-Free的隊列,基本思路是自旋後阻塞,競爭切換後繼續競爭鎖,稍微犧牲了公平性,但獲得了高吞吐量。在線程沖突較少的情況下,可以獲得和CAS類似的性能;而線程沖突嚴重的情況下,性能遠高於CAS。

J.U.C的實現基礎

由於java的CAS同時具有 volatile 讀和volatile寫的內存語義,因此Java線程之間的通信現在有了下面四種方式:

  1. A線程寫volatile變量,隨後B線程讀這個volatile變量。
  2. A線程寫volatile變量,隨後B線程用CAS更新這個volatile變量。
  3. A線程用CAS更新一個volatile變量,隨後B線程用CAS更新這個volatile變量。
  4. A線程用CAS更新一個volatile變量,隨後B線程讀這個volatile變量。

Java的CAS會使用現代處理器上提供的高效機器級別原子指令,這些原子指令以原子方式對內存執行讀-改-寫操作,這是在多處理器中實現同步的關鍵(從本質上來說,能夠支持原子性讀-改-寫指令的計算機器,是順序計算圖靈機的異步等價機器,因此任何現代的多處理器都會去支持某種能對內存執行原子性讀-改-寫操作的原子指令)。同時,volatile變量的讀/寫和CAS可以實現線程之間的通信。把這些特性整合在一起,就形成了整個concurrent包得以實現的基石。如果我們仔細分析concurrent包的源代碼實現,會發現一個通用化的實現模式:

  1. 首先,聲明共享變量為volatile;  
  2. 然後,使用CAS的原子條件更新來實現線程之間的同步;
  3. 同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的內存語義來實現線程之間的通信。

AQS、非阻塞數據結構原子變量類(java.util.concurrent.atomic包中的類),concurrent包中的基礎類都是使用這種模式來實現的。而concurrent包中的高層類又是依賴於這些基礎類來實現的。從整體來看,concurrent包的實現示意圖如下:
技術分享圖片

JVM中的CAS(堆中對象的分配) 

Java調用new object()會創建一個對象,這個對象會被分配到JVM的堆中。那麽這個對象到底是怎麽在堆中保存的呢?

首先,new object()執行的時候,這個對象需要多大的空間,其實是已經確定的,因為java中的各種數據類型,占用多大的空間都是固定的(對其原理不清楚的請自行Google)。那麽接下來的工作就是在堆中找出那麽一塊空間用於存放這個對象。

在單線程的情況下,一般有兩種分配策略:

指針碰撞:這種一般適用於內存是絕對規整的(內存是否規整取決於內存回收策略),分配空間的工作只是將指針像空閑內存一側移動對象大小的距離即可。

空閑列表:這種適用於內存非規整的情況,這種情況下JVM會維護一個內存列表,記錄哪些內存區域是空閑的,大小是多少。給對象分配空間的時候去空閑列表裏查詢到合適的區域然後進行分配即可。

但是JVM不可能一直在單線程狀態下運行,那樣效率太差了。由於再給一個對象分配內存的時候不是原子性的操作,至少需要以下幾步:查找空閑列表、分配內存、修改空閑列表等等,這是不安全的。解決並發時的安全問題也有兩種策略:

CAS:實際上虛擬機采用CAS配合上失敗重試的方式保證更新操作的原子性,原理和上面講的一樣。

TLAB:如果使用CAS其實對性能還是會有影響的,所以JVM又提出了一種更高級的優化策略:每個線程在Java堆中預先分配一小塊內存,稱為本地線程分配緩沖區(TLAB),線程內部需要分配內存時直接在TLAB上分配就行,避免了線程沖突。只有當緩沖區的內存用光需要重新分配內存的時候才會進行CAS操作分配更大的內存空間。

虛擬機是否使用TLAB,可以通過-XX:+/-UseTLAB參數來進行配置(jdk5及以後的版本默認是啟用TLAB的)。

參考資料:
樂觀鎖的一種實現方式——CAS
Java並發問題--樂觀鎖與悲觀鎖以及樂觀鎖的一種實現方式-CAS
CAS原理 Java SE1.6中的Synchronized
JAVA並發編程: CAS和AQS
Java CAS 和ABA問題
CAS原理分析

【並發編程】CAS與synchronized