1. 程式人生 > >java 併發程式設計學習筆記(八)執行緒池

java 併發程式設計學習筆記(八)執行緒池

                                             執行緒池 

(1)new Thread 弊端

  • 每次new  Thred 新建物件,效能差
  • 執行緒缺乏統一管理,可能無限制的新建執行緒,相互競爭,有可能佔用過多系統資源導致宕機或者oom
  • 缺少更多功能,如更多執行,定期執行,執行緒中斷

(2)執行緒池的好處

  • 重在存在的執行緒,減少物件的建立,消亡的開銷,效能差
  • 可有效控制最大併發執行緒數,提高系統資源利用率,同時可以避免過多資源競爭,避免阻塞
  • 提供定時執行,定期執行,單執行緒,併發數控制等功能

(3)執行緒池 ThreadPoolExecutor

在一個應用程式中,我們需要多次使用執行緒,也就意味著,我們需要多次建立並銷燬執行緒。而建立並銷燬執行緒的過程勢必會消耗記憶體。而在Java中,記憶體資源是及其寶貴的,所以,我們就提出了執行緒池的概念。

執行緒池:Java中開闢出了一種管理執行緒的概念,這個概念叫做執行緒池,從概念以及應用場景中,我們可以看出,執行緒池的好處,就是可以方便的管理執行緒,也可以減少記憶體的消耗。那麼,我們應該如何建立一個執行緒池呢?

Java中已經提供了建立執行緒池的一個類:Executor而我們建立時,一般使用它的子類:


   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

 

  • execute (): 提交 任務,交給執行緒池執行
  • submit():提交任務,能夠返回執行結果 execute+future
  • shutdown():關閉執行緒池,等待任務都執行完
  • shutdownNow (): 關閉執行緒池,不等待任務執行完
  • getTaskCount (): 執行緒池已經執行和未執行的任務總數
  • getCompletedTaskCount (): 已完成的任務數量
  • getPoolSize () : 執行緒池當前的執行緒數量
  • getActiveCount (): 當前執行緒池中正在執行任務的執行緒數量

這是其中最重要的一個構造方法,

這個方法決定了創建出來的執行緒池的各種屬性,下面依靠一張圖來更好的理解執行緒池和這幾個引數:又圖中,我們可以看出,

執行緒池中的corePoolSize就是執行緒池中的核心執行緒數量,這幾個核心執行緒,只是在沒有用的時候,也不會被回收。

maximumPoolSize就是執行緒池中可以容納的最大執行緒的數量。

keepAliveTime,就是執行緒池中除了核心執行緒之外的其他的最長可以保留的時間,因為線上程池中,除了核心執行緒即使在無任務的情況下也不能被清除,其餘的都是有存活時間的,意思就是非核心執行緒可以保留的最長的空閒時間,而util,就是計算這個時間的一個單位。

workQueue,就是等待佇列,任務可以儲存在任務佇列中等待被執行,執行的是FIFIO原則(先進先出)。

threadFactory,就是建立執行緒的執行緒工廠。

最後一個handler,是一種拒絕策略,我們可以在任務滿了知乎,拒絕執行某些任務。

執行緒池的執行流程又是怎樣的呢?

有圖我們可以看出,任務進來時,首先執行判斷,判斷核心執行緒是否處於空閒狀態,如果不是,核心執行緒就先就執行任務,如果核心執行緒已滿,則判斷任務佇列是否有地方存放該任務,若果有,就將任務儲存在任務佇列中,等待執行,如果滿了,在判斷最大可容納的執行緒數,如果沒有超出這個數量,就開創非核心執行緒執行任務,如果超出了,就呼叫handler實現拒絕策略。

  • handler的拒絕策略有四種:

第一種AbortPolicy:不執行新任務,直接丟擲異常,提示執行緒池已滿

 public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

第二種DisCardPolicy:不執行新任務,也不丟擲異常

public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

第三種DisCardOldSetPolicy:將訊息佇列中的第一個任務替換為當前新進來的任務執行    

  public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();  //將佇列頭部的任務移除掉
                e.execute(r);   //執行當前新進來的任務
            }
        }
    }

第四種CallerRunsPolicy:直接呼叫execute來執行當前任務

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();     //直接執行 當前新進來的任務
            }
        }
    }
  • 四種常見的執行緒池:

CachedThreadPool:可快取的執行緒池,該執行緒池中沒有核心執行緒,非核心執行緒的數量為Integer.max_value,就是無限大,當有需要時建立執行緒來執行任務,沒有需要時回收執行緒,適用於耗時少,任務量大的情況。

SecudleThreadPool:週期性執行任務的執行緒池,按照某種特定的計劃執行執行緒中的任務,有核心執行緒,但也有非核心執行緒,非核心執行緒的大小也為無限大。適用於執行週期性的任務。

SingleThreadPool:只有一條執行緒來執行任務,適用於有順序的任務的應用場景。

FixedThreadPool:定長的執行緒池,有核心執行緒,核心執行緒的即為最大的執行緒數量,沒有非核心執行緒

  /**
         *  fixedThreadPool 正規的執行緒池,由於核心池數 等於最大 池數,因此沒有最大執行緒池,
         *              只有最大執行緒池 的執行緒才能被回收,因為沒有最大執行緒池,所以無超時機制,
         *              佇列大小無限制,除非執行緒池關閉了核心執行緒才會被回收,
         *              採用預設的AbortPolicy:不執行新任務,直接丟擲異常,提示執行緒池已滿
         *  return new ThreadPoolExecutor(nThreads, nThreads,
         *                                       0L, TimeUnit.MILLISECONDS,
         *                                       new LinkedBlockingQueue<Runnable>());
         *
         */
        ExecutorService service = Executors.newFixedThreadPool(10);

        /**
         *   newCachedThreadPool
         *  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
         *                                       60L, TimeUnit.SECONDS,
         *                                       new SynchronousQueue<Runnable>());
         *
         * 只有非核心執行緒,最大執行緒數很大(Integer.MAX_VALUE),它會為每一個任務新增一個新的執行緒,
         * 這邊有一個超時機制,當最大執行緒池中空閒的執行緒超過60s內沒有用到的話,就會被回收。
         * 缺點就是沒有考慮到系統的實際記憶體大小。
         * 採用預設的AbortPolicy:不執行新任務,直接丟擲異常,提示執行緒池已滿
         */
        service = Executors.newCachedThreadPool();


        /**
         *    return new FinalizableDelegatedExecutorService
         *             (new ThreadPoolExecutor(1, 1,
         *                                     0L, TimeUnit.MILLISECONDS,
         *                                     new LinkedBlockingQueue<Runnable>()));
         * 看這個名字就知道這個傢伙是隻有一個核心執行緒,就是一個孤家寡人,
         *通過指定的順序將任務一個個丟到執行緒,都乖乖的排隊等待執行,不處理併發的操作,不會被回收。確定就是一個人幹活效率慢。
         */
        Executors.newSingleThreadExecutor();

        /**
         *   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
         *               new DelayedWorkQueue());
         *    這個執行緒池就厲害了,是唯一一個有延遲執行和週期重複執行的執行緒池。
         *   它的核心執行緒池固定,非核心執行緒的數量沒有限制,但是閒置時會立即會被回收。
         */
        Executors.newScheduledThreadPool(5);

(4)執行緒池的簡單使用

@Slf4j
public class ThreadPoolExample1 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();

        service = Executors.newFixedThreadPool(3);

        service = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            service.execute(() -> {
                log.info("task {}", index);
            });
        }

        service.shutdown();

        //Timer 元件
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("Timer元件的定時任務");
            }
        }, new Date());

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("");
            }
        }, 3);

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("開始時間");
                log.info("喵喵喵喵喵喵喵喵");
                log.info("結束時間");
            }
        }, 3000, 3000);


        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                log.info("開始時間");
                log.info("喵喵喵喵喵喵喵喵");
                log.info("結束時間");
            }
        }, 3000, 3000);


        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        //3秒之後執行
        scheduledExecutorService.schedule(() -> {
                    log.info("啦啦啦啦!");
                },
                3, TimeUnit.SECONDS);


        //3秒之後執行,前一個執行緒的執行開始時間 和 後一個 執行緒的執行開始時間相差2秒
        /**
         * command:執行執行緒
         * initialDelay:初始化延時
         * period:兩次開始執行最小間隔時間
         * unit:計時單位
         */
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            log.info("開始時間");
            log.info("啦啦啦啦啦啦啦啦啦!");
            log.info("結束時間");
        }, 3, 2, TimeUnit.SECONDS);


        //3秒之後執行,前一個執行緒的結束時間 和 後一個 執行緒的 開始時間相差2秒
        /**
         * command:執行執行緒
         * initialDelay:初始化延時
         * period:前一次執行結束到下一次執行開始的間隔時間(間隔執行延遲時間)
         * unit:計時單位
         */
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            log.info("開始時間");
            log.info("啦啦啦啦啦啦啦啦啦!");
            log.info("結束時間");
        }, 3, 5, TimeUnit.SECONDS);
    }
}

(5)執行緒池 合理配置

cpu 密集型任務,就需要儘量壓榨cpu,參考值可以設定為NCPU+1

Io 密集型任務,參考值可以設定為2*NCPU

I/O 密集型(主要是讀寫):指的是系統的CPU效能相對硬碟/記憶體的效能要好很多,此時,系統運作,大部分的狀況是 CPU 在等 I/O (硬碟/記憶體) 的讀/寫,此時 CPU Loading 不高 
CPU 密集型(主要是運算): 指的是系統的 硬碟/記憶體 效能 相對 CPU 的效能 要好很多,此時,系統運作,大部分的狀況是 CPU Loading 100%,CPU 要讀/寫 I/O (硬碟/記憶體),I/O在很短的時間就可以完成,而 CPU 還有許多運算要處理,CPU Loading 很高。