1. 程式人生 > >java高併發實戰(十)——併發除錯和JDK8新特性

java高併發實戰(十)——併發除錯和JDK8新特性

由於之前看的容易忘記,因此特記錄下來,以便學習總結與更好理解,該系列博文也是第一次記錄,所有有好多不完善之處請見諒與留言指出,如果有幸大家看到該博文,希望報以參考目的看瀏覽,如有錯誤之處,謝謝大家指出與留言。

一、內容提要

 多執行緒除錯的方法

 執行緒dump及分析

 JDK8對併發的新支援

        – LongAdder

        – CompletableFuture

        – StampedLock

二、多執行緒除錯的方法

多執行緒執行順序不一致,有時很難重現bug

如下面例子:

01 public class UnsafeArrayList {
02 static ArrayList al=new ArrayList();
03 static class AddTask implements Runnable{
04 @Override
05 public void run() {
06 try {
07 Thread.sleep(100);
08 } catch (InterruptedException e) {}
09 for(int i=0;i<1000000;i++)
10 al.add(new Object());
11 }
12 }
13 public static void main(String[] args) throws InterruptedException {
14 Thread t1=new Thread(new AddTask(),"t1");
15 Thread t2=new Thread(new AddTask(),"t2");
16 t1.start();
17 t2.start();
18 Thread t3=new Thread(new Runnable(){
19 @Override
20 public void run() {
21 while(true){
22 try {
23 Thread.sleep(1000);
24 } catch (InterruptedException e) {}
25 }
26 }
27 },"t3");
28 t3.start();
29 }}

這是一個數組當容量不足時去擴容。在這裡下個斷點。下面展示出此時完整的堆疊,供檢視當前執行緒在幹什麼。

當所有現成經過這個點時,執行緒都會停下來,除錯比較麻煩。所以通過條件端點去除錯,去篩選出自己要檢視的執行緒,在指定執行緒停下來。


設定之後可以看出下面堆疊,這才是想看到的結果。當然經過dug跑到這裡也可以。


當我們看到上面堆疊顯示的執行緒前面有兩個黃色豎線,表示當前執行緒時被斷下來的。綠色的三角形表示執行緒在執行著。同時我們也可以選中堆疊中的t1或t2中的任何一個,然後執行dug,則可以單獨執行某一個執行緒,進行除錯指定的執行緒。


上面這個我們可以通過上面停止VM虛擬機器,把所有執行緒都停下來,但他會產生不穩定性,根據情況而定,一般會在當其他執行緒影響該執行緒結果時,可以使用停止VM。預設情況是是停斷執行緒設定(執行當前執行緒會斷下來當前這個要停斷的執行緒)。

下面是使用VM情況中斷的堆疊情況:


三、執行緒Dump分析

         jstack 3992

        使用jstack工具去分析,匯出當前正在執行的虛擬機器下面的所有執行緒,檢視執行緒都在做什麼,那個在卡死狀態等等去推理問題出現的情況。還有會Dump出來時發現每次Dump出來的執行緒總在執行一個步驟一句話,也沒有任何鎖的資訊。說明他可能發生死迴圈。

         在%JAVA_HOME%/bin

           他的工具在這個路徑可以找到它。

         分析死鎖案例

        程式碼就是上面給出的案例,通過上面除錯方法測試出out of bound的出現原因。就是線上程t1執行到10時,資料在執行會擴容,但當t2也執行到這個地方發現當前陣列也是10,他也執行擴容,但t1並不知道,也執行擴容,就出現了out of bound異常問題。

        使用jstack  :cmd中執行jstack 3992可以看到當前執行緒的情況。執行緒誰在等待誰,誰在擁有那個執行緒。wait for可以檢視裡面的他在等在的是誰,等了很久。然後去檢視這個東西被那個物件長時間擁有,給佔用,但也不一定是死鎖,但他就在等待很長時間,也說明系統有問題,然後通過通過-l命令打印出等待現象情況。

四、JDK8對併發的新支援

1、 LongAdder(累加器)

    – 和AtomicInteger類似的使用方式

      他的效能在高併發下AtomicLong效能更好。

    – 在AtomicInteger上進行了熱點分離

        比如對等個hashMap加鎖,當熱點分離是把hashmap分16分,對16份分別加鎖,就會盡可能避免衝突。是每一次的cas更新可能性提高,效能就所有提高。如下圖:

    – public void add(long x) 在原子上也有所實現,以及下面方法

    – public void increment()

    – public void decrement()

    – public long sum()

    – public long longValue()

    – public int intValue()

LongAdder內部本來把原來表示一個整數的那個數字,分解成一個數組,每一個小單元,每個單元都是一個整數,當執行緒進來時,把資料打散到分成的多個單元格上,多執行緒時,每個執行緒對應每個單元格,這樣就減少了執行緒的衝突,cas更新成功率有所提高。效能就會增加。但執行緒比較少時,非高併發下,他也做處理了,並不會無條件查分打散成陣列,他內部也會維護一個base的元素,相當於原子long型別對應的資料,每當你對這個元素進行累加,當他發現有衝突的時候,就會建立cell陣列,之後再會把執行緒對映這個cell陣列,當執行緒多時,cell會不斷增加擴大。有點自適應的思想。


2、 CompletableFuture(工具類)

    – 實現CompletionStage介面(大概有40多個方法)

    – Java 8中對Future的增強版

    – 支援流式呼叫 如下:


    – 完成後得到通知     (實現future功能)

01 public static class AskThread implements Runnable {
02 CompletableFuture<Integer> re = null;
03
04 public AskThread(CompletableFuture<Integer> re) {
05 this.re = re;
06 }
07
08 @Override
09 public void run() {
10 int myRe = 0;
11 try {
12 myRe = re.get() * re.get();
13 } catch (Exception e) {
14 }
15 System.out.println(myRe);
16 }
17 }
18
19 public static void main(String[] args) throws InterruptedException {
20 final CompletableFuture<Integer> future = new CompletableFuture<>();
21 new Thread(new AskThread(future)).start();
22 // 模擬長時間的計算過程
23 Thread.sleep(1000);
24 // 告知完成結果
25 future.complete(60);//使future更靈活  自由決定什麼時候進行通知。
26 }

    – 非同步執行  跟普通future很接近但更多用在函數語言程式設計情況,支援函數語言程式設計

01 public static Integer calc(Integer para) {
02 try {
03 // 模擬一個長時間的執行
04 Thread.sleep(1000);
05 } catch (InterruptedException e) {
06 }
07 return para*para;
08 }
09
10 public static void main(String[] args) throws InterruptedException, ExecutionException {
11 final CompletableFuture<Integer> future =
12 CompletableFuture.supplyAsync(() -> calc(50));
13 System.out.println(future.get());
14 }

    – 工廠方法

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

    – 流式呼叫  通過工程方法建立例項後,可以通過流式方式對結果進行進一步操作

01 public static Integer calc(Integer para) {
02 try {
03 // 模擬一個長時間的執行
04 Thread.sleep(1000);
05 } catch (InterruptedException e) {
06 }
07 return para*para;
08 }
09
10 public static void main(String[] args) throws InterruptedException, ExecutionException {
11 CompletableFuture<Void> fu=CompletableFuture.supplyAsync(() -> calc(50))  //通過介面,構造例項;可看出這個介面更多傾向於函數語言程式設計
12 .thenApply((i)->Integer.toString(i))
13 .thenApply((str)->"\""+str+"\"")
14 .thenAccept(System.out::println);
15 fu.get();
16 }

    – 組合多個CompletableFuture

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
01 public static Integer calc(Integer para) {
02 return para/2;
03 }
04
05 public static void main(String[] args) throws InterruptedException, ExecutionException {
06 CompletableFuture<Void> fu =
07 CompletableFuture.supplyAsync(() -> calc(50))
08 .thenCompose((i)->CompletableFuture.supplyAsync(() -> calc(i)))
09 .thenApply((str)->"\"" + str + "\"").thenAccept(System.out::println);
10 fu.get();
11 }

這介面類主要功能是提供函數語言程式設計,同時跟普通future比他提供了可以由開發是決定什麼時候完成這個通知,去做future操作。這類跟效能感覺沒有什麼關係,對編碼量大大減少,方便開發。

4、StampedLock

而鎖分離的重要的實現就是ReadWriteLock。而StampedLock則是ReadWriteLock的一個改進。StampedLock與ReadWriteLock的區別在於,StampedLock認為讀不應阻塞寫,StampedLock認為當讀寫互斥的時候,讀應該是重讀,而不是不讓寫執行緒寫。這樣的設計解決了讀多寫少時,使用ReadWriteLock會產生寫執行緒飢餓現象。

所以StampedLock是一種偏向於寫執行緒的改進。

    – 讀寫鎖的改進(他認為讀時候,不應該也阻塞寫。應該通過從讀操作)當讀太多,寫堵塞,會發生飢餓現象。

    – 讀不阻塞寫

01 public class Point {
02 private double x, y;
03 private final StampedLock sl = new StampedLock();  //類似時間戳,每次操作會不斷累加
04
05 void move(double deltaX, double deltaY) { // an exclusively locked method
06 long stamp = sl.writeLock();
07 try {
08 x += deltaX;
09 y += deltaY;
10 } finally {
11 sl.unlockWrite(stamp);
12 }
13 }
14
15 double distanceFromOrigin() { // A read-only method
16 long stamp = sl.tryOptimisticRead();  //樂觀讀,所以事先讀不會堵塞寫
17 double currentX = x, currentY = y;//讀xy值,這裡並不一定是一直的。
18 if (!sl.validate(stamp)) {  //驗證讀的資料是否一致
19 stamp = sl.readLock();//如果失敗,也可以通過悲觀讀去去處理
20 try {
21 currentX = x;
22 currentY = y;
23 } finally {
24 sl.unlockRead(stamp);
25 }
26 }
27 return Math.sqrt(currentX * currentX + currentY * currentY);
28 }
29 }

述程式碼模擬了寫執行緒和讀執行緒, StampedLock根據stamp來檢視是否互斥,寫一次stamp變增加某個值

tryOptimisticRead()
就是剛剛所說的讀寫不互斥的情況。

每次讀執行緒要讀時,會先判斷

if (!sl.validate(stamp))
validate中會先檢視是否有寫執行緒在寫,然後再判斷輸入的值和當前的 stamp是否相同,即判斷是否讀執行緒將讀到最新的資料。如果有寫執行緒在寫,或者 stamp數值不同,則返回失敗。

如果判斷失敗,當然可以重複的嘗試去讀,在示例程式碼中,並沒有讓其重複嘗試讀,而採用的是將樂觀鎖退化成普通的讀鎖去讀,這種情況就是一種悲觀的讀法。

stamp = sl.readLock();

(1)StampedLock的實現思想(解決上面驗證失敗後通過自旋鎖去做處理)

    – CLH自旋鎖(他也使用了一種叫CLH的自旋鎖)

  當鎖申請失敗時,不會立即將讀執行緒掛起,在鎖當中會維護一個等待執行緒佇列,所有申請鎖,但是沒有成功的執行緒都記錄在這個佇列中; 鎖維護一個等待執行緒佇列,所有申請鎖,但是沒有成功的執行緒都記錄在這個佇列中。每一個節點(一個節點代表一個執行緒),儲存一個標記位(locked),用於判斷當前執行緒是否已經釋放鎖。當一個執行緒試圖獲得鎖時,取得當前等待佇列的尾部節點作為其前序節點。並使用類似如下程式碼判斷前序節點是否已經成功釋放鎖:while (pred.locked) {}不停的等待前面執行緒釋放鎖。

這個迴圈就是不斷等前面那個結點釋放鎖,這樣的自旋使得當前執行緒不會被作業系統掛起,從而提高了效能。當然他不會進行無休止的自旋,會在在若干次自旋後掛起執行緒。否則CPU的佔有率就會很高。