1. 程式人生 > >JAVA 併發與高併發知識筆記(二)

JAVA 併發與高併發知識筆記(二)

一、併發安全、不安全描述

安全:多個執行緒操作同一個資源,最後的執行結果與單執行緒執行結果一致,則說明是執行緒安全的

不安全:多個執行緒操作同一個資源,最後執行結果不確定的,則說明不是執行緒安全的

這裡我覺得還是解釋一下併發與並行的一點區別比好(並非絕對概念),併發通常是多個執行緒去競爭相同資源,而並行通常是多個執行緒之間是協作關係,例如,在秒殺場景下,多個使用者(執行緒)共同爭搶某個資源,這個是併發。例如,多個執行緒統計一個幾千萬資料的檔案,這個時候執行緒之間是協作關係,每個執行緒各自統計分配的一段資料,最後彙總給主執行緒。

二、常見併發模擬工具以及程式碼模擬併發(工具使用之後再單獨學習)

a) Postman :http 請求模擬工具

b) AB (Apache Bench) :Apache 附帶的模擬工具,主要用來測試網站效能

c) JMeter : Apache 組織開發的壓力測試工具

d) 使用 CountDownLatch、Semaphore 進行併發模擬 (在筆記一中已經提到)

三、程式碼模擬併發

a) CountDownLatch 介紹:該類為一個計數器,字面意思就是向下減的一個閉鎖類


上圖解釋:

     TA 為主執行緒,主執行緒初始化 CountDownLatch 計數器為3,T1~3 為子執行緒,主執行緒呼叫CountDownLatch的 await 後就開始阻塞,直到T1~3 呼叫 CountDownLatch 的 countDown() 方法將計數器減為0,然後主執行緒繼續執行。

b) Semaphore 介紹:

     字面意思是訊號量,它可以控制同一時間內有多少個執行緒可以執行,比如我們常見的馬路,有4車道,8車道,可以把這裡的車道比作執行緒,4車道相當於4個執行緒同時執行,8車道想相當於8個執行緒執行。semaphore 就是好比是這個車道,可以指定有多個車道,從對訊號量的功能描述,可以想到在實際開發中可以用來限制同一時間請求介面的次數,通常semaphore 會與執行緒池配合使用。

c) 併發模擬程式碼(Not thread safe)

/**
 * 併發模擬
 * 
 * @author Aaron
 *
 */
@NotThreadSafe
@Slf4j
public class ConcurrencyTest1 {

	// 模擬 1000個使用者請求
	private final static int TotalClient = 1000;

	// 限制同一時間只能有10個執行緒執行
	private final static int TotalThread = 10;
	// 計數器
	private static int count = 0;

	public static void main(String[] args) throws InterruptedException {
		ExecutorService es = Executors.newCachedThreadPool();
		// 設定訊號量,允許同時最多執行的執行緒數
		final Semaphore sp = new Semaphore(TotalThread);
		final CountDownLatch cdl = new CountDownLatch(TotalClient);
		for (int i = 0; i < TotalClient; i++) {
			es.execute(new Runnable() {
				@Override
				public void run() {
					try {
						sp.acquire();
						add();
						sp.release();
					} catch (InterruptedException e) {
						log.error("A", e);
					}
					cdl.countDown();
				}
			});
		}
		// 中斷主線層程式碼,直至countdownlatch 的計數器變為0
		cdl.await();
		es.shutdown();
		log.info(String.valueOf(count));
	}

	@NotThreadSafe
	public static void add() {
		count++;
	}

}
d ) 併發模擬daim(Thread safe)
/**
 * 併發模擬
 * 
 * @author Aaron
 *
 */
@ThreadSafe
@Recommend
@Slf4j
public class ConcurrencyTest2 {

	// 模擬 1000個使用者請求
	private final static int TotalClient = 1000;

	// 限制同一時間只能有10個執行緒執行
	private final static int TotalThread = 10;

	// 使用原子類
	private final static AtomicInteger count = new AtomicInteger(0);

	public static void main(String[] args) throws InterruptedException {
		ExecutorService es = Executors.newCachedThreadPool();
		// 設定訊號量,允許同時最多執行的執行緒數
		final Semaphore sp = new Semaphore(TotalThread);
		final CountDownLatch cdl = new CountDownLatch(TotalClient);
		for (int i = 0; i < TotalClient; i++) {
			es.execute(new Runnable() {
				@Override
				public void run() {
					try {
						sp.acquire();
						add();
						sp.release();
					} catch (InterruptedException e) {
						log.error("A", e);
					}
					cdl.countDown();
				}
			});
		}
		// 中斷主線層程式碼,直至countdownlatch 的計數器變為0
		cdl.await();
		es.shutdown();
		log.info(String.valueOf(count.get()));
	}

	@ThreadSafe
	public static void add() {
		count.incrementAndGet();
	}

}

四、類的執行緒安全性定義

      當多個執行緒同時訪問一個類時,不管執行時環境採用何種方式呼叫或者這些執行緒如何交替執行, 並且在主調程式碼中不需要做額外的同步或協同操作,這個類始終表現出正確的行為,那麼這個類就是執行緒安全的。

五、執行緒安全的主要體現點

a) 原子性:

    原子性可以解釋為互斥性訪問,既同一時刻,只能有一個執行緒進行操作

b) 可見性:

   某個執行緒對主記憶體的修改,其它執行緒必須能及時觀察到

c) 有序性:

   某個執行緒觀察其它執行緒中的指令執行順序,由於指令重排序的存在,通常觀察到的是雜亂無序的

六、CAS 原理 (以 AtomicInteger 為參考)

由於學習發現我的 eclipse 看不到 unsafe 原始碼,其它原始碼可以看到,所以特意安裝了反編譯外掛(Decompiler

地址:https://www.cnblogs.com/godtrue/p/5499785.html

a ) CAS 是 unsafe 中的 compareAndSwapInt 方法的縮寫

b) 原理,在 AtomicInteger 的 incrementAndGet 方法裡呼叫了 unsafe 中的 getAndAddInt 方法,在該方法中,核心的方法是 compareAdnSwapInt 方法,核心原理是通過物件以及值的記憶體地址取出當前值,然後再進行比較,如果比較是值發生了更改則重新取出最新的值再繼續比較,直到比較成功,然後更新值。

// 標記為 native 的方法,說明不是用java實現的,通常是由C、C++ 等等實現的
// 第一個引數是當前物件,第二個引數是值所對應的記憶體地址
public native int getIntVolatile(Object arg0, long arg1);

// 也是標記為 native 的方法,也是核心方法
// 第一個引數是當前物件,第二個引數是值所對應的記憶體地址,第三個引數為記憶體值,第四個引數為準備更新的值
public final native boolean compareAndSwapInt(Object arg0, long arg1, int arg3, int arg4);

// AtomicInteger 中呼叫的是該方法
// 第一個引數是當前物件,第二個引數是值的記憶體地址,第三個引數為增加量
public final int getAndAddInt(Object arg0, long arg1, int arg3) {
	int arg4;
	do {
            // 取出當前記憶體值(預期值)
	     arg4 = this.getIntVolatile(arg0, arg1);
            // 比較當前記憶體值是否與預期值相等,如果不相等則繼續比較,如果相等則返回當前的記憶體值
            // 如果預期值 與 arg1 指向的值一樣,則更新為 arg4 + arg3 
            // 如果不一樣則繼續迴圈,直到完成更新
	} while (!this.compareAndSwapInt(arg0, arg1, arg4, arg4 + arg3));

	return arg4;
}

c) CAS 缺點

    分析CAS原始碼後可以發現,如果大量執行緒進行CAS操作,那麼競爭就會很激烈,導致一部分執行緒由於總是比較失敗而長時間停留在迴圈體中,可能會有瞬間或一段時間的CPU過載,影響系統性能。

d) (JDK1.8 新增 ) LongAdder 與 DoubleAdder 處理思想

     對於普通型別的 long 或 double 型別的變數,JVM 允許將64位的讀操作或寫操作拆分成兩個32位的讀寫操作,該處理方式的主要思想是將熱點資料分離,將內部 Value 分離成一個Cell 陣列,當多個執行緒訪問時通過HASH等演算法將執行緒對映到其中一個Cell 上進行操作,最終的計算結果則是Cell 陣列的求和值,當低併發的時候,演算法會直接更新變數的值,在高併發的時候通過分散操作Cell 提高效能,當然缺點也是有的,當併發更改以及呼叫sum操作時,sum統計的值可能不準確,以下是原話。

    /**
     * Returns the current sum.  The returned value is <em>NOT</em> an
     * atomic snapshot; invocation in the absence of concurrent (意思是在非併發的情況下使用)
     * updates returns an accurate result, but concurrent updates that
     * occur while the sum is being calculated might not be
     * incorporated.
     *
     * @return the sum
     */
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

七、java.util.concurrent.atomic 包


八、AtomicReference 與 AtomicIntegerFieldUpdater

a) AtomicReference 基本使用

該類提供一個泛型引數,用於對多種物件的原子操作(注意是物件的操作,如果傳入原子類,則是對這個原子類本身的原子操作,並非是原子類中資料的原子操作),以下為簡單的示例

@ThreadSafe
@Slf4j
public class AtomicReferenceTest {

	private static AtomicReference<Integer> ar = new AtomicReference<Integer>(0);

	public static void main(String[] args) {
		// 比較更新方法,如果是值是0,則更新為1
		log.info("{} -> {} - {}", 0, 1, ar.compareAndSet(0, 1));
		// 獲取原先的值,並設定為指定的新值
		log.info("{} -> {}", ar.getAndSet(3), ar.get());
		
		// 以下是原始碼實現,核心還是使用的Unsafe類的方法
		// /**
		// * Atomically sets the value to the given updated value
		// * if the current value {@code ==} the expected value.
		// * @param expect the expected value
		// * @param update the new value
		// * @return {@code true} if successful. False return indicates that
		// * the actual value was not equal to the expected value.
		// */
		// public final boolean compareAndSet(V expect, V update) {
		// native 原子方法
		// return unsafe.compareAndSwapObject(this, valueOffset, expect,
		// update);
		// }
	}
}

b) AtomicIntegerFieldUpdater
基本使用

該類提供一個泛型引數,用於對物件內部的成員變數進行原子操作,以下為簡單的示例

@Slf4j
public class AtomicIntegerFieldUpdaterTest {
	private static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterTest> a = AtomicIntegerFieldUpdater
			.newUpdater(AtomicIntegerFieldUpdaterTest.class, "value");
	// 變數必須是 int 基本型別,不能是物件型別
	// 變數必須有 volatile 關鍵字修飾
	// 以下是原始碼中的判斷
	// if (field.getType() != int.class)
	// throw new IllegalArgumentException("Must be integer type");
	//
	// if (!Modifier.isVolatile(modifiers))
	// throw new IllegalArgumentException("Must be volatile type");
	@Getter
	// 未初始化預設是0
	private volatile int value;

	public static void main(String[] args) {
         AtomicIntegerFieldUpdaterTest aifu = new AtomicIntegerFieldUpdaterTest();
          // 比較&設定
	 a.compareAndSet(aifu, 0, 2);
         log.info("{}", aifu.getValue());
}
}九、解決 CAS 的ABA問題

a) ABA問題解釋:

     當多個執行緒操作一個資源時,T1取出值A,T2執行緒也取出值A,這個時候T2執行過程中將A變為B又變回A,然後T1繼續執行發現與自己的值相同,然後進行了更新操作,操作雖然成功,但是這個過程卻是有隱患的,比如對一個單項鍊表操作,T1 取出棧頂A與下一個棧B,想用CAS替換棧頂A為B,在T1執行CAS操作之前,這時候T2取出A和B,然後push了A、C、D,這個時候B屬於獨立的連結串列,處於遊離狀態(當前有兩個連結串列 A->C->D->NULL ,還有一個遊離的 B->NULL),然後T1開始執行CAS操作,發現ACD棧頂還是A,然後開始處理,由於之前已經取出B,當時的B->NULL 這樣的,最後結果是把T2的 CD 給丟了。

b) 為了解決ABA問題,有了 AtomicStampedReference 這個類

    該類的核心思想是,每次操作都有個一 version 來記錄,例如 T2 取出A時版本是 1,更新為B後版本號變為2,再更新為A時版本號變為3,此時由於有版本號控制,T1 再來更新A時發現自己的版本號1 與 3 不一致,最後CAS操作失敗。

示例:

@Slf4j
public class ABATest {
	// 普通原子類
	private static AtomicInteger atomicInt = new AtomicInteger(100);
	// 有版本號的實現(引數是初始值與初始版本號)
	private static AtomicStampedReference<Integer> atomicStampedRef = new AtomicStampedReference<Integer>(100, 0);

	public static void main(String[] args) throws InterruptedException {
		// 模擬 B->A
		Thread intT1 = new Thread(new Runnable() {
			@Override
			public void run() {
				// A->B
				atomicInt.compareAndSet(100, 101);
				// B->A
				atomicInt.compareAndSet(101, 100);
			}
		});
		// 模擬 A->B
		Thread intT2 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					// 執行緒休眠,給 T1 執行
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				// A->B
				boolean c3 = atomicInt.compareAndSet(100, 101);
				log.info("一般 CAS={}", c3);// 操作成功
			}
		});

		intT1.start();
		intT2.start();
		intT1.join();
		intT2.join();

		Thread refT1 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				// A-B 版本號 1
				atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
				// B-A 版本號 2
				atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
			}
		});

		Thread refT2 = new Thread(new Runnable() {
			@Override
			public void run() {
				// T2取出版本號
				int stamp = atomicStampedRef.getStamp();
				log.info("有版本號,執行緒休眠之前:stamp={}", stamp);
				try {
					// 休眠2秒
					TimeUnit.SECONDS.sleep(2);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				// T1 的版本號
				log.info("有版本號,執行緒休眠之後:stamp={}", atomicStampedRef.getStamp());
				// A->B ,T1 將版本號增加到了2,然後執行時由於T2
				// 持有的版本號還是之前的0,與當前的版本號2不一致,最中CAS操作失敗
				boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
				log.info("有版本號 CAS={}", c3);
			}
		});

		refT1.start();
		refT2.start();
	}

十、 AtomicBoolean 示例

可以使用該類來保證某項操作只執行一次

@Slf4j
public class AtomicBooleanTest {

	private static AtomicBoolean ab = new AtomicBoolean(true);

	public static void main(String[] args) throws InterruptedException {
		ExecutorService es = Executors.newCachedThreadPool();
		int c = 1000;
		int s = 100;
		final Semaphore sh = new Semaphore(s);
		final CountDownLatch cdl = new CountDownLatch(c);
		for (int i = 0; i < c; i++) {
			es.execute(new Runnable() {
				@Override
				public void run() {
					try {
						sh.acquire();
						init();
						sh.release();
					} catch (InterruptedException e) {
						log.error("Error", e);
					}

					cdl.countDown();
				}
			});
		}

		cdl.await();
		es.shutdown();
	}

	private static void init() {
		// 個人理解這麼寫會有效率問題,因為每次都要進行CAS比較,應該加一層 if 判斷,如 init2
		if (ab.compareAndSet(true, false)) {
			log.info(".......init.....OK");
		}
	}

	private static void init2() {
		if (ab.get()) {
			if (ab.compareAndSet(true, false)) {
				log.info(".......init.....OK");
			}
		}
	}
}