1. 程式人生 > >java高併發(六)執行緒安全性

java高併發(六)執行緒安全性

執行緒安全性體現在以下三個方面:

  1.  原子性:提供了互斥訪問,同一時刻只能有一個執行緒來對它進行操作。
  2. 可見性:一個執行緒對主記憶體的修改可以及時的被其他執行緒觀察到。
  3. 有序性:一個執行緒觀察其他執行緒中的指令執行順序,由於指令重排序的存在,該觀察結果一般雜亂無序。

原子性 Atomic包

新建一個測試類,內容如下:

@Slf4j
@ThreadSafe
public class CountExample2 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的執行緒數
    public static int threadTotal = 200;

    // 工作記憶體
    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        //執行緒池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義訊號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for(int i = 0; i < clientTotal; i++) {
            executorService.execute(() ->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {

                    log.error("exception", e);
                }
                countDownLatch.countDown();

            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }

    public static void add() {
        count.incrementAndGet();
    }

}

使用了AtomicInteger類,這個類的incrementAndGet方法底層使用的unsafe.getAndAddInt(this, valueOffset, 1) + 1;方法,而底層使用了this.compareAndSwapInt方法。這個compareAndSwapInt方法(CAS)是用當前值與主記憶體的值進行對比,如果值相等則進行相應的操作。

count變數就是工作記憶體,它與主記憶體中的資料不一定是一樣的,因此需要做同步操作才可以。 

AtomicLong與LongAdder

我們將上面的count用AtomicLong來修飾,同樣可以輸出正確的效果:

public static AtomicLong count = new AtomicLong(0);

我們為什麼要單獨說一下AtomicLong?因為JDK8中新增了一個類,與AtomicLong十分像,即LongAdder類。將上面的程式碼用LongAdder實現一下:

public static LongAdder count = new LongAdder();

public static void add() {
        count.increment();
    }

log.info("count:{}", count);

同樣也可以輸出正確的結果。

為什麼有了AtomicLong後還要新增一個LongAdder?

原因是AtomicLong底層使用CAS來保持同步,是在一個死迴圈內不斷嘗試比較值,當工作記憶體與主記憶體資料一致的情況下才執行後續操作,競爭不激烈的時候成功機率高,競爭激烈時也就是併發量高時效能就會降低。對於Long和Double變數來說,jvm會將64位的Long或Double變數的讀寫操作拆分成兩個32位的讀寫操作。因此實際使用過程中可以優先使用LongAdder,而不是繼續使用AtomicLong,當競爭比較低的時候可以繼續使用AtomicLong。

檢視atomic包:

AtomicReference和AtomicInteger非常類似,不同之處就在於AtomicInteger是對整數的封裝,底層採用的是compareAndSwapInt實現CAS,比較的是數值是否相等,而AtomicReference則對應普通的物件引用,底層使用的是compareAndSwapObject實現CAS,比較的是兩個物件的地址是否相等。也就是它可以保證你在修改物件引用時的執行緒安全性。

AtomicIntegerFieldUpdater

原子性更新某個類的例項的某個欄位的值,並且這個欄位必須用volatile關鍵字修飾同時不能是static修飾的。

private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");
    @Getter
    public volatile int count = 100;

    public static void main(String[] args) {
        private AtomicExample5 example5 = new AtomicExample5();
        if (updater.compareAndSet(example5, 100, 120)){
            log.info("update success 1, {}", example5.getCount());
        }
        if(updater.compareAndSet(example5, 100, 120)){
            log.info("update success 2 ,{}", example5.getCount());
        }else {
            log.info("update failed, {}", example5.getCount());
        }
    }

AtomicStampReference:CAS的ABA問題

ABA問題是:在CAS操作的時候,其他執行緒將變數的值A改成了B,隨後又改成了A,CAS就會被誤導。所以ABA問題的解決思路就是將版本號加一,當一個變數被修改,那麼這個變數的版本號就增加1,從而解決ABA問題。

AtomicBoolean

@Slf4j
@ThreadSafe
public class AtomicExample6 {

    private static AtomicBoolean isHappened = new AtomicBoolean(false);

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的執行緒數
    public static int threadTotal = 200;

    public static void main(String[] args) throws InterruptedException {
        //執行緒池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義訊號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for(int i = 0; i < clientTotal; i++) {
            executorService.execute(() ->{
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (InterruptedException e) {

                    log.error("exception", e);
                }
                countDownLatch.countDown();

            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}", isHappened);
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)){
            log.info("excute");
        }
    }

}

這段程式碼test()方法只會被執行5000次而進入log.info("excute")只會被執行一次,因為isHappened變數執行一次之後就變為true了。

這個方法可以保證變數isHappened從false變成true只會執行一次。

這個例子可以解決讓一段程式碼只執行一次絕對不會重複。

原子性 鎖

  • synchronized:synchronized關鍵字主要是依賴JVM實現鎖,因此在這個關鍵字作用物件的作用範圍內都是同一時刻只能有一個執行緒可以進行操作的。
  • lock:依賴特殊的CPU指令,實現類中比較有代表性的是ReentrantLock。

synchronized同步鎖

修飾的物件主要有一下四種:

  • 修飾程式碼塊:大括號括起來的程式碼,作用於呼叫的物件
  • 修飾方法:整個方法,作用於呼叫的物件
  • 修飾靜態方法:整個靜態方法,作用於這個類的所有物件
  • 修飾類:括號括起來的部分,作用於所有物件

修飾程式碼塊和方法

舉例如下:

@Slf4j
public class SynchronizedExample1 {

    /**
     * 修飾一個程式碼塊,被修飾的程式碼稱為同步語句塊,作用範圍是大括號括起來的程式碼,作用的物件是呼叫程式碼的物件
     */
    public void test1() {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 - {}", i);
            }
        }
    }
    public static void main(String[] args) {
        SynchronizedExample1 synchronizedExample1 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            synchronizedExample1.test1();
        });
        executorService.execute(() -> {
            synchronizedExample1.test1();
        });
    }

}

為什麼我們要使用執行緒池?如果不使用執行緒池的話,兩次呼叫了同一個方法,本身就是同步執行的,因此是無法驗證具體的影響,而我們加上執行緒池之後,相當於分別啟動了兩個執行緒去執行方法。

輸出結果是連續輸出兩遍test1 0-9。

如果使用synchronized修飾方法:

    /**
     * 修飾一個方法,被修飾的方法稱為同步方法,作用範圍是整個方法,作用的物件是呼叫方法的物件
     */
    public synchronized void test2() {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}", i);
        }
    }

輸出結果跟上面一樣,是正確的。

接下來換不同的物件,然後亂序輸出,因為同步程式碼塊和同步方法作用物件是呼叫物件,因此使用兩個不同的物件呼叫不同的同步程式碼塊互相是不影響的,如果我們使用執行緒池,example1的test1方法和example2的test1方法是交叉執行的,而不是example1的test1執行完然後再執行example2的test1,程式碼如下:

 /**
     * 修飾一個程式碼塊,被修飾的程式碼稱為同步語句塊,作用範圍是大括號括起來的程式碼,作用的物件是呼叫程式碼的物件
     */
    public void test1(int flag) {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 - {}, {}", flag, i);
            }
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 synchronizedExample1 = new SynchronizedExample1();
        SynchronizedExample1 synchronizedExample2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            synchronizedExample1.test1(1);
        });
        executorService.execute(() -> {
            synchronizedExample2.test1(2);
        });
    }

因此同步程式碼塊作用於當前物件,不同調用物件之間是互相不影響的。

接下來測試同步方法:

/**
     * 修飾一個方法,被修飾的方法稱為同步方法,作用範圍是整個方法,作用的物件是呼叫方法的物件
     */
    public synchronized void test2(int flag) {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}, {}", flag, i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 synchronizedExample1 = new SynchronizedExample1();
        SynchronizedExample1 synchronizedExample2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            synchronizedExample1.test2(1);
        });
        executorService.execute(() -> {
            synchronizedExample2.test2(2);
        });
    }

如果一個方法內部是一個完整的同步程式碼塊,就像上面的test1方法一樣,那麼它和用synchronized修飾的方法效果是等同的。

同時需要注意的synchronized修飾是無法繼承給子類的方法。

修飾靜態方法和類

我們先測試修飾靜態方法:

 /**
     * 修飾一個靜態方法,被修飾的方法稱為同步方法,作用範圍是整個方法,作用的物件是呼叫方法的物件
     */
    public static synchronized void test2(int flag) {
        for (int i = 0; i < 10; i++) {
            log.info("test2 - {}, {}", flag, i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample2 synchronizedExample1 = new SynchronizedExample2();
        SynchronizedExample2 synchronizedExample2 = new SynchronizedExample2();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            synchronizedExample1.test2(1);
        });
        executorService.execute(() -> {
            synchronizedExample2.test2(2);
        });
    }

修飾一個靜態方法作用於這個類的所有物件。因此我們使用不同的物件呼叫synchronized修飾的靜態方法時,同一時間只有一個執行緒在執行。因此上面的執行結果是:

11:31:37.447 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 0
11:31:37.451 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 1
11:31:37.451 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 2
11:31:37.451 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 3
11:31:37.451 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 4
11:31:37.451 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 5
11:31:37.451 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 6
11:31:37.451 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 7
11:31:37.451 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 8
11:31:37.451 [pool-1-thread-1] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 1, 9
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 0
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 1
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 2
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 3
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 4
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 5
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 6
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 7
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 8
11:31:37.451 [pool-1-thread-2] INFO com.vincent.example.sync.SynchronizedExample2 - test2 - 2, 9

他們不會交替執行。

然後呼叫修飾類的:

/**
     * 修飾一個類,被修飾的程式碼稱為同步語句塊,作用範圍是大括號括起來的程式碼,作用的物件是呼叫程式碼的物件
     */
    public static void test1(int flag) {
        synchronized (SynchronizedExample2.class) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 - {}, {}", flag, i);
            }
        }
    }
    public static void main(String[] args) {
        SynchronizedExample2 synchronizedExample1 = new SynchronizedExample2();
        SynchronizedExample2 synchronizedExample2 = new SynchronizedExample2();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            synchronizedExample1.test1(1);
        });
        executorService.execute(() -> {
            synchronizedExample2.test1(2);
        });
    }

執行結果跟上面是一致的。

同樣的如果一個方法內部被synchronized修飾的一個類是一個完整的同步程式碼塊,就像上面的test1方法一樣,那麼它和用synchronized修飾的靜態方法效果是等同的。

synchronized:不可中斷鎖,適合競爭不激烈,可讀性好。

lock:可中斷鎖,多樣化同步,競爭激烈時能維持常態。

Atomic:競爭激烈時能維持常態,比lock效能好;只能同步一個值。

可見性

可見性是指執行緒對主記憶體的修改可以及時的被其他執行緒觀察到。說起可見性,我們常常去向什麼時候不可見,下面介紹一下共享變數線上程間不可見的原因。

  • 執行緒交叉執行
  • 重排序結合線程交叉執行
  • 共享變數更新後的值沒有在工作記憶體與主記憶體間及時更新。

JMM關於synchronized的兩條規定:

  • 執行緒解鎖前,必須把共享變數的最新值重新整理到主記憶體。
  • 執行緒加鎖時,將清空工作記憶體中共享變數的值,從而使用共享變數時需要從主記憶體中重新讀取最新的值(注意,加鎖與解鎖是同一把鎖) 

 可見性 - volatile

通過加入記憶體屏障和禁止重排序優化來實現:

  • 對volatile變數寫操作時,會在寫操作後加入一條store屏障命令,將本地記憶體中的共享變數值重新整理到主記憶體。
  • 對volatile變數讀操作時,會在讀操作前加入一條load屏障指令,從主記憶體中讀取共享變數。

volatile變數在每次對執行緒訪問時,都強迫從主記憶體中讀取該變數的值,而當該變數發生改變時,又會強迫執行緒將最新的值重新整理到主記憶體,這樣任何時候不同的執行緒總能看到該變數的最新值。 下面舉例說明:

@Slf4j
@NotThreadSafe
public class CountExample4 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的執行緒數
    public static int threadTotal = 200;

    public static volatile int count = 0;

    public static void main(String[] args) throws InterruptedException {
        //執行緒池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義訊號量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定義計數器
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for(int i = 0; i < clientTotal; i++) {
            executorService.execute(() ->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {

                    log.error("exception", e);
                }
                countDownLatch.countDown();

            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    public static void add() {

        // 使用volatile修飾,可以保證count是主記憶體中的值
        count++;
    }

}

執行結果依然無法保證執行緒安全。為什麼呢?

原因是當我們執行count++的時候呢,它其實是分了三步,1.從主記憶體中取出count值,這時的count值是最新的,2給count執行+1操作,3.將count值寫回主記憶體。當多執行緒同時讀取到count的值並且給count值+1,這樣就會出現執行緒不安全的情況。

因此通過使用volatile修飾變數不是執行緒安全的。同時也說明volatile不具有原子性。

既然volatile不適合計數的場景,那麼適合什麼場景呢?

通常來說使用volatile必須具備 對變數的寫操作不依賴與當前值。

原子性 有序性

java記憶體模型中,允許編譯器和處理器對指令進行重排序,但是重排序過程不會影響到單執行緒程式的執行,卻會影響到多執行緒併發執行的正確性。

有序性 - happens-before原則

  • 程式次序規則:一個執行緒內,按照程式碼順序,書寫在前面的操作先行發生於書寫後面的操作。對於程式次序規則來說,一段程式程式碼的執行在單個執行緒中看起來是有序的(注意,雖然在這條規則中提到書寫在前面的操作先行發生於書寫後面的操作,這是程式看起來執行順序是按照程式碼書寫的順序執行的,而虛擬機器會對程式程式碼進行指令重排序,雖然進行了重排序,但是最終執行的結果是與程式順序執行的結果是一樣的, 它只會對不存在資料依賴行的指令進行重排序,因此在單個執行緒中,程式看起來是有序執行的),事實上這個規則是用來保證程式在單執行緒中執行結果的正確性。但無法保證程式在多執行緒中執行的正確性。
  • 鎖定規則:一個unlock操作先行發生於後面對同一個鎖的lock操作。也就是說無論在單執行緒中還是多執行緒中,同一個鎖如果處於被鎖定狀態,那麼必須先對鎖進行釋放操作,後面才能繼續進行lock操作。
  • volatile變數規則:對一個變數的寫操作先行發生於後面對這個變數的讀操作。如果一個執行緒先去寫一個變數,然後一個執行緒進行讀取,那麼寫入操作肯定先行發生於讀操作。
  • 傳遞規則:如果操作A先行發生於操作B,而操作B又先行發生於操作C,則可以得出操作A先行發生於操作C。
  • 執行緒啟動原則:Thread物件的start()方法先行發生於此執行緒的每一個動作。
  • 執行緒中斷規則:對執行緒interrupt()方法的呼叫先行發生於被中斷執行緒的程式碼檢測到中斷事件的發生。
  • 執行緒終結規則:執行緒中所有的操作都先行發生於執行緒的終止檢測,我們可以通過Thread.join()方法結束、Thread.isAlive()的返回值手段檢測到執行緒已經終止執行。
  • 物件終結規則:一個物件的初始化完成先行發生於他的finalize()方