長文慎入-探索Java併發程式設計與高併發解決方案[轉]
轉自:https://yq.aliyun.com/articles/636038
所有示例程式碼,請見/下載於
https://github.com/Wasabi1234/concurrency
高併發處理的思路及手段
1 基本概念
1.1 併發
同時擁有兩個或者多個執行緒,如果程式在單核處理器上執行多個執行緒將交替地換入或者換出記憶體,這些執行緒是同時“存在"的,每個執行緒都處於執行過程中的某個狀態,如果執行在多核處理器上,此時,程式中的每個執行緒都將分配到一個處理器核上,因此可以同時執行.
1.2 高併發( High Concurrency)
網際網路分散式系統架構設計中必須考慮的因素之一,通常是指,通過設計保證系統能夠同時並行處理很多請求.
1.3 區別與聯絡
- 併發: 多個執行緒操作相同的資源,保證執行緒安全,合理使用資源
- 高併發:服務能同時處理很多請求,提高程式效能
2 CPU
2.1 CPU 多級快取
- 為什麼需要CPU cache
CPU的頻率太快了,快到主存跟不上
如此,在處理器時鐘週期內,CPU常常需要等待主存,浪費資源。所以cache的出現,是為了緩解CPU和記憶體之間速度的不匹配問題(結構:cpu-> cache-> memory ). - CPU cache的意義
- 時間區域性性
如果某個資料被訪問,那麼在不久的將來它很可能被再次訪問 - 空間區域性性
如果某個資料被訪問,那麼與它相鄰的資料很快也可能被訪問
- 時間區域性性
2.2 快取一致性(MESI)
用於保證多個 CPU cache 之間快取共享資料的一致
- M-modified被修改
該快取行只被快取在該 CPU 的快取中,並且是被修改過的,與主存中資料是不一致的,需在未來某個時間點寫回主存,該時間是允許在其他CPU 讀取主存中相應的記憶體之前,當這裡的值被寫入主存之後,該快取行狀態變為 E - E-exclusive獨享
快取行只被快取在該 CPU 的快取中,未被修改過,與主存中資料一致
可在任何時刻當被其他 CPU讀取該記憶體時變成 S 態,被修改時變為 M態 - S-shared共享
該快取行可被多個 CPU 快取,與主存中資料一致 -
I-invalid無效
- 亂序執行優化
處理器為提高運算速度而做出違背程式碼原有順序的優化
併發的優勢與風險
3 專案準備
3.1 專案初始化
自定義4個基本註解
隨手寫個測試類
執行正常
3.2 併發模擬-Jmeter壓測
新增"檢視結果數"和"圖形結果"監聽器
log view 下當前日誌資訊
圖形結果
3.3 併發模擬-程式碼
CountDownLatch
可阻塞執行緒,並保證當滿足特定條件時可繼續執行
Semaphore(訊號量)
可阻塞執行緒,控制同一時間段內的併發量
以上二者通常和執行緒池搭配
下面開始做併發模擬
package com.mmall.concurrency;
import com.mmall.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @author shishusheng
* @date 18/4/1
*/
@Slf4j
@NotThreadSafe
public class ConcurrencyTest {
/**
* 請求總數
*/
public static int clientTotal = 5000;
/**
* 同時併發執行的執行緒數
*/
public static int threadTotal = 200;
public static int count = 0;
public static void main(String[] args) throws Exception {
//定義執行緒池
ExecutorService executorService = Executors.newCachedThreadPool();
//定義訊號量,給出允許併發的執行緒數目
final Semaphore semaphore = new Semaphore(threadTotal);
//統計計數結果
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
//將請求放入執行緒池
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
//訊號量的獲取
semaphore.acquire();
add();
//釋放
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
//關閉執行緒池
executorService.shutdown();
log.info("count:{}", count);
}
/**
* 統計方法
*/
private static void add() {
count++;
}
}
執行發現結果隨機,所以非執行緒安全
4執行緒安全性
4.1 執行緒安全性
當多個執行緒訪問某個類時,不管執行時環境採用何種排程方式
或者這些程序將如何交替執行,並且在主調程式碼中不需要任何額外的同步或協同
,這個類都能表現出正確的行為
,那麼就稱這個類是執行緒安全的
4.2 原子性
4.2.1 Atomic 包
- AtomicXXX:CAS,Unsafe.compareAndSwapInt
提供了互斥訪問,同一時刻只能有一個執行緒來對它進行操作
package com.mmall.concurrency.example.atomic;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author shishusheng
*/
@Slf4j
@ThreadSafe
public class AtomicExample2 {
/**
* 請求總數
*/
public static int clientTotal = 5000;
/**
* 同時併發執行的執行緒數
*/
public static int threadTotal = 200;
/**
* 工作記憶體
*/
public static AtomicLong count = new AtomicLong(0);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
System.out.println();
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
//主記憶體
log.info("count:{}", count.get());
}
private static void add() {
count.incrementAndGet();
// count.getAndIncrement();
}
}
package com.mmall.concurrency.example.atomic;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author shishusheng
* @date 18/4/3
*/
@Slf4j
@ThreadSafe
public class AtomicExample4 {
private static AtomicReference<Integer> count = new AtomicReference<>(0);
public static void main(String[] args) {
// 2
count.compareAndSet(0, 2);
// no
count.compareAndSet(0, 1);
// no
count.compareAndSet(1, 3);
// 4
count.compareAndSet(2, 4);
// no
count.compareAndSet(3, 5);
log.info("count:{}", count.get());
}
}
輸出結果
-
AtomicReference,AtomicReferenceFieldUpdater
-
AtomicBoolean
-
AtomicStampReference : CAS的 ABA 問題
4.2.2 鎖
synchronized:依賴 JVM
- 修飾程式碼塊:大括號括起來的程式碼,作用於呼叫的物件
-
修飾方法: 整個方法,作用於呼叫的物件
-
修飾靜態方法:整個靜態方法,作用於所有物件
package com.mmall.concurrency.example.count;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @author shishusheng
*/
@Slf4j
@ThreadSafe
public class CountExample3 {
/**
* 請求總數
*/
public static int clientTotal = 5000;
/**
* 同時併發執行的執行緒數
*/
public static int threadTotal = 200;
public static int count = 0;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private synchronized static void add() {
count++;
}
}
synchronized 修正計數類方法
- 修飾類:括號括起來的部分,作用於所有物件
子類繼承父類的被 synchronized 修飾方法時,是沒有 synchronized 修飾的!!!
Lock: 依賴特殊的 CPU 指令,程式碼實現
4.2.3 對比
- synchronized: 不可中斷鎖,適合競爭不激烈,可讀性好
- Lock: 可中斷鎖,多樣化同步,競爭激烈時能維持常態
- Atomic: 競爭激烈時能維持常態,比Lock效能好; 只能同步一
個值
4.3 可見性
一個執行緒對主記憶體的修改可以及時的被其他執行緒觀察到
4.3.1 導致共享變數線上程間不可見的原因
- 執行緒交叉執行
- 重排序結合線程交叉執行
- 共享變數更新後的值沒有在工作記憶體與主存間及時更新
4.3.2 可見性之synchronized
JMM關於synchronized的規定
- 執行緒解鎖前,必須把共享變數的最新值重新整理到主記憶體
- 執行緒加鎖時,將清空工作記憶體中共享變數的值,從而使
用共享變數時需要從主記憶體中重新讀取最新的值(加鎖與解鎖是同一把鎖
)
4.3.3 可見性之volatile
通過加入記憶體屏障和禁止重排序優化來實現
- 對volatile變數寫操作時,會在寫操作後加入一條store
屏障指令,將本地記憶體中的共享變數值重新整理到主記憶體 -
對volatile變數讀操作時,會在讀操作前加入一條load
屏障指令,從主記憶體中讀取共享變數volatile 寫
volatile 讀
計數類之 volatile 版,非執行緒安全的
- volatile使用
volatile boolean inited = false;
//執行緒1:
context = loadContext();
inited= true;
// 執行緒2:
while( !inited ){
sleep();
}
doSomethingWithConfig(context)
4.4 有序性
一個執行緒觀察其他執行緒中的指令執行順序,由於指令重排序的存在,該觀察結果一般雜亂無序
JMM允許編譯器和處理器對指令進行重排序,但是重排序過程不會影響到單執行緒程式的執行,卻會影響到多執行緒併發執行的正確性
4.4.1 happens-before 規則
5釋出物件
釋出物件
物件逸出
5.1 安全釋出物件
非執行緒安全的懶漢模式
餓漢模式
執行緒安全的懶漢模式
package com.mmall.concurrency.example.singleton;
import com.mmall.concurrency.annoations.NotThreadSafe;
/**
* 懶漢模式 -》 雙重同步鎖單例模式
* 單例例項在第一次使用時進行建立
* @author shishusheng
*/
@NotThreadSafe
public class SingletonExample4 {
/**
* 私有建構函式
*/
private SingletonExample4() {
}
// 1、memory = allocate() 分配物件的記憶體空間
// 2、ctorInstance() 初始化物件
// 3、instance = memory 設定instance指向剛分配的記憶體
// JVM和cpu優化,發生了指令重排
// 1、memory = allocate() 分配物件的記憶體空間
// 3、instance = memory 設定instance指向剛分配的記憶體
// 2、ctorInstance() 初始化物件
/**
* 單例物件
*/
private static SingletonExample4 instance = null;
/**
* 靜態的工廠方法
*
* @return
*/
public static SingletonExample4 getInstance() {
// 雙重檢測機制 // B
if (instance == null) {
// 同步鎖
synchronized (SingletonExample4.class) {
if (instance == null) {
// A - 3
instance = new SingletonExample4();
}
}
}
return instance;
}
}
7 AQS
7.1 介紹
資料結構
- 使用Node實現FIFO佇列,可以用於構建鎖或者其他同步裝置的基礎框架
- 利用了一個int型別表示狀態
- 使用方法是繼承
- 子類通過繼承並通過實現它的方法管理其狀態{acquire 和release} 的方法操縱狀態
- 可以同時實現排它鎖和共享鎖模式(獨佔、共享)
同步元件
CountDownLatch
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author shishusheng
*/
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 指定時間內處理任務
*
* @author shishusheng
*
*/
@Slf4j
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
Semaphore用法
CycliBarrier
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author shishusheng
*/
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author shishusheng
*/
@Slf4j
public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
await 超時導致程式拋異常
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @author shishusheng
*/
@Slf4j
public class SemaphoreExample3 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
// 嘗試獲取一個許可
if (semaphore.tryAcquire()) {
test(threadNum);
// 釋放一個許可
semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
9 執行緒池
9.1 newCachedThreadPool
9.2 newFixedThreadPool
9.3 newSingleThreadExecutor
看出是順序執行的
9.4 newScheduledThreadPool
10 死鎖
版權宣告:本文內容由網際網路使用者自發貢獻,版權歸作者所有,本社群不擁有所有權,也不承擔相關法律責任。如果您發現本社群中有涉嫌抄襲的內容,歡迎傳送郵件至:[email protected] 進行舉報,並提供相關證據,一經查實,本社群將立刻刪除涉嫌侵權內容。
【雲棲快訊】阿里開發者們的第3個感悟:從身邊開源開始學習,用過才能更好理解程式碼 詳情請點選