1. 程式人生 > >長文慎入-探索Java併發程式設計與高併發解決方案[轉]

長文慎入-探索Java併發程式設計與高併發解決方案[轉]

轉自:https://yq.aliyun.com/articles/636038

所有示例程式碼,請見/下載於
https://github.com/Wasabi1234/concurrency

img_6a0b1613c35351f35bcb4166557c76d2.png

 

img_bf036cb63de51eeb110b043c05266c72.png

高併發處理的思路及手段

 

img_a6fb0ec6c446de186c307c896e2521ee.png

 

1 基本概念

1.1 併發

同時擁有兩個或者多個執行緒,如果程式在單核處理器上執行多個執行緒將交替地換入或者換出記憶體,這些執行緒是同時“存在"的,每個執行緒都處於執行過程中的某個狀態,如果執行在多核處理器上,此時,程式中的每個執行緒都將分配到一個處理器核上,因此可以同時執行.

1.2 高併發( High Concurrency)

網際網路分散式系統架構設計中必須考慮的因素之一,通常是指,通過設計保證系統能夠同時並行處理很多請求.

1.3 區別與聯絡

  • 併發: 多個執行緒操作相同的資源,保證執行緒安全,合理使用資源
  • 高併發:服務能同時處理很多請求,提高程式效能

2 CPU

2.1 CPU 多級快取

img_391c3ae7eeb1206d5d4c4e532cecc68c.png

  • 為什麼需要CPU cache
    CPU的頻率太快了,快到主存跟不上
    如此,在處理器時鐘週期內,CPU常常需要等待主存,浪費資源。所以cache的出現,是為了緩解CPU和記憶體之間速度的不匹配問題(結構:cpu-> cache-> memory ).
  • CPU cache的意義
    1. 時間區域性性
      如果某個資料被訪問,那麼在不久的將來它很可能被再次訪問
    2. 空間區域性性
      如果某個資料被訪問,那麼與它相鄰的資料很快也可能被訪問

2.2 快取一致性(MESI)

用於保證多個 CPU cache 之間快取共享資料的一致

  • M-modified被修改
    該快取行只被快取在該 CPU 的快取中,並且是被修改過的,與主存中資料是不一致的,需在未來某個時間點寫回主存,該時間是允許在其他CPU 讀取主存中相應的記憶體之前,當這裡的值被寫入主存之後,該快取行狀態變為 E
  • E-exclusive獨享
    快取行只被快取在該 CPU 的快取中,未被修改過,與主存中資料一致
    可在任何時刻當被其他 CPU讀取該記憶體時變成 S 態,被修改時變為 M態
  • S-shared共享
    該快取行可被多個 CPU 快取,與主存中資料一致
  • I-invalid無效

     

    img_4cc963828e5e31b7269d3ab6d616308e.png

  • 亂序執行優化
    處理器為提高運算速度而做出違背程式碼原有順序的優化

併發的優勢與風險

img_14fdcede9f93e14ffe4a2212012eb18b.png

3 專案準備

3.1 專案初始化

img_7f41eb1c85c398a17b0bbb5f71c40192.png

自定義4個基本註解

 

img_2b15780ea867ff3999c8c14c2c914501.png

隨手寫個測試類

 

img_30fd764d7e0ca25b691d930ca07fe332.png

執行正常

3.2 併發模擬-Jmeter壓測

img_19a23fb72c8aa30603ecaada9e1fb2b5.png

 

img_bd4f8ccc6ac65a7a36a41073ece18175.png

 

img_04225535e84f19a926c03ce7b9f230e6.png

新增"檢視結果數"和"圖形結果"監聽器

 

img_9bc5d2a417ca8aba3c73f156aa179122.png

log view 下當前日誌資訊

 

img_22bb26f19935a1013a98d9989c93feba.png

圖形結果

3.3 併發模擬-程式碼

CountDownLatch

img_a052c03b4956f607c6227e97b80b4017.png

可阻塞執行緒,並保證當滿足特定條件時可繼續執行

Semaphore(訊號量)

img_85f1bf413cf46899ba17120f316e4947.png

可阻塞執行緒,控制同一時間段內的併發量

 

以上二者通常和執行緒池搭配

下面開始做併發模擬

package com.mmall.concurrency;

import com.mmall.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author shishusheng
 * @date 18/4/1
 */
@Slf4j
@NotThreadSafe
public class ConcurrencyTest {

    /**
     * 請求總數
     */
    public static int clientTotal = 5000;

    /**
     * 同時併發執行的執行緒數
     */
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        //定義執行緒池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義訊號量,給出允許併發的執行緒數目
        final Semaphore semaphore = new Semaphore(threadTotal);
        //統計計數結果
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        //將請求放入執行緒池
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    //訊號量的獲取
                    semaphore.acquire();
                    add();
                    //釋放
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        //關閉執行緒池
        executorService.shutdown();
        log.info("count:{}", count);
    }

    /**
     * 統計方法
     */
    private static void add() {
        count++;
    }
}

執行發現結果隨機,所以非執行緒安全

4執行緒安全性

4.1 執行緒安全性

當多個執行緒訪問某個類時,不管執行時環境採用何種排程方式或者這些程序將如何交替執行,並且在主調程式碼中不需要任何額外的同步或協同,這個類都能表現出正確的行為,那麼就稱這個類是執行緒安全的

4.2 原子性

4.2.1 Atomic 包

  • AtomicXXX:CAS,Unsafe.compareAndSwapInt
    提供了互斥訪問,同一時刻只能有一個執行緒來對它進行操作
package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class AtomicExample2 {

    /**
     * 請求總數
     */
    public static int clientTotal = 5000;

    /**
     * 同時併發執行的執行緒數
     */
    public static int threadTotal = 200;

    /**
     * 工作記憶體
     */
    public static AtomicLong count = new AtomicLong(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println();
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        //主記憶體
        log.info("count:{}", count.get());
    }
    
    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}
package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @author shishusheng
 * @date 18/4/3
 */
@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        // 2
        count.compareAndSet(0, 2);
        // no
        count.compareAndSet(0, 1);
        // no
        count.compareAndSet(1, 3);
        // 4
        count.compareAndSet(2, 4);
        // no
        count.compareAndSet(3, 5); 
        log.info("count:{}", count.get());
    }
}

img_4a2e098012a53117062a83280f564d92.png

輸出結果

  • AtomicReference,AtomicReferenceFieldUpdater

     

    img_105da3257e9498be28f8a6279c9ce66b.png

  • AtomicBoolean

     

    img_44686cc93e68574d497a868f22221155.png

  • AtomicStampReference : CAS的 ABA 問題

4.2.2 鎖

synchronized:依賴 JVM

  • 修飾程式碼塊:大括號括起來的程式碼,作用於呼叫的物件
  • 修飾方法: 整個方法,作用於呼叫的物件

     

    img_fd09b1ff7f7947265baa0829cfc8c124.png

  • 修飾靜態方法:整個靜態方法,作用於所有物件

     

    img_61e0d1224f8d87021d63d4ded57cbf0f.png

package com.mmall.concurrency.example.count;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class CountExample3 {

    /**
     * 請求總數
     */
    public static int clientTotal = 5000;

    /**
     * 同時併發執行的執行緒數
     */
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private synchronized static void add() {
        count++;
    }
}

synchronized 修正計數類方法

  • 修飾類:括號括起來的部分,作用於所有物件
    子類繼承父類的被 synchronized 修飾方法時,是沒有 synchronized 修飾的!!!

Lock: 依賴特殊的 CPU 指令,程式碼實現

4.2.3 對比

  • synchronized: 不可中斷鎖,適合競爭不激烈,可讀性好
  • Lock: 可中斷鎖,多樣化同步,競爭激烈時能維持常態
  • Atomic: 競爭激烈時能維持常態,比Lock效能好; 只能同步一
    個值

4.3 可見性

一個執行緒對主記憶體的修改可以及時的被其他執行緒觀察到

4.3.1 導致共享變數線上程間不可見的原因

  • 執行緒交叉執行
  • 重排序結合線程交叉執行
  • 共享變數更新後的值沒有在工作記憶體與主存間及時更新

4.3.2 可見性之synchronized

JMM關於synchronized的規定

  • 執行緒解鎖前,必須把共享變數的最新值重新整理到主記憶體
  • 執行緒加鎖時,將清空工作記憶體中共享變數的值,從而使
    用共享變數時需要從主記憶體中重新讀取最新的值(加鎖與解鎖是同一把鎖)

4.3.3 可見性之volatile

通過加入記憶體屏障和禁止重排序優化來實現

  • 對volatile變數寫操作時,會在寫操作後加入一條store
    屏障指令,將本地記憶體中的共享變數值重新整理到主記憶體
  • 對volatile變數讀操作時,會在讀操作前加入一條load
    屏障指令,從主記憶體中讀取共享變數

     

    img_2cdfdbfd4000375f8f235ad3f1b94013.png

    volatile 寫

     

    img_bd78a8a91e1e053102ab133a050aab68.png

    volatile 讀

     

    img_f3a2eb25f50d07e4ef9560bb94c75b26.png

    計數類之 volatile 版,非執行緒安全的

  • volatile使用
volatile boolean inited = false;

//執行緒1:
context = loadContext();
inited= true;

// 執行緒2:
while( !inited ){
    sleep();
}
doSomethingWithConfig(context)

4.4 有序性

一個執行緒觀察其他執行緒中的指令執行順序,由於指令重排序的存在,該觀察結果一般雜亂無序

JMM允許編譯器和處理器對指令進行重排序,但是重排序過程不會影響到單執行緒程式的執行,卻會影響到多執行緒併發執行的正確性

4.4.1 happens-before 規則

5釋出物件

img_116a3ce151d7b1445ee82359d8511ce2.png

 

img_dad8bf8a283de3b6e8220623cdeff3ae.png

釋出物件

 

img_b22e279f3284398fc5a6c3c2460f8256.png

物件逸出

5.1 安全釋出物件

img_b7a5287e934475759fd485b6d1294c73.png

 

img_228f19af533783e91a39294f8c1681d6.png

非執行緒安全的懶漢模式

 

img_e8d0abedbd4bed33cdc5892a9b2e1255.png

餓漢模式

 

img_656ed63b20b576829c82f33e9d11be26.png

執行緒安全的懶漢模式

package com.mmall.concurrency.example.singleton;

import com.mmall.concurrency.annoations.NotThreadSafe;

/**
 * 懶漢模式 -》 雙重同步鎖單例模式
 * 單例例項在第一次使用時進行建立
 * @author shishusheng
 */
@NotThreadSafe
public class SingletonExample4 {

    /**
     * 私有建構函式
     */
    private SingletonExample4() {

    }

    // 1、memory = allocate() 分配物件的記憶體空間
    // 2、ctorInstance() 初始化物件
    // 3、instance = memory 設定instance指向剛分配的記憶體

    // JVM和cpu優化,發生了指令重排

    // 1、memory = allocate() 分配物件的記憶體空間
    // 3、instance = memory 設定instance指向剛分配的記憶體
    // 2、ctorInstance() 初始化物件

    /**
     * 單例物件
     */
    private static SingletonExample4 instance = null;

    /**
     * 靜態的工廠方法
     *
     * @return
     */
    public static SingletonExample4 getInstance() {
        // 雙重檢測機制 // B
        if (instance == null) {        
            // 同步鎖
            synchronized (SingletonExample4.class) { 
                if (instance == null) {
                    // A - 3
                    instance = new SingletonExample4(); 
                }
            }
        }
        return instance;
    }
}

img_6783fda31724b08ebfebb3fae7f976d1.png

 

img_e72422cce9a1b0bac80a2a8dc053b040.png

7 AQS

7.1 介紹

img_b1e2effacc72845c5a33887343fe1799.png

資料結構

  • 使用Node實現FIFO佇列,可以用於構建鎖或者其他同步裝置的基礎框架
  • 利用了一個int型別表示狀態
  • 使用方法是繼承
  • 子類通過繼承並通過實現它的方法管理其狀態{acquire 和release} 的方法操縱狀態
  • 可以同時實現排它鎖和共享鎖模式(獨佔、共享)
    同步元件

CountDownLatch

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author shishusheng
 */
@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* 指定時間內處理任務
* 
* @author shishusheng 
* 
*/
@Slf4j
public class CountDownLatchExample2 {

   private final static int threadCount = 200;

   public static void main(String[] args) throws Exception {

       ExecutorService exec = Executors.newCachedThreadPool();

       final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   test(threadNum);
               } catch (Exception e) {
                   log.error("exception", e);
               } finally {
                   countDownLatch.countDown();
               }
           });
       }
       countDownLatch.await(10, TimeUnit.MILLISECONDS);
       log.info("finish");
       exec.shutdown();
   }

   private static void test(int threadNum) throws Exception {
       Thread.sleep(100);
       log.info("{}", threadNum);
   }
}

Semaphore用法

img_08ae05f9ab20e3be8c49e6896f01bf67.png

 

img_775f52f3aaaf93513123aa3d389aabce.png

 

img_8605a01d8b526f8b47e5f1c4a0fb4f63.png

CycliBarrier

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author shishusheng
 */
@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

img_b052be420ba97107c8d06e5996c05178.png

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author shishusheng
 */
@Slf4j
public class CyclicBarrierExample2 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
        log.info("{} continue", threadNum);
    }
}

img_6b9ed55f45a6e7ae1dc041091aa3a991.png

await 超時導致程式拋異常

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * @author shishusheng
 */
@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    // 嘗試獲取一個許可
                    if (semaphore.tryAcquire()) {
                        test(threadNum);
                        // 釋放一個許可
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }


}

9 執行緒池

9.1 newCachedThreadPool

img_313819e24829e7a3396a87ecb12f4372.png

9.2 newFixedThreadPool

img_ddb0658cb68e4ad08eecb2286d4c7eab.png

9.3 newSingleThreadExecutor

看出是順序執行的

 

img_4c407b7683773c00490b13503a30483a.png

9.4 newScheduledThreadPool

img_2bf45c51497b92ac3caa6f91db355bdc.png

 

img_2f6cfe21f1ccd601d7123fff534abcf0.png

10 死鎖

img_47ebbc8a293f93eab409c602908791bd.png

 

img_9d0de0f2ad83f9d9578798d6e8f37752.png

版權宣告:本文內容由網際網路使用者自發貢獻,版權歸作者所有,本社群不擁有所有權,也不承擔相關法律責任。如果您發現本社群中有涉嫌抄襲的內容,歡迎傳送郵件至:[email protected] 進行舉報,並提供相關證據,一經查實,本社群將立刻刪除涉嫌侵權內容。

【雲棲快訊】阿里開發者們的第3個感悟:從身邊開源開始學習,用過才能更好理解程式碼  詳情請點選