並發機制的底層實現
concurrent 包的實現
由於 Java 的 CAS 同時具有 volatile 讀和 volatile 寫的內存語義,因此 Java 線程之間的通信現在有了下面四種方式:
- A 線程寫 volatile 變量,隨後 B 線程讀這個 volatile 變量。
- A 線程寫 volatile 變量,隨後 B 線程用 CAS 更新這個 volatile 變量。
- A 線程用 CAS 更新一個 volatile 變量,隨後 B 線程用 CAS 更新這個 volatile 變量。
- A 線程用 CAS 更新一個 volatile 變量,隨後 B 線程讀這個 volatile 變量。
同時,volatile 變量的讀/寫和 CAS 可以實現線程之間的通信。把這些特性整合在一起,就形成了整個 concurrent 包得以實現的基石。如果我們仔細分析 concurrent 包的源代碼實現,會發現一個通用化的實現模式:
首先,聲明共享變量為 volatile;
然後,使用 CAS 的原子條件更新來實現線程之間的同步;
同時,配合以 volatile 的讀/寫和 CAS 所具有的 volatile 讀和寫的內存語義來實現線程之間的通信。
AQS,非阻塞數據結構和原子變量類(Java.util.concurrent.atomic 包中的類),這些 concurrent 包中的基礎類都是使用這種模式來實現的,而 concurrent 包中的高層類又是依賴於這些基礎類來實現的。從整體來看,concurrent 包的實現示意圖如下:
synchronized
synchronized 的要點
關鍵字 synchronized 可以保證在同一個時刻,只有一個線程可以執行某個方法或者某個代碼塊。
synchronized 有 3 種應用方式:
- 同步實例方法
- 同步靜態方法
- 同步代碼塊
同步實例方法
? 錯誤示例 - 未同步的示例
@NotThreadSafe public class SynchronizedDemo01 implements Runnable { static int i = 0; public void increase() { i++; } @Override public void run() { for (int j = 0; j < 100000; j++) { increase(); } }public static void main(String[] args) throws InterruptedException { SynchronizedDemo01 instance = new SynchronizedDemo01(); Thread t1 = new Thread(instance); Thread t2 = new Thread(instance); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } } // 輸出結果: 小於 200000 的隨機數字
Java 實例方法同步是同步在擁有該方法的對象上。這樣,每個實例其方法同步都同步在不同的對象上,即該方法所屬的實例。只有一個線程能夠在實例方法同步塊中運行。如果有多個實例存在,那麽一個線程一次可以在一個實例同步塊中執行操作。一個實例一個線程。
@ThreadSafe public class SynchronizedDemo02 implements Runnable { static int i = 0; public synchronized void increase() { i++; } @Override public void run() { for (int j = 0; j < 100000; j++) { increase(); } } public static void main(String[] args) throws InterruptedException { SynchronizedDemo02 instance = new SynchronizedDemo02(); Thread t1 = new Thread(instance); Thread t2 = new Thread(instance); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } } // 輸出結果: // 2000000
同步靜態方法
靜態方法的同步是指同步在該方法所在的類對象上。因為在 JVM 中一個類只能對應一個類對象,所以同時只允許一個線程執行同一個類中的靜態同步方法。
對於不同類中的靜態同步方法,一個線程可以執行每個類中的靜態同步方法而無需等待。不管類中的那個靜態同步方法被調用,一個類只能由一個線程同時執行。
@ThreadSafe public class SynchronizedDemo03 implements Runnable { static int i = 0; public static synchronized void increase() { i++; } @Override public void run() { for (int j = 0; j < 100000; j++) { increase(); } } public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new SynchronizedDemo03()); Thread t2 = new Thread(new SynchronizedDemo03()); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } } // 輸出結果: // 200000
同步代碼塊
有時你不需要同步整個方法,而是同步方法中的一部分。Java 可以對方法的一部分進行同步。
註意 Java 同步塊構造器用括號將對象括起來。在上例中,使用了 this
,即為調用 add 方法的實例本身。在同步構造器中用括號括起來的對象叫做監視器對象。上述代碼使用監視器對象同步,同步實例方法使用調用方法本身的實例作為監視器對象。
一次只有一個線程能夠在同步於同一個監視器對象的 Java 方法內執行。
@ThreadSafe public class SynchronizedDemo04 implements Runnable { static int i = 0; static SynchronizedDemo04 instance = new SynchronizedDemo04(); @Override public void run() { synchronized (instance) { for (int j = 0; j < 100000; j++) { i++; } } } public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(instance); Thread t2 = new Thread(instance); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } } // 輸出結果: // 200000
synchronized 的原理
synchronized 實現同步的基礎是:Java 中的每一個對象都可以作為鎖。
- 對於普通同步方法,鎖是當前實例對象。
- 對於靜態同步方法,鎖是當前類的 Class 對象。
- 對於同步方法塊,鎖是 Synchonized 括號裏配置的對象。
?? 參考閱讀:Java 並發編程:synchronized ?? 參考閱讀:深入理解 Java 並發之 synchronized 實現原理
volatile
volatile 的要點
volatile 是輕量級的 synchronized,它在多處理器開發中保證了共享變量的“可見性”。
可見性的意思是當一個線程修改一個共享變量時,另外一個線程能讀到這個修改的值。
一旦一個共享變量(類的成員變量、類的靜態成員變量)被 volatile 修飾之後,那麽就具備了兩層語義:
- 保證了不同線程對這個變量進行操作時的可見性,即一個線程修改了某個變量的值,這新值對其他線程來說是立即可見的。
- 禁止進行指令重排序。
如果一個字段被聲明成 volatile,Java 線程內存模型確保所有線程看到這個變量的值是一致的。
volatile 的原理
觀察加入 volatile 關鍵字和沒有加入 volatile 關鍵字時所生成的匯編代碼發現,加入 volatile 關鍵字時,會多出一個 lock 前綴指令。
lock 前綴指令實際上相當於一個內存屏障(也成內存柵欄),內存屏障會提供 3 個功能:
- 它確保指令重排序時不會把其後面的指令排到內存屏障之前的位置,也不會把前面的指令排到內存屏障的後面;即在執行到內存屏障這句指令時,在它前面的操作已經全部完成;
- 它會強制將對緩存的修改操作立即寫入主存;
- 如果是寫操作,它會導致其他 CPU 中對應的緩存行無效。
volatile 的應用場景
如果 volatile 變量修飾符使用恰當的話,它比 synchronized 的使用和執行成本更低,因為它不會引起線程上下文的切換和調度。
但是,volatile 無法替代 synchronized ,因為 volatile 無法保證操作的原子性。通常來說,使用 volatile 必須具備以下 2 個條件:
- 對變量的寫操作不依賴於當前值
- 該變量沒有包含在具有其他變量的不變式中
應用場景:
狀態標記量
volatile boolean flag = false; while(!flag) { doSomething(); } public void setFlag() { flag = true; } double check class Singleton { private volatile static Singleton instance = null; private Singleton() {} public static Singleton getInstance() { if(instance==null) { synchronized (Singleton.class) { if(instance==null) instance = new Singleton(); } } return instance; } }
?? 參考閱讀:Java 並發編程:volatile 關鍵字解析
CAS
簡介
CAS(Compare and Swap),字面意思為比較並交換。CAS 有 3 個操作數,內存值 V,舊的預期值 A,要修改的新值 B。當且僅當預期值 A 和內存值 V 相同時,將內存值 V 修改為 B,否則什麽都不做。
操作
我們常常做這樣的操作
if(a==b) { a++; }
試想一下如果在做 a++之前 a 的值被改變了怎麽辦?a++還執行嗎?出現該問題的原因是在多線程環境下,a 的值處於一種不定的狀態。采用鎖可以解決此類問題,但 CAS 也可以解決,而且可以不加鎖。
int expect = a; if(a.compareAndSet(expect,a+1)) { doSomeThing1(); } else { doSomeThing2(); }
這樣如果 a 的值被改變了 a++就不會被執行。按照上面的寫法,a!=expect 之後,a++就不會被執行,如果我們還是想執行 a++操作怎麽辦,沒關系,可以采用 while 循環
while(true) { int expect = a; if (a.compareAndSet(expect, a + 1)) { doSomeThing1(); return; } else { doSomeThing2(); } }
采用上面的寫法,在沒有鎖的情況下實現了 a++操作,這實際上是一種非阻塞算法。
應用
非阻塞算法 (nonblocking algorithms)
一個線程的失敗或者掛起不應該影響其他線程的失敗或掛起的算法。
現代的 CPU 提供了特殊的指令,可以自動更新共享數據,而且能夠檢測到其他線程的幹擾,而 compareAndSet() 就用這些代替了鎖定。
拿出 AtomicInteger 來研究在沒有鎖的情況下是如何做到數據正確性的。
private volatile int value;
首先毫無疑問,在沒有鎖的機制下可能需要借助 volatile 原語,保證線程間的數據是可見的(共享的)。
這樣才獲取變量的值的時候才能直接讀取。
public final int get() { return value; }
然後來看看++i 是怎麽做到的。
public final int incrementAndGet() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return next; } }
在這裏采用了 CAS 操作,每次從內存中讀取數據然後將此數據和+1 後的結果進行 CAS 操作,如果成功就返回結果,否則重試直到成功為止。
而 compareAndSet 利用 JNI 來完成 CPU 指令的操作。
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
整體的過程就是這樣子的,利用 CPU 的 CAS 指令,同時借助 JNI 來完成 Java 的非阻塞算法。其它原子操作都是利用類似的特性完成的。
其中 unsafe.compareAndSwapInt(this, valueOffset, expect, update)類似:
if (this == expect) { this = update return true; } else { return false; }
那麽問題就來了,成功過程中需要 2 個步驟:比較 this == expect,替換 this = update,compareAndSwapInt 如何這兩個步驟的原子性呢? 參考 CAS 的原理
原理
Java 代碼如何確保處理器執行 CAS 操作?
CAS 通過調用 JNI(JNI:Java Native Interface 為 Java 本地調用,允許 Java 調用其他語言。)的代碼實現的。JVM 將 CAS 操作編譯為底層提供的最有效方法。在支持 CAS 的處理器上,JVM 將它們編譯為相應的機器指令;在不支持 CAS 的處理器上,JVM 將使用自旋鎖。
特點
優點
一般情況下,比鎖性能更高。因為 CAS 是一種非阻塞算法,所以其避免了線程被阻塞時的等待時間。
缺點
ABA 問題
因為 CAS 需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是 A,變成了 B,又變成了 A,那麽使用 CAS 進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA 問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麽 A-B-A 就會變成 1A-2B-3A。
從 Java1.5 開始 JDK 的 atomic 包裏提供了一個類 AtomicStampedReference 來解決 ABA 問題。這個類的 compareAndSet 方法作用是首先檢查當前引用是否等於預期引用,並且當前標誌是否等於預期標誌,如果全部相等,則以原子方式將該引用和該標誌的值設置為給定的更新值。
循環時間長開銷大
自旋 CAS 如果長時間不成功,會給 CPU 帶來非常大的執行開銷。如果 JVM 能支持處理器提供的 pause 指令那麽效率會有一定的提升,pause 指令有兩個作用,第一它可以延遲流水線執行指令(de-pipeline),使 CPU 不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它可以避免在退出循環的時候因內存順序沖突(memory order violation)而引起 CPU 流水線被清空(CPU pipeline flush),從而提高 CPU 的執行效率。
比較花費 CPU 資源,即使沒有任何用也會做一些無用功。
只能保證一個共享變量的原子操作
當對一個共享變量執行操作時,我們可以使用循環 CAS 的方式來保證原子操作,但是對多個共享變量操作時,循環 CAS 就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變量合並成一個共享變量來操作。比如有兩個共享變量 i = 2,j=a,合並一下 ij=2a,然後用 CAS 來操作 ij。從 Java1.5 開始 JDK 提供了 AtomicReference 類來保證引用對象之間的原子性,你可以把多個變量放在一個對象裏來進行 CAS 操作。
總結
可以用 CAS 在無鎖的情況下實現原子操作,但要明確應用場合,非常簡單的操作且又不想引入鎖可以考慮使用 CAS 操作,當想要非阻塞地完成某一操作也可以考慮 CAS。不推薦在復雜操作中引入 CAS,會使程序可讀性變差,且難以測試,同時會出現 ABA 問題。
免費Java資料需要自己領取,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo/Kafka、Hadoop、Hbase、Flink等高並發分布式、大數據、機器學習等技術。
傳送門:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q
並發機制的底層實現