1. 程式人生 > >並發機制的底層實現

並發機制的底層實現

同步 分享 {} 情況下 link external nor 問題 操作

concurrent 包的實現

由於 Java 的 CAS 同時具有 volatile 讀和 volatile 寫的內存語義,因此 Java 線程之間的通信現在有了下面四種方式:

  1. A 線程寫 volatile 變量,隨後 B 線程讀這個 volatile 變量。
  2. A 線程寫 volatile 變量,隨後 B 線程用 CAS 更新這個 volatile 變量。
  3. A 線程用 CAS 更新一個 volatile 變量,隨後 B 線程用 CAS 更新這個 volatile 變量。
  4. A 線程用 CAS 更新一個 volatile 變量,隨後 B 線程讀這個 volatile 變量。

同時,volatile 變量的讀/寫和 CAS 可以實現線程之間的通信。把這些特性整合在一起,就形成了整個 concurrent 包得以實現的基石。如果我們仔細分析 concurrent 包的源代碼實現,會發現一個通用化的實現模式:

首先,聲明共享變量為 volatile;

然後,使用 CAS 的原子條件更新來實現線程之間的同步;

同時,配合以 volatile 的讀/寫和 CAS 所具有的 volatile 讀和寫的內存語義來實現線程之間的通信。

AQS,非阻塞數據結構和原子變量類(Java.util.concurrent.atomic 包中的類),這些 concurrent 包中的基礎類都是使用這種模式來實現的,而 concurrent 包中的高層類又是依賴於這些基礎類來實現的。從整體來看,concurrent 包的實現示意圖如下:

技術分享圖片

synchronized

synchronized 的要點

關鍵字 synchronized 可以保證在同一個時刻,只有一個線程可以執行某個方法或者某個代碼塊。

synchronized 有 3 種應用方式:

  1. 同步實例方法
  2. 同步靜態方法
  3. 同步代碼塊

同步實例方法

? 錯誤示例 - 未同步的示例

@NotThreadSafe
public class SynchronizedDemo01 implements Runnable {
    static int i = 0;

    public void increase() {
        i++;
    }

    @Override
    public void run() {
        for (int j = 0; j < 100000; j++) {
            increase();
        }
    }

    
public static void main(String[] args) throws InterruptedException { SynchronizedDemo01 instance = new SynchronizedDemo01(); Thread t1 = new Thread(instance); Thread t2 = new Thread(instance); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } } // 輸出結果: 小於 200000 的隨機數字

Java 實例方法同步是同步在擁有該方法的對象上。這樣,每個實例其方法同步都同步在不同的對象上,即該方法所屬的實例。只有一個線程能夠在實例方法同步塊中運行。如果有多個實例存在,那麽一個線程一次可以在一個實例同步塊中執行操作。一個實例一個線程。

@ThreadSafe
public class SynchronizedDemo02 implements Runnable {
    static int i = 0;

    public synchronized void increase() {
        i++;
    }

    @Override
    public void run() {
        for (int j = 0; j < 100000; j++) {
            increase();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedDemo02 instance = new SynchronizedDemo02();
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
// 輸出結果:
// 2000000

同步靜態方法

靜態方法的同步是指同步在該方法所在的類對象上。因為在 JVM 中一個類只能對應一個類對象,所以同時只允許一個線程執行同一個類中的靜態同步方法。

對於不同類中的靜態同步方法,一個線程可以執行每個類中的靜態同步方法而無需等待。不管類中的那個靜態同步方法被調用,一個類只能由一個線程同時執行。

@ThreadSafe
public class SynchronizedDemo03 implements Runnable {
    static int i = 0;

    public static synchronized void increase() {
        i++;
    }

    @Override
    public void run() {
        for (int j = 0; j < 100000; j++) {
            increase();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new SynchronizedDemo03());
        Thread t2 = new Thread(new SynchronizedDemo03());
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
// 輸出結果:
// 200000

同步代碼塊

有時你不需要同步整個方法,而是同步方法中的一部分。Java 可以對方法的一部分進行同步。

註意 Java 同步塊構造器用括號將對象括起來。在上例中,使用了 this,即為調用 add 方法的實例本身。在同步構造器中用括號括起來的對象叫做監視器對象。上述代碼使用監視器對象同步,同步實例方法使用調用方法本身的實例作為監視器對象。

一次只有一個線程能夠在同步於同一個監視器對象的 Java 方法內執行。

@ThreadSafe
public class SynchronizedDemo04 implements Runnable {
    static int i = 0;
    static SynchronizedDemo04 instance = new SynchronizedDemo04();

    @Override
    public void run() {
        synchronized (instance) {
            for (int j = 0; j < 100000; j++) {
                i++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
// 輸出結果:
// 200000

synchronized 的原理

synchronized 實現同步的基礎是:Java 中的每一個對象都可以作為鎖。

  • 對於普通同步方法,鎖是當前實例對象。
  • 對於靜態同步方法,鎖是當前類的 Class 對象。
  • 對於同步方法塊,鎖是 Synchonized 括號裏配置的對象。

?? 參考閱讀:Java 並發編程:synchronized ?? 參考閱讀:深入理解 Java 並發之 synchronized 實現原理

volatile

volatile 的要點

volatile 是輕量級的 synchronized,它在多處理器開發中保證了共享變量的“可見性”。

可見性的意思是當一個線程修改一個共享變量時,另外一個線程能讀到這個修改的值。

一旦一個共享變量(類的成員變量、類的靜態成員變量)被 volatile 修飾之後,那麽就具備了兩層語義:

  1. 保證了不同線程對這個變量進行操作時的可見性,即一個線程修改了某個變量的值,這新值對其他線程來說是立即可見的。
  2. 禁止進行指令重排序。

如果一個字段被聲明成 volatile,Java 線程內存模型確保所有線程看到這個變量的值是一致的。

volatile 的原理

觀察加入 volatile 關鍵字和沒有加入 volatile 關鍵字時所生成的匯編代碼發現,加入 volatile 關鍵字時,會多出一個 lock 前綴指令。

lock 前綴指令實際上相當於一個內存屏障(也成內存柵欄),內存屏障會提供 3 個功能:

  • 它確保指令重排序時不會把其後面的指令排到內存屏障之前的位置,也不會把前面的指令排到內存屏障的後面;即在執行到內存屏障這句指令時,在它前面的操作已經全部完成;
  • 它會強制將對緩存的修改操作立即寫入主存;
  • 如果是寫操作,它會導致其他 CPU 中對應的緩存行無效。

volatile 的應用場景

如果 volatile 變量修飾符使用恰當的話,它比 synchronized 的使用和執行成本更低,因為它不會引起線程上下文的切換和調度。

但是,volatile 無法替代 synchronized ,因為 volatile 無法保證操作的原子性。通常來說,使用 volatile 必須具備以下 2 個條件:

  1. 對變量的寫操作不依賴於當前值
  2. 該變量沒有包含在具有其他變量的不變式中

應用場景:

狀態標記量

volatile boolean flag = false;

while(!flag) {
    doSomething();
}

public void setFlag() {
    flag = true;
}
double check

class Singleton {
    private volatile static Singleton instance = null;

    private Singleton() {}

    public static Singleton getInstance() {
        if(instance==null) {
            synchronized (Singleton.class) {
                if(instance==null)
                    instance = new Singleton();
            }
        }
        return instance;
    }
}

?? 參考閱讀:Java 並發編程:volatile 關鍵字解析

CAS

簡介

CAS(Compare and Swap),字面意思為比較並交換。CAS 有 3 個操作數,內存值 V,舊的預期值 A,要修改的新值 B。當且僅當預期值 A 和內存值 V 相同時,將內存值 V 修改為 B,否則什麽都不做。

操作

我們常常做這樣的操作

if(a==b) {
    a++;
}

試想一下如果在做 a++之前 a 的值被改變了怎麽辦?a++還執行嗎?出現該問題的原因是在多線程環境下,a 的值處於一種不定的狀態。采用鎖可以解決此類問題,但 CAS 也可以解決,而且可以不加鎖。

int expect = a;
if(a.compareAndSet(expect,a+1)) {
    doSomeThing1();
} else {
    doSomeThing2();
}

這樣如果 a 的值被改變了 a++就不會被執行。按照上面的寫法,a!=expect 之後,a++就不會被執行,如果我們還是想執行 a++操作怎麽辦,沒關系,可以采用 while 循環

while(true) {
    int expect = a;
    if (a.compareAndSet(expect, a + 1)) {
        doSomeThing1();
        return;
    } else {
        doSomeThing2();
    }
}

采用上面的寫法,在沒有鎖的情況下實現了 a++操作,這實際上是一種非阻塞算法。

應用

非阻塞算法 (nonblocking algorithms)

一個線程的失敗或者掛起不應該影響其他線程的失敗或掛起的算法。

現代的 CPU 提供了特殊的指令,可以自動更新共享數據,而且能夠檢測到其他線程的幹擾,而 compareAndSet() 就用這些代替了鎖定。

拿出 AtomicInteger 來研究在沒有鎖的情況下是如何做到數據正確性的。

private volatile int value;

首先毫無疑問,在沒有鎖的機制下可能需要借助 volatile 原語,保證線程間的數據是可見的(共享的)。

這樣才獲取變量的值的時候才能直接讀取。

public final int get() {
    return value;
}

然後來看看++i 是怎麽做到的。

public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
            if (compareAndSet(current, next))
                return next;
    }
}

在這裏采用了 CAS 操作,每次從內存中讀取數據然後將此數據和+1 後的結果進行 CAS 操作,如果成功就返回結果,否則重試直到成功為止。

而 compareAndSet 利用 JNI 來完成 CPU 指令的操作。

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

整體的過程就是這樣子的,利用 CPU 的 CAS 指令,同時借助 JNI 來完成 Java 的非阻塞算法。其它原子操作都是利用類似的特性完成的。

其中 unsafe.compareAndSwapInt(this, valueOffset, expect, update)類似:

if (this == expect) {
    this = update
    return true;
} else {
    return false;
}

那麽問題就來了,成功過程中需要 2 個步驟:比較 this == expect,替換 this = update,compareAndSwapInt 如何這兩個步驟的原子性呢? 參考 CAS 的原理

原理

Java 代碼如何確保處理器執行 CAS 操作?

CAS 通過調用 JNI(JNI:Java Native Interface 為 Java 本地調用,允許 Java 調用其他語言。)的代碼實現的。JVM 將 CAS 操作編譯為底層提供的最有效方法。在支持 CAS 的處理器上,JVM 將它們編譯為相應的機器指令;在不支持 CAS 的處理器上,JVM 將使用自旋鎖。

特點

優點

一般情況下,比鎖性能更高。因為 CAS 是一種非阻塞算法,所以其避免了線程被阻塞時的等待時間。

缺點

ABA 問題

因為 CAS 需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是 A,變成了 B,又變成了 A,那麽使用 CAS 進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA 問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麽 A-B-A 就會變成 1A-2B-3A。

從 Java1.5 開始 JDK 的 atomic 包裏提供了一個類 AtomicStampedReference 來解決 ABA 問題。這個類的 compareAndSet 方法作用是首先檢查當前引用是否等於預期引用,並且當前標誌是否等於預期標誌,如果全部相等,則以原子方式將該引用和該標誌的值設置為給定的更新值。

循環時間長開銷大

自旋 CAS 如果長時間不成功,會給 CPU 帶來非常大的執行開銷。如果 JVM 能支持處理器提供的 pause 指令那麽效率會有一定的提升,pause 指令有兩個作用,第一它可以延遲流水線執行指令(de-pipeline),使 CPU 不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它可以避免在退出循環的時候因內存順序沖突(memory order violation)而引起 CPU 流水線被清空(CPU pipeline flush),從而提高 CPU 的執行效率。

比較花費 CPU 資源,即使沒有任何用也會做一些無用功。

只能保證一個共享變量的原子操作

當對一個共享變量執行操作時,我們可以使用循環 CAS 的方式來保證原子操作,但是對多個共享變量操作時,循環 CAS 就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變量合並成一個共享變量來操作。比如有兩個共享變量 i = 2,j=a,合並一下 ij=2a,然後用 CAS 來操作 ij。從 Java1.5 開始 JDK 提供了 AtomicReference 類來保證引用對象之間的原子性,你可以把多個變量放在一個對象裏來進行 CAS 操作。

總結

可以用 CAS 在無鎖的情況下實現原子操作,但要明確應用場合,非常簡單的操作且又不想引入鎖可以考慮使用 CAS 操作,當想要非阻塞地完成某一操作也可以考慮 CAS。不推薦在復雜操作中引入 CAS,會使程序可讀性變差,且難以測試,同時會出現 ABA 問題。

免費Java資料需要自己領取,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo/Kafka、Hadoop、Hbase、Flink等高並發分布式、大數據、機器學習等技術。
傳送門:

並發機制的底層實現