1. 程式人生 > >並發編程專題(四)

並發編程專題(四)

上鎖 live 過多 repo 狀態 業務 方式 元素 epo

CountDownLatch(計數器)

CountDownLatch 位於並發包下,利用它可以完成類似於計數器的功能,如果線程 A 需要等待其他 n 個線程執行完畢後才能執行,此時就可以利用 CountDownLatch 來實現這個功能,CountDownLatch 是通過一個計數器來實現的,計數器的初始值為線程數量,每當一個線程完成了自己的任務後,計數器的值就會減1,當計數器的值為0時,表示所有線程已經執行完畢,等待線程就可以恢復。

package com.kernel;

import java.util.concurrent.CountDownLatch;

public class Test001 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ",子線程開始執行...");
                countDownLatch.countDown();
                System.out.println(Thread.currentThread().getName() + ",子線程結束執行...");

            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ",子線程開始執行...");
                countDownLatch.countDown();
                System.out.println(Thread.currentThread().getName() + ",子線程結束執行...");

            }
        }).start();
        countDownLatch.await();
        System.out.println("其他線程執行完畢");
        System.out.println("等待線程執行。。。");
    }
}

CyclicBarrier(屏障)

CyclicBarrier 在初始化時會傳入一個數量,它會記錄調用了 await 方法的線程數,只有這個線程數和創建該對象時提供的數量相同時,所有進入線程等待的線程才會被重新喚醒繼續執行。

顧名思義,它就像一個屏幕,人來全了才能一塊兒過屏障。

CyclicBarrier 還可以提供一個可以傳入 Runable 對象的構造,該線程將在一起通過屏障後所有線程喚醒之前被喚醒

package com.kernel;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Write extends Thread {
    private CyclicBarrier cyclicBarrier;

    public Write(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println("線程" + Thread.currentThread().getName() + ",正在寫入數據");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("線程" + Thread.currentThread().getName() + ",寫入數據成功.....");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("所有線程執行完畢");
    }
}

public class Test002 {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            Write write = new Write(cyclicBarrier);
            write.start();
        }
    }
}

Semaphore(信號量)

Semaphore 是一種基於計數的信號量,創建時可以指定一個值,這個值規定了有多少個線程並發執行,執行前申請,執行完畢後歸還,超過那個值後,線程申請信號將會被阻塞,知道有其他占有信號的線程執行完成歸還信號

Semaphore 可以用來構建一些對象池,資源池之類的,比如數據庫連接池

我們也可以創建計數為1的 Semaphore,將其作為一種類似互斥鎖的機制,這也叫二元信號量,表示兩種互斥狀態

package com.kernel;

import java.util.Random;
import java.util.concurrent.Semaphore;

public class Test003 extends Thread {
    private String name;
    private Semaphore windows;

    public Test003(String name, Semaphore windows) {
        this.name = name;
        this.windows = windows;
    }

    @Override
    public void run() {
        int availablePermits = windows.availablePermits();
        if (availablePermits > 0) {
            System.out.println(name + ":終於輪到我了");
        } else {
            System.out.println(name + ":**,能不能快點");
        }
        try {
            windows.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + ":我要XXX,剩下窗口");
        try {
            Thread.sleep(new Random().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + ":我買完了");
        windows.release();

    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 5; i++) {
            Test003 test003 = new Test003("第" + i + "個人", semaphore);
            test003.start();
        }
    }
}

並發隊列

  • ArrayDeque(數組雙端隊列)
  • PriorityQueue(優先級隊列)
  • ConcurrentLinkedQueue(基於鏈表的並發隊列)
  • DelayQueue(延期阻塞隊列,阻塞隊列實現了BlockingQueue接口)
  • ArrayBlockingQueue(基於數組的並發阻塞隊列)
  • LinkedBlockingQueue(基於鏈表的FIFO阻塞隊列)
  • LinkedBlockingDeque(基於鏈表的FIFO雙端阻塞隊列)
  • PriorityBlockingQueue(帶優先級的×××阻塞隊列)
  • SynchronousQueue(並發同步阻塞隊列)

ConcurrentLinkedDeque(非阻塞式隊列)

是一個×××隊列,性能高於阻塞式隊列,是一個基於鏈表實現的線程安全隊列,不允許 null

add 和 offer 是加入元素的方法,兩者之間沒有區別

poll 從隊列中取出並刪除元素

peek 查看隊列頭元素,不刪除

BlockingQueue(阻塞式隊列)

是一個有界隊列,阻塞隊列常用語生產消費者場景

以下情況會產生阻塞:

隊列元素滿,還往裏面存元素

隊列元素空,還想從隊列中拿元素

基於阻塞隊列的生產者消費者模型

package com.kernel;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ProducerThread implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private AtomicInteger atomicInteger = new AtomicInteger();
    private volatile boolean flag = true;

    public ProducerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println("###生產者線程已經啟動###");
        while (flag) {
            String data = atomicInteger.incrementAndGet() + "";
            try {
                boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.out.println(Thread.currentThread().getName() + ", 生產隊列" + data + "成功");
                } else {
                    System.out.println(Thread.currentThread().getName() + ", 生產隊列" + data + "失敗");
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("生產者線程結束");
    }

    public void stop() {
        this.flag = false;
    }
}

class CustomerThread implements Runnable {

    private BlockingQueue<String> blockingQueue;
    private volatile boolean flag = true;

    public CustomerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println("###消費者線程已經啟動###");
        while (flag) {
            String data = null;
            try {
                data = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (data == null) {
                    flag = false;
                    System.out.println("消費者超過2秒時間未獲取到消息.");
                    return;
                }
                System.out.println("消費者獲取到隊列信息成功,data:" + data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("消費者進程結束");
    }
}

public class Test006 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        ProducerThread producerThread = new ProducerThread(blockingQueue);
        CustomerThread customerThread = new CustomerThread(blockingQueue);
        new Thread(producerThread).start();
        new Thread(customerThread).start();
        Thread.sleep(1000 * 10);
        producerThread.stop();
    }
}

線程池介紹

線程池其實就是一個可以容納線程的容器,其中的線程可以反復利用,節省了反復創建、銷毀線程的消耗

線程池有什麽優點?

降低資源消耗:重復利用已經創建好的線程而節約反復創建、銷毀線程的消耗

提高響應速度:眾所周期,創建線程不是立馬可以使用的,創建好線程之後進入就緒狀態,需要經過 CPU 的調度才能進入運行狀態,而利用線程池,只要任務來到,線程池有空閑線程,就可以立即作業

提高線程管理性:線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還可以降低系統穩定性,使用線程池可以進行統一分配、調優和監控

線程池的分類:

newCachedThreadPool

創建一個可緩存線程池,可反復回收利用,若任務數大於當然線程數,則繼續創建線程

public class Test008 {
    public static void main(String[] args) {
        // 可緩存線程池(可重復利用)無限大
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
    }
}

newFixedThreadPool

創建一個定長線程,超出線程存放在隊列中

public class Test009 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
    }
}

newScheduledThreadPool

創建一個定長並支持定時及周期性任務執行的線程池

public class Test010 {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        for (int i = 0; i < 10; i++) {
            scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
                // 表示延遲3秒執行
            },3, TimeUnit.SECONDS);
        }
    }
}

newSingleThreadExecutor

創建一個單線程線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO、LIFO、優先級)執行

public class Test011 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
        executorService.shutdown();
    }
}

線程池源碼分析

  • corePoolSize 核心線程數(實際運行線程數)
  • maximumPoolSize 最大線程數(最多可以創建多少個線程)
  • keepAliveTime 線程空閑超時時間
  • unit 時間單位
  • workQueue 緩存隊列

提交一個任務到線程池中去,首先判斷當前線程數是都小於 corePoolSize,如果小於 corePoolSize,則創建一個新線程來執行任務

如果當前線程數等於corePoolSize,再來任務的話就會將任務添加到緩存隊列中

如果緩存隊列已滿,在判斷當前線程是否小於 maximumPoolSize

如果小於 maximumPoolSize,創建線程執行任務,否則,采取任務拒絕策略進行處理

如果當前線程數大於 corePoolSize,並且某線程空閑時間大於 keepAliveTime,線程被終止,直到線程池中的線程數目不大於corePoolSize

如果允許為核心池中的線程設置存活時間,那麽核心池中的線程空閑時間超過 keepAliveTime,線程也會被終止

合理配置線程池

IO 密集型:即該任務需要大量的IO,即大量的阻塞,在單線程上運行IO密集型的任務會導致浪費大量的 CPU 運算能力浪費在等待,所以在 IO 密集型任務中使用多線程可以大大的加速程序運行,即時在單核CPU上,這種加速主要就是利用了被浪費掉的阻塞時間,一般 IO 密集型任務線程設置為 2*核心數+1

CPU 密集型:該任務需要進行大量計算,沒有 IO 阻塞,CPU 一直在全速運行,CPU 密集任務只有在真正的多核CPU上才可能得到加速(通過多線程),而在單核CPU上,無論你開幾個模擬的多線程,該任務都不可能得到加速,因為CPU總的運算能力就那些,一般 CPU 密集型任務線程設置為 核心數+1

Callable

Java中,創建線程一般有兩種方式,就是繼承 Thread 或者實現 Runable 接口,這兩種方式的缺點是在線程任務執行完畢後,無法獲得執行結果,所以一般使用共享變量或者共享存儲區以及線程通信的方式獲得結果,Java 中也提供了使用 Callable 和 Future 來實現獲取任務結果的操作,Callable 用來執行任務,產生結果,而 Future 用來獲得結果

Future 常用方法

V get() 獲取異步執行的結果,如果沒有結果可用,此方法會阻塞直到異步計算完成

V get(Long timeout, Timeunit unit) 獲取異步執行結果,如果沒有結果可用,此方法會阻塞,但是會有時間限制,如果阻塞時間超過設定的timeout時間,該方法將拋出異常

boolean isDone() 如果任務執行結束,無論是正常結束或是中途取消還是發生異常,都返回true

boolean isCanceller() 如果任務完成前被取消,則返回true

boolean cancel(boolean mayInterruptIfRunning) 如果任務還沒開始就執行該方法將返回 false,如果任務執行過程中調用 cancel(true) 將以中斷執行任務的方式試圖停止任務,如果停止成功,返回 true,如果執行 cancel(false) 不會對執行的任務產生影響,此時返回 false,當任務完成後調用該方法將返回 false,參數表示是否中斷執行進程

Future 模式

去除主函數的等待時間,使得原本需要等待的時間可以處理其他業務,對於多線程,如果線程 A 要等待線程 B 的結果,那麽線程 A 沒必要等待 B,直到 B 有結果,可以先拿到一個未來的 Future,等 B 有結果是再取真實的結果

模擬 Future

Data

public interface Data {
    // 獲取子線程執行結果
    public String getRequest() throws InterruptedException;
}

FutureData

public class FutureData implements Data {
    private boolean flag = false;
    private RealData realData;

    public synchronized void setRealData(RealData realData) {
        if (flag)
            return;
        this.realData = realData;
        flag = true;
        // 喚醒
        notify();
    }

    @Override
    public synchronized String getRequest() throws InterruptedException {
        while (!flag) {
            // 等待
            wait();
        }
        // 返回結果
        return realData.getRequest();
    }
}

RealData

public class RealData implements Data {
    private String result;

    public RealData(String data) throws InterruptedException {
        System.out.println("正在下載" + data);
        Thread.sleep(5000);
        System.out.println("下載完畢!");
        result = "author:kernel";
    }

    @Override
    public String getRequest() {
        return result;
    }
}

FutureClient

public class FutureClient {
    public Data submit(String requestData) {
        FutureData futureData = new FutureData();
        new Thread(new Runnable() {
            @Override
            public void run() {
                RealData realData = null;
                try {
                    realData = new RealData(requestData);
                    futureData.setRealData(realData);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        return futureData;
    }
}

測試

public class Test002 {
    public static void main(String[] args) throws InterruptedException {
        FutureClient futureClient = new FutureClient();
        Data request = futureClient.submit("請求參數");
        System.out.println("請求發送成功");
        // 主線程該幹嘛幹嘛去
        System.out.println("執行其他任務");
        // 獲取其他線程的結果
        String result = request.getRequest();
        System.out.println("獲取到結果" + result);
    }
}

執行流程:

首先創建一個向 FutureClient 發送請求,然後 realData 執行下載任務,將結果封裝起來,然後獲取結果函數時刻監測線程是否拿到結果,如果拿到了,就返回,如果沒有拿到,就一直阻塞,設置結果的函數拿到結果會立即喚醒返回結果的函數

重入鎖

重入鎖,也叫遞歸鎖,指的是同一線程外層函數獲得鎖之後,內存函數仍有獲取該鎖的代碼,但不受影響

ReentrantLock(顯式鎖、輕量級鎖)和 Synchronized(內置鎖、重量級鎖)都是可重入鎖

讀寫鎖

程序中涉及一些對共享變量的讀寫操作時,在沒有寫操作時,多個線程同時讀取是沒有任何問題的,如果有一個線程正在進行寫操作,其他線程就不應該對其進行讀或寫操作了

讀寫不同共存,讀讀可以共存,寫寫不能共存

public class Test001 {
    private volatile Map<String, Object> cache = new HashMap<String, Object>();
    private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    private ReadLock readLock = reentrantReadWriteLock.readLock();
    private WriteLock writeLock = reentrantReadWriteLock.writeLock();

    public void put(String key, String value) {
        try {
            writeLock.lock();
            System.out.println("寫入put方法key:" + key + ",value" + value + ",開始");
            Thread.sleep(1000);
            cache.put(key, value);
            System.out.println("寫入put方法key:" + key + ",value" + value + ",結束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
        }
    }

    public String get(String key) {
        try {
            readLock.lock();
            System.out.println("讀取key:"+ key + ",開始");
            Thread.sleep(1000);
            String value = (String) cache.get(key);
            System.out.println("讀取key:"+ key + ",結束");
            return value;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return null;
        } finally {
            readLock.unlock();
        }
    }

    public static void main(String[] args) {
        Test001 test001 = new Test001();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    test001.put("i", i + "");
                }
            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    test001.get(i + "");
                }
            }
        });
        t2.start();
    }
}

悲觀鎖

悲觀鎖悲觀的認為每一次操作都會造成更新丟失問題,在每次查詢時加上排他鎖,每次去拿數據的時候都認為別人會修改,所以每次在拿數據的時候都會上鎖,這樣別人想拿這個數據就會阻塞直到它拿到鎖,傳統的關系型數據庫裏邊就用到了很多這種鎖機制,比如行鎖,表鎖等,讀鎖,寫鎖等,都是在做操作之前先上鎖

樂觀鎖

樂觀鎖會樂觀的認為每次查詢都不會造成更新丟失,利用版本字段控制,

首先通過條件查詢出版本號,然後更新的時候判斷當前版本號是否和之前版本號一致,如果一致,證明沒人修改,直接更新,否則,重新查詢以便在進行更新

CAS 無鎖機制 (利用原子類)

與鎖相比,使用 CAS 無鎖機制會使程序看起來復雜一些,但由於其非阻塞性,是不會發生死鎖現象的,而且線程間相互影響也比鎖要小的多,使用 CAS 完全沒有鎖之間競爭帶來的系統開銷,也沒有線程間頻繁調用帶來的開銷

CAS 原理

CAS 包括三個參數,分別是 V(表示要更新的變量、主內存的值)、E(表示預期值、本地內存的值)、N(新值),如果 V = E,那麽將 V 的值設置成 N,如果 V != E,說明其他線程修改了,則當前線程什麽都不做,最後,CAS返回當前V的真實值

CAS 和樂觀鎖很相似,都是抱著樂觀的心態去處理,多個線程同時使用 CAS 操作,只有一個能勝出,其他都失敗,失敗的線程不會掛起,僅告知失敗,並且允許重新嘗試,當然也允許放棄嘗試,基於這樣的原理,CAS操作即使沒有鎖,也可以發現其他線程對當前線程的幹擾,並進行恰當的處理

簡單地說,CAS需要你額外給出一個期望值,也就是你認為這個變量現在應該是什麽樣子的,如果變量不是你想象的那樣,那說明它已經被別人修改過了。你就重新讀取,再次嘗試修改就好了

優缺點

優點:

在高並發的情況下,它比有鎖的程序擁有更好的性能

死鎖免疫

缺點:

CAS存在一個很明顯的問題,即 ABA 問題

果在這段期間曾經被改成 B,然後又改回 A,那 CAS 操作就會誤認為它從來沒有被修改過,針對這種情況,並發包中提供了一個帶有標記的原子引用類 AtomicStampedReference,它可以通過控制變量值的版本來保證 CAS 的正確性

原子類

Java 中的原子操作類大致可以分為4類:原子更新基本類型、原子更新數組類型、原子更新引用類型、原子更新屬性類型,這些原子類中都是用了無鎖的概念,有的地方直接使用CAS操作的線程安全的類型
AtomicBoolean

AtomicInteger

AtomicLong

AtomicReference

並發編程專題(四)