1. 程式人生 > >乾貨!執行緒池+CountDownLatch,實現 多執行緒併發計算、彙總

乾貨!執行緒池+CountDownLatch,實現 多執行緒併發計算、彙總

目錄結構

抽象類:求和器

單執行緒 求和器 VS 多執行緒 求和器

1)執行緒池

  • 多個執行緒 一起併發執行,效能很生猛

2)CountDownLatch

  • 主執行緒 使用 latch.await() 阻塞住,直到所有 子任務 都執行完畢了,才會繼續向下執行。這樣就保證了 邏輯的正確性

  • 所有 子任務 共享同一個 CountDownLatch 變數,實現 協同合作

  • 所有 子任務 的 finally塊 中,必須要 latch.countDown() ,確保 "無論 正確、異常 都會 countDown",否則 主執行緒 會由於 "某一個 子任務 沒有 countDown 過,就 執行結束了,導致 latch 最終無法被 countDown 到 0 ",而被 永遠掛住

3)private AtomicInteger sum

  • 多個執行緒 會併發 操作同一個 Integer 型別變數,為了確保 執行緒安全,要使用 Atomic 原子型別

原始碼

SinglethreadSummator

package com.lsy.test;


/**
 * 單執行緒 求和器
 */
public class SinglethreadSummator extends Summator{

    private int sum = 0;

    private int getValue() {
        System.out.println("SinglethreadSummator.getValue()");
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return 1;
    }

    @Override
    public int sum(int count) {

        for(int i=0; i<=count-1; i++) {
            sum = sum + getValue();
        }

        return sum;

    }



}

MultithreadSummator

package com.lsy.test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多執行緒 求和器
 */
public class MultithreadSummator extends Summator{

    private AtomicInteger sum = new AtomicInteger(0); //由於是 多執行緒,所以要使用 原子型別
    private int corePoolSize = 10;

    public int getValue() {
        System.out.println("MultithreadSummator.getValue()");
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return 1;
    }


    /**
     * 內部類
     */
    private class Core implements Runnable {
        private CountDownLatch latch;

        public Core(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {

            try {

                sum.getAndAdd(getValue());

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                latch.countDown(); //無論 正確 或 異常,都必須 countDown,否則 main執行緒 會被 countDown.await() 一直掛住
            }

        }
    }

    @Override
    public int sum(int count) {

        CountDownLatch latch = new CountDownLatch(count);

        ExecutorService service = Executors.newFixedThreadPool(corePoolSize);

        try {

            //發起 count個 任務,併發執行
            for(int i=0; i<=count-1; i++) {
                service.submit(new Core(latch));
            }

            //等待 所有執行緒 都 執行完畢
            latch.await();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }

        return sum.get();
    }


}