1. 程式人生 > >Java 多執行緒併發程式設計

Java 多執行緒併發程式設計

導讀

  創作不易,禁止轉載!

併發程式設計簡介

發展歷程

  早起計算機,從頭到尾執行一個程式,這樣就嚴重造成資源的浪費。然後作業系統就出現了,計算機能執行多個程式,不同的程式在不同的單獨的程序中執行,一個程序,有多個執行緒,提高資源的利用率。ok,如果以上你還不瞭解的話,我這裡有2個腦補連結(點我直達1、點我直達2)

簡介(百度百科)

  所謂併發程式設計是指在一臺處理器上“同時”處理多個任務。併發是在同一實體上的多個事件。多個事件在同一時間間隔發生。

目標(百度百科)

  併發程式設計的目標是充分的利用處理器的每一個核,以達到最高的處理效能。

序列與並行的區別

  可能這個栗子不是很恰當,仁者見仁智者見智。智者get到點,愚者咬文爵字,啊!你這個栗子不行,不切合實際,巴拉巴拉 .....為啥加起來是2小時6分鐘,吃飯不要時間麼(洗衣服:把要洗的衣服塞到洗衣機,包括倒洗衣液等等3分鐘;做飯:同理),你大爺的,吃飯的時候不能看電影嘛。好了,請出門右轉,這裡不歡迎槓精,走之前把門關上!!!通過這個栗子,可以看出做相同的事情,所花費的時間不同(這就是為啥工作中,每個人的工作效率有高低了叭)。

什麼時候適合併發程式設計

  1. 任務阻塞執行緒,導致之後的程式碼不能執行:一邊從檔案中讀取,一邊進行大量計算
  2. 任務執行時間過長,可以瓜分為分工明確的子任務:分段下載檔案
  3. 任務間斷性執行:日誌列印
  4. 任務協作執行:生產者消費者問題

併發程式設計中的上下文切換

  以下內容,百度百科原話(點我直達)。

  上下文切換指的是核心(作業系統的核心)在CPU上對程序或者執行緒進行切換。上下文切換過程中的資訊被儲存在程序控制塊(PCB-Process Control Block)中。PCB又被稱作切換楨(SwitchFrame)。上下文切換的資訊會一直被儲存在CPU的記憶體中,直到被再次使用。

  上下文切換 (context switch) , 其實際含義是任務切換, 或者CPU暫存器切換。當多工核心決定執行另外的任務時, 它儲存正在執行任務的當前狀態, 也就是CPU暫存器中的全部內容。這些內容被儲存在任務自己的堆疊中, 入棧工作完成後就把下一個將要執行的任務的當前狀況從該任務的棧中重新裝入CPU暫存器, 並開始下一個任務的執行, 這一過程就是context switch。 每個任務都是整個應用的一部分, 都被賦予一定的優先順序, 有自己的一套CPU暫存器和棧空間。

  最重要的一句話:上下文頻繁的切換,會帶來一定的效能開銷。

減少上下文切換開銷方法

  • 無鎖併發程式設計
    • 多執行緒競爭鎖時,會引起上下文切換,所以多個執行緒處理資料時,可以用一些辦法來避免使用鎖,如將資料的ID按照Hash演算法取模分段,不同的執行緒處理不同段的資料
  • CAS
    • Java的Atomic包使用CAS演算法來更新資料,而不需要加鎖
  • 控制執行緒數
    • 避免建立過多不需要的執行緒,當任務少的時候,但是建立很多執行緒來處理,這樣會造成大量執行緒都處於等待狀態
  • 協程(GO語言)
    • 在單執行緒裡實現多工的排程,並在單執行緒裡維持多個任務間的切換。

知乎上,有個人寫的不錯,推薦給大家:點我直達

死鎖(程式碼演示)

  第一次執行,沒有發生死鎖,第二次執行時,先讓執行緒A睡眠50毫秒,程式一直卡著不動,發生死鎖。你不讓我,我不讓你,爭奪YB_B的資源。

檢視死鎖(在重要不過啦)(jdk提供的一些工具)

  1. 命令列工具:jps
  2. 檢視堆疊:jstack pid
  3. 視覺化工具:jconsole

jps&jstack

分析

jconsole

  控制檯輸入:jconsole,然後按照gif,看執行緒->檢測死鎖

程式碼拷貝區

package com.yb.thread;

/**
 * @ClassName:DeadLockDemo
 * @Description:死鎖程式碼演示
 * @Author:chenyb
 * @Date:2020/9/7 10:23 下午
 * @Versiion:1.0
 */
public class DeadLockDemo {
    private static final Object YB_A=new Object();
    private static final Object YB_B=new Object();

    public static void main(String[] args) {
        new Thread(()->{
            synchronized (YB_A){
                try {
                    //讓執行緒睡眠50毫秒
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (YB_B){
                    System.out.println("執行緒-AAAAAAAAAAAAA");
                }
            }
        }).start();
        new Thread(()->{
            synchronized (YB_B){
                synchronized (YB_A){
                    System.out.println("執行緒-BBBBBBBBBBBBB");
                }
            }
        }).start();
    }
}

執行緒基礎

程序與執行緒的區別

  程序:是系統進行分配和管理資源的基本單位

  執行緒:程序的一個執行單元,是程序內排程的實體、是CPU排程和分派的基本單位,是比程序更小的獨立執行的基本單位。執行緒也被稱為輕量級程序,執行緒是程式執行的最小單位。

  一個程式至少一個程序,一個程序至少一個執行緒。

執行緒的狀態(列舉)

  • 初始化(NEW)
    • 新建了一個執行緒物件,但還沒有呼叫start()方法
  • 執行(RUNNABLE)
    • 處於可執行狀態的執行緒正在JVM中執行,但他可能正在等待來自作業系統的其他資源
  • 阻塞(BLOCKED)
    • 執行緒阻塞與synchronized鎖,等待獲取synchronized鎖的狀態
  • 等待(WAITING)
    • Object.wait()、join()、LockSupport.part(),進入該狀態的執行緒需要等待其他執行緒做出一些特定動作(通知|中斷)
  • 超時等待(TIME_WAITING)
    • Object.wait(long)、Thread.join()、LockSupport.parkNanos()、LockSupport.parkUntil,該狀態不同於WAITING
  • 終止(TERMINATED)
    • 該執行緒已經執行完畢

建立執行緒

方式一

方式二(推薦)

好處

  1. java只能單繼承,但是介面可以繼承多個
  2. 增加程式的健壯性,程式碼可以共享

注意事項

方式三(匿名內部類)

方式四(Lambada)

方式五(執行緒池)

  注意:程式還未關閉!!!!

執行緒的掛起與恢復

方式一(不推薦)

  不推薦使用,會造成死鎖~

方式二(推薦)

wait():暫停執行,放棄已獲得的鎖,進入等待狀態

notify():隨機喚醒一個在等待鎖的執行緒

notifyAll():喚醒所有在等待鎖的執行緒,自行搶佔CPU資源

執行緒的中斷

方式一(不推薦)

  注意:使用stop()可以中斷執行緒,但是會帶來執行緒不安全問題(stop被呼叫,執行緒立刻停止),理論上numA和numB都是1,結果numB=0;還是沒搞明白的,給你個眼神,自己體會~

方式二(推薦)

方式三(更推薦)

執行緒優先順序

  執行緒的優先順序告訴程式該執行緒的重要程度有多大。如果有大量執行緒都被阻塞,都在等候執行,程式會盡可能地先執行優先順序的那個執行緒。但是,這並不表示優先順序較低的執行緒不會執行。若執行緒的優先順序較低,只不過表示它被准許的機會小一些而已。

執行緒的優先順序

  1. 最小=1
  2. 最大=10
  3. 預設=5

驗證

  可以看出,列印執行緒2的機率比較大,因為執行緒優先順序高。執行緒優先順序,推薦使用(不同平臺對執行緒的優先順序支援不同):1、5、10

守護執行緒(不建議使用)

  任何一個守護執行緒都是整個程式中所有使用者執行緒的守護者,只要有活著的使用者執行緒,守護執行緒就活著。

執行緒安全性

synchronized

點我直達

修改普通方法:鎖住物件的例項

修飾靜態方法:鎖住整個類

修改程式碼塊:鎖住一個物件synchronized (lock)

volatile

  僅能修飾變數,保證該物件的可見性(多執行緒共享的變數),不保證原子性。

用途

  1. 執行緒開關
  2. 單例修改物件的例項

lock的使用

lock與synchronized區別

  lock:需要手動設定加鎖和釋放鎖

  synchronized:託管給jvm執行

檢視lock的實現類有哪些

多執行緒下除錯

  注意看圖,執行緒1、2、3的狀態:Runnable|wailting,還沒get到點的話,你真的要反思一下了

讀寫鎖

  讀寫互斥、寫寫互斥、讀讀不互斥

  如果要想debug除錯檢視效果,可開2個執行緒,一個自增,一個輸出

package com.yb.thread.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @ClassName:ReentrantReadWriteLockDemo
 * @Description:讀寫鎖
 * @Author:chenyb
 * @Date:2020/9/26 3:14 下午
 * @Versiion:1.0
 */
public class ReentrantReadWriteLockDemo {
    private int num_1 = 0;
    private int num_2 = 0;
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    //讀鎖
    private Lock readLock = lock.readLock();
    //寫鎖
    private Lock writeLock = lock.writeLock();

    public void out() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "num1====>" + num_1 + ";num_2======>" + num_2);
        } finally {
            readLock.unlock();
        }
    }

    public void inCreate() {
        writeLock.lock();
        try {
            num_1++;
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            num_2++;
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantReadWriteLockDemo rd = new ReentrantReadWriteLockDemo();
//        for(int x=0;x<3;x++){
//            new Thread(()->{
//                rd.inCreate();
//                rd.out();
//            }).start();
//        }

        //=========讀寫互斥
        new Thread(() -> {
            rd.inCreate();
        }, "寫").start();
        new Thread(() -> {
            rd.out();
        }, "讀").start();

        //========寫寫互斥
        new Thread(() -> {
            rd.inCreate();
        }, "寫1").start();
        new Thread(() -> {
            rd.inCreate();
        }, "寫2").start();

        //==========讀讀不互斥
        new Thread(() -> {
            rd.out();
        }, "讀1").start();
        new Thread(() -> {
            rd.out();
        }, "讀2").start();
    }
}

鎖降級

  寫執行緒獲取寫鎖後可以獲取讀鎖,然後釋放寫鎖,這樣寫鎖變成了讀鎖,從而實現鎖降級。

  注:鎖降級之後,寫鎖不會直接降級成讀鎖,不會隨著讀鎖的釋放而釋放,因此要顯示地釋放寫鎖。

用途

  用於對資料比較敏感,需要在對資料修改之後,獲取到修改後的值,並進行接下來的其他操作。理論上已經會輸入依據:“num=1”,實際多執行緒下沒輸出,此時可以用鎖降級解決。給你個眼神,自己體會

package com.yb.thread.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @ClassName:LockDegradeDemo
 * @Description:鎖降級demo
 * @Author:chenyb
 * @Date:2020/9/26 10:53 下午
 * @Versiion:1.0
 */
public class LockDegradeDemo {
    private int num = 0;
    //讀寫鎖
    private ReentrantReadWriteLock readWriteLOck = new ReentrantReadWriteLock();
    Lock readLock = readWriteLOck.readLock();
    Lock writeLock = readWriteLOck.writeLock();

    public void doSomething() {
        //寫鎖
        writeLock.lock();
        //讀鎖
        readLock.lock();
        try {
            num++;
        } finally {
            //釋放寫鎖
             writeLock.unlock();
        }
        //模擬其他複雜操作
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            if (num == 1) {
                System.out.println("num=" + num);
            } else {
                System.out.println(num);
            }
        } finally {
            //釋放度鎖
             readLock.unlock();
        }
    }

    public static void main(String[] args) {
        LockDegradeDemo ld = new LockDegradeDemo();
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                ld.doSomething();
            }).start();
        }
    }
}

鎖升級?

  注:從圖可以看出,執行緒卡著,驗證不存在先讀後寫,從而不存在鎖升級這種說法

StampedLock鎖

簡介

  一般應用,都是讀多寫少,ReentrantReadWriteLock,因為讀寫互斥,所以讀時阻塞寫,效能提不上去。可能會使寫執行緒飢餓

特點

  1. 不可重入:一個執行緒已經持有寫鎖,再去獲取寫鎖的話,就會造成死鎖
  2. 支援鎖升級、降級
  3. 可以樂觀讀也可以悲觀讀
  4. 使用有限次自旋,增加鎖獲得的機率,避免上下文切換帶來的開銷,樂觀讀不阻塞寫操作,悲觀讀,阻塞寫

優點

  相比於ReentrantReadWriteLock,吞吐量大幅提升

缺點

  1. api複雜,容易用錯
  2. 實現原理相比於ReentrantReadWriteLock複雜的多

demo

package com.yb.thread.lock;

import java.util.concurrent.locks.StampedLock;

/**
 * @ClassName:StampedLockDemo
 * @Description:官方例子
 * @Author:chenyb
 * @Date:2020/9/26 11:37 下午
 * @Versiion:1.0
 */
public class StampedLockDemo {
    //成員變數
    private double x, y;
    //鎖例項
    private final StampedLock sl = new StampedLock();

    //排它鎖-寫鎖(writeLock)
    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    //樂觀讀鎖
    double distanceFromOrigin() {
        //嘗試獲取樂觀鎖1
        long stam = sl.tryOptimisticRead();
        //將全部變數拷貝到方法體棧內2
        double currentX = x, currentY = y;
        //檢查在1獲取到讀鎖票據後,鎖有沒被其他寫執行緒排他性搶佔3
        if (!sl.validate(stam)) {
            //如果被搶佔則獲取一個共享讀鎖(悲觀獲取)4
            stam = sl.readLock();
            try {
                //將全部變數拷貝到方法體棧內5
                currentX = x;
                currentY = y;
            } finally {
                //釋放共享讀鎖6
                sl.unlockRead(stam);
            }
        }
        //返回計算結果7
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    //使用悲觀鎖獲取讀鎖,並嘗試轉換為寫鎖
    void moveIfAtOrigin(double newX, double newY) {
        //這裡可以使用樂觀讀鎖替換1
        long stamp = sl.readLock();
        try {
            //如果當前點遠點則移動2
            while (x == 0.0 && y == 0.0) {
                //嘗試將獲取的讀鎖升級為寫鎖3
                long ws = sl.tryConvertToWriteLock(stamp);
                //升級成功後,則更新票據,並設定座標值,然後退出迴圈4
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    //讀鎖升級寫鎖失敗則釋放讀鎖,顯示獲取獨佔寫鎖,然後迴圈重試5
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            //釋放鎖6
            sl.unlock(stamp);
        }
    }
}

生產者消費者模型

Consumer.java

package com.yb.thread.communication;

/**
 * 消費者
 */
public class Consumer implements Runnable {
    private Medium medium;

    public Consumer(Medium medium) {
        this.medium = medium;
    }

    @Override
    public void run() {
        while (true) {
            medium.take();
        }
    }
}

Producer.java

package com.yb.thread.communication;

/**
 * 生產者
 */
public class Producer implements Runnable {
    private Medium medium;

    public Producer(Medium medium) {
        this.medium = medium;
    }

    @Override
    public void run() {
        while (true) {
            medium.put();
        }
    }
}

Medium.java

package com.yb.thread.communication;


/**
 * 中間商
 */
public class Medium {
    //生產個數
    private int num = 0;
    //最多生產數
    private static final int TOTAL = 20;

    /**
     * 接受生產資料
     */
    public synchronized void put() {
        //判斷當前庫存,是否最大庫存容量
        //如果不是,生產完成之後,通知消費者消費
        //如果是,通知生產者進行等待
        if (num < TOTAL) {
            System.out.println("新增庫存--------當前庫存" + ++num);
            //喚醒所有執行緒
            notifyAll();
        } else {
            try {
                System.out.println("新增庫存-----庫存已滿" + num);
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 獲取消費資料
     */
    public synchronized void take() {
        //判斷當前庫存是否不足
        //如果充足,在消費完成之後,通知生產者進行生產
        //如果不足,通知消費者暫停消費
        if (num > 0) {
            System.out.println("消費庫存-------當前庫存容量" + --num);
            //喚醒所有執行緒
            notifyAll();
        } else {
            System.out.println("消費庫存--------庫存不足" + num);
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

測試

管道流通訊

  以記憶體為媒介,用於執行緒之間的資料傳輸

  面向位元組:PipedOutputStream、PipedInputStream

  面向字元:PipedReader、PipedWriter

Reader.java

package com.yb.thread.communication.demo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.util.stream.Collectors;

/**
 * @ClassName:Reader
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/27 10:22 下午
 * @Versiion:1.0
 */
public class Reader implements Runnable{
    private PipedInputStream pipedInputStream;
    public Reader(PipedInputStream pipedInputStream){
        this.pipedInputStream=pipedInputStream;
    }
    @Override
    public void run() {
        if (pipedInputStream!=null){
            String collect = new BufferedReader(new InputStreamReader(pipedInputStream)).lines().collect(Collectors.joining("\n"));
            System.out.println(collect);
        }
        //關閉流
        try {
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Main.java

package com.yb.thread.communication.demo;

import java.io.*;

/**
 * @ClassName:Main
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/27 10:22 下午
 * @Versiion:1.0
 */
public class Main {
    public static void main(String[] args) {
        PipedInputStream pipedInputStream = new PipedInputStream();
        PipedOutputStream pipedOutputStream = new PipedOutputStream();
        try {
            pipedOutputStream.connect(pipedInputStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
        new Thread(new Reader(pipedInputStream)).start();
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            pipedOutputStream.write(bufferedReader.readLine().getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pipedOutputStream.close();
                if (bufferedReader!=null){
                    bufferedReader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }
}

測試

Thread.join

  執行緒A執行一半,需要資料,這個資料需要執行緒B去執行修改,B修改完成後,A才繼續操作

演示

ThreadLocal

執行緒變數,是一個以ThreadLocal物件為鍵、任意物件為值的儲存結構。

1、ThreadLocal.get: 獲取ThreadLocal中當前執行緒共享變數的值。
2、ThreadLocal.set: 設定ThreadLocal中當前執行緒共享變數的值。
3、ThreadLocal.remove: 移除ThreadLocal中當前執行緒共享變數的值。
4、ThreadLocal.initialValue: ThreadLocal沒有被當前執行緒賦值時或當前執行緒剛呼叫remove方法後呼叫get方法,返回此方法值。

原子類

概念

  對多執行緒訪問同一個變數,我們需要加鎖,而鎖是比較消耗效能的,JDK1.5之後,新增的原子操作類提供了一種用法簡單、效能高效、執行緒安全地更新一個變數的方式,這些類同樣位於JUC包下的atomic包下,發展到JDK1.8,該包下共有17個類,囊括了原子更新基本型別、原子更新陣列、原子更新屬性、原子更新引用。

1.8新增的原子類

  1. DoubleAccumulator
  2. DoubleAdder
  3. LongAccumulator
  4. LongAdder
  5. Striped64

原子更新基本型別

JDK1.8之前有以下幾個

  1. AtomicBoolean
  2. AtomicInteger
  3. AtomicLong
  4. DoubleAccumulator
  5. DoubleAdder
  6. LongAccumulator
  7. LongAdder

大致3類

  1. 元老級的原子更新,方法幾乎一模一樣:AtomicBoolean、AtomicInteger、AtomicLong
  2. 對Double、Long原子更新效能進行優化提升:DoubleAdder、LongAdder
  3. 支援自定義運算:DoubleAccumulator、LongAccumulator

演示

元老級

自定義運算

原子更新陣列

JDK1.8之前大概有以下幾個

  1. AtomicIntegerArray
  2. AtomicLongArray
  3. AtomicReferenceArray

原子更新屬性

  1. AtomicIntegerFieldUpdater
  2. AtomicLongFieldUpdater
  3. AtomicStampedReference
  4. AtomicReferenceFieldUpdater

原子更新引用

  1. AtomicReference:用於對引用的原子更新
  2. AtomicMarkableReference:帶版本戳的原子引用型別,版本戳為boolean型別
  3. AtomicStampedReference:帶版本戳的原子引用型別,版本戳為int型別

容器

同步容器

   Vector、HashTable:JDK提供的同步容器類

  Collections.SynchronizedXXX:對相應容器進行包裝

缺點

  在單獨使用裡面的方法的時候,可以保證執行緒安全,但是,複合操作需要額外加鎖來保證執行緒安全,使用Iterator迭代容器或使用for-each遍歷容器,在迭代過程中修改容器會拋ConcurrentModificationException異常。想要避免出現這個異常,就必須在迭代過程持有容器的鎖。但是若容器較大,則迭代的時間也會較長。那麼需要訪問該容器的其他執行緒將會長時間等待。從而極大降低效能。

  若不希望在迭代期間對容器加鎖,可以使用“克隆”容器的方式。使用執行緒封閉,由於其他執行緒不會對容器進行修改,可以避免ConcurrentModificationException。但是在建立副本的時候,存在較大效能開銷。toString、hashCode、equalse、containsAll、removeAll、retainAll等方法都會隱式的Iterate,也即可能丟擲ConcurrentModificationException。

package com.yb.thread.container;

import java.util.Iterator;
import java.util.Vector;

/**
 * @ClassName:VectorDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 9:35 下午
 * @Versiion:1.0
 */
public class VectorDemo {
    public static void main(String[] args) {
        Vector<String> strings = new Vector<>();
        for (int i = 0; i <1000 ; i++) {
            strings.add("demo"+i);
        }
        //錯誤遍歷
//        strings.forEach(e->{
//            if (e.equals("demo3")){
//                strings.remove(e);
//            }
//            System.out.println(e);
//        });

        //正確迭代---->單執行緒
//        Iterator<String> iterator = strings.iterator();
//        while (iterator.hasNext()){
//            String next = iterator.next();
//            if (next.equals("demo3")){
//                iterator.remove();
//            }
//            System.out.println(next);
//        }

        //正確迭代--->多執行緒
        Iterator<String> iterator = strings.iterator();
        for (int i = 0; i < 4; i++) {
            new Thread(()->{
                synchronized (iterator){
                    while (iterator.hasNext()){
                        String next = iterator.next();
                        if (next.equals("demo3")){
                            iterator.remove();
                        }
                    }
                }
            }).start();
        }
    }
}

併發容器

  CopyOnWrite、Concurrent、BlockingQueue:根據具體場景進行設計,儘量避免使用鎖,提高容器的併發訪問性。

  ConcurrentBlockingQueue:基於queue實現的FIFO的佇列。佇列為空,去操作會被阻塞

  ConcurrentLinkedQueue:佇列為空,取得時候就直接返回空

package com.yb.thread.container;

import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * @ClassName:Demo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 9:50 下午
 * @Versiion:1.0
 */
public class Demo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> strings=new CopyOnWriteArrayList<>();
        for (int i = 0; i < 1000; i++) {
            strings.add("demo"+i);
        }
        //正常操作--->單執行緒
//        strings.forEach(e->{
//            if (e.equals("demo2")){
//                strings.remove(e);
//            }
//        });

        //錯誤操作,不支援迭代器移除元素,直接拋異常
//        Iterator<String> iterator = strings.iterator();
//        while (iterator.hasNext()){
//            String next = iterator.next();
//            if (next.equals("demo2")){
//                iterator.remove();
//            }
//        }

        //正常操作--->多執行緒
        for (int i = 0; i < 4; i++) {
            new Thread(()->{
                strings.forEach(e -> {
                    if (e.equals("demo2")) {
                        strings.remove(e);
                    }
                });
            }).start();
        }
    }
}

LinkedBlockingQueue

  可以作為生產者消費者的中間商(使用put、take)。

package com.yb.thread.container;

import java.util.concurrent.LinkedBlockingDeque;

/**
 * @ClassName:Demo2
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 10:05 下午
 * @Versiion:1.0
 */
public class Demo2 {
    public static void main(String[] args) {
        LinkedBlockingDeque<String> strings = new LinkedBlockingDeque<>();
        //新增元素,3種方式
        strings.add("陳彥斌"); //佇列滿的時候,會拋異常
        strings.offer("陳彥斌"); //如果佇列滿了,直接入隊失敗
        try {
            strings.put("陳彥斌"); //佇列滿,進入阻塞狀態
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //從佇列中取元素,3種方式
        String remove = strings.remove(); //會丟擲異常
        strings.poll(); //在佇列為空的時候,直接返回null
        try {
            strings.take(); //佇列為空的時候,會進入等待狀態
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

併發工具類

CountDownLatch

  1. await():進入等待狀態
  2. countDown:計算器減一

應用場景

  1. 啟動三個執行緒計算,需要對結果進行累加

package com.yb.thread.tool;

import java.util.concurrent.CountDownLatch;

/**
 * @ClassName:CountDownLatchDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 10:26 下午
 * @Versiion:1.0
 */
public class CountDownLatchDemo {
    public static void main(String[] args) {
        //模擬場景,學校比較,800米,跑完之後,有跨欄
        //需要先將800米跑完,在佈置跨欄,要不然跑800米的選手會被累死
        CountDownLatch countDownLatch = new CountDownLatch(8);
        new Thread(()->{
            try {
                countDownLatch.await();
                System.out.println("800米比賽結束,準備清跑道,並進行跨欄比賽");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        for (int i = 0; i < 8; i++) {
            int finalI = i;
            new Thread(()->{
                try {
                    Thread.sleep(finalI *1000L);
                    System.out.println(Thread.currentThread().getName()+",到達終點");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
    }
}

CyclicBarrier

  允許一組執行緒相互等待達到一個公共的障礙點,之後繼續執行

區別

  1. CountDownLatch一般用於某個執行緒等待若干個其他執行緒執行完任務之後,他才執行:不可重複使用
  2. CyclicBarrier一般用於一組執行緒相互等待至某個狀態,然後這一組執行緒再同時執行:可重用

package com.yb.thread.tool;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @ClassName:CyclicBarrierDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 10:42 下午
 * @Versiion:1.0
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //模擬場景:學校800米跑步,等到所有選手全部到齊後,一直跑
        CyclicBarrier cyclicBarrier=new CyclicBarrier(8);
        for (int i = 0; i < 8; i++) {
            int finalI = i;
            new Thread(()->{
                try {
                    Thread.sleep(finalI *1000L);
                    System.out.println(Thread.currentThread().getName()+",準備就緒");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("選手已到齊,開始比賽");
            }).start();
        }
    }
}

Semaphore(訊號量)

  控制執行緒併發數量

應用場景

  1. 介面限流

package com.yb.thread.tool;

import java.util.concurrent.Semaphore;

/**
 * @ClassName:SemaphoreDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 11:11 下午
 * @Versiion:1.0
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(8);
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {

                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + ",開始執行");
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //釋放
                    semaphore.release();
                }
            }).start();
        }
    }
}

Exchange

  它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的資料(成對)。

應用場景

  1. 交換資料

package com.yb.thread.tool;

import java.util.concurrent.Exchanger;

/**
 * @ClassName:ExchangerDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 11:21 下午
 * @Versiion:1.0
 */
public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> stringExchanger=new Exchanger<>();
        String str1="陳彥斌";
        String str2="ybchen";
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"--------------初始值:"+str1);
            try {
                String exchange = stringExchanger.exchange(str1);
                System.out.println(Thread.currentThread().getName()+"--------------交換:"+exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"執行緒A").start();
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"--------------初始值:"+str2);
            try {
                String exchange = stringExchanger.exchange(str2);
                System.out.println(Thread.currentThread().getName()+"--------------交換:"+exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"執行緒B").start();
    }
}