1. 程式人生 > >高併發第十四彈:執行緒池的介紹及使用

高併發第十四彈:執行緒池的介紹及使用

單執行緒就不說了因為簡單,並且 在實際的生產環境中一般必須來說 執行緒資源都是由執行緒池提供執行緒資源的。

執行緒池的好處

  • 重用存在的執行緒,減少物件建立、消亡的開銷,效能好
  • 可有效控制最大併發執行緒數,提高系統資源利用率,同時可以避免過多資源競爭,避免阻塞。
  • 提供定時執行、定期執行、單執行緒、併發數控制等功能。

線上程池的類圖中,我們最常使用的是最下邊的Executors,用它來建立執行緒池使用執行緒。那麼在上邊的類圖中,包含了一個Executor框架,它是一個根據一組執行策略的呼叫排程執行和控制非同步任務的框架,目的是提供一種將任務提交與任務如何執行分離開的機制。它包含了三個executor介面:

  • Executor:執行新任務的簡單介面
  • ExecutorService:擴充套件了Executor,添加了用來管理執行器生命週期和任務生命週期的方法
  • ScheduleExcutorService:擴充套件了ExecutorService,支援Future和定期執行任務

 來說一下實際應用:

【強制】執行緒池不允許使用 Executors去建立,而是通過 ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。說明:Executors返回的執行緒池物件的弊端如下:

1)FixedThreadPool和 SingleThreadPool:

  允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

2)CachedThreadPool和 ScheduledThreadPool:

  允許的建立執行緒數量為 Integer.MAX_VALUE,可能會建立大量的執行緒,從而導致 OOM

執行緒池核心類-ThreadPoolExecutor

引數說明:ThreadPoolExecutor一共有七個引數,這七個引數配合起來,構成了執行緒池強大的功能。

  • corePoolSize:核心執行緒數量
  • maximumPoolSize:執行緒最大執行緒數
  • workQueue:阻塞佇列,儲存等待執行的任務,很重要,會對執行緒池執行過程產生重大影響
當我們提交一個新的任務到執行緒池,執行緒池會根據當前池中正在執行的執行緒數量來決定該任務的處理方式。處理方式有三種: 1、直接切換(SynchronusQueue) 2、無界佇列(LinkedBlockingQueue)能夠建立的最大執行緒數為corePoolSize,這時maximumPoolSize就不會起作用了。當執行緒池中所有的核心執行緒都是執行狀態的時候,新的任務提交就會放入等待佇列中。 3、有界佇列(ArrayBlockingQueue)最大maximumPoolSize,能夠降低資源消耗,但是這種方式使得執行緒池對執行緒排程變的更困難。因為執行緒池與佇列容量都是有限的。所以想讓執行緒池的吞吐率和處理任務達到一個合理的範圍,又想使我們的執行緒排程相對簡單,並且還儘可能降低資源的消耗,我們就需要合理的限制這兩個數量 分配技巧: [如果想降低資源的消耗包括降低cpu使用率、作業系統資源的消耗、上下文切換的開銷等等,可以設定一個較大的佇列容量和較小的執行緒池容量,這樣會降低執行緒池的吞吐量。如果我們提交的任務經常發生阻塞,我們可以調整maximumPoolSize。如果我們的佇列容量較小,我們需要把執行緒池大小設定的大一些,這樣cpu的使用率相對來說會高一些。但是如果執行緒池的容量設定的過大,提高任務的數量過多的時候,併發量會增加,那麼執行緒之間的排程就是一個需要考慮的問題。這樣反而可能會降低處理任務的吞吐量。
  • keepAliveTime:執行緒沒有任務執行時最多保持多久時間終止(當執行緒中的執行緒數量大於corePoolSize的時候,如果這時沒有新的任務提交核心執行緒外的執行緒不會立即銷燬,而是等待,直到超過keepAliveTime)
  • unit:keepAliveTime的時間單位
  • threadFactory:執行緒工廠,用來建立執行緒,有一個預設的工場來建立執行緒,這樣新創建出來的執行緒有相同的優先順序,是非守護執行緒、設定好了名稱)
corePoolSize、maximumPoolSize、workQueue 三者關係:如果執行的執行緒數小於corePoolSize的時候,直接建立新執行緒來處理任務。即使執行緒池中的其他執行緒是空閒的。如果執行中的執行緒數大於corePoolSize且小於maximumPoolSize時,那麼只有當workQueue滿的時候才建立新的執行緒去處理任務。如果corePoolSize與maximumPoolSize是相同的,那麼建立的執行緒池大小是固定的。這時有新任務提交,當workQueue未滿時,就把請求放入workQueue中。等待空執行緒從workQueue取出任務。如果workQueue此時也滿了,那麼就使用另外的拒絕策略引數去執行拒絕策略。
初始化方法:由七個引數組合成四個初始化方法  其他方法: 1 execute() 提交任務,交給執行緒池執行 2 submit() 提交任務,能夠返回執行結果 execute+Future 3 shutdown() 關閉執行緒池,等待任務都執行完 4 shutdownNow() 關閉執行緒池,不等待任務執行完 5 getTaskCount() 執行緒池已執行和未執行的任務總數 6 getCompleteTaskCount() 已完成的任務數量 7 getPoolSize() 執行緒池當前的執行緒數量 8 getActiveCount() 當前執行緒池中正在執行任務的執行緒數量
  • running:能接受新提交的任務,也能處理阻塞佇列中的任務
  • shutdown:不能處理新的任務,但是能繼續處理阻塞佇列中任務
  • stop:不能接收新的任務,也不處理佇列中的任務
  • tidying:如果所有的任務都已經終止了,這時有效執行緒數為0
  • terminated:最終狀態

4種執行緒池

1. Executors.newCachedThreadPool 建立一個可快取的執行緒池,如果執行緒池的長度超過了處理的需要,可以靈活回收空閒執行緒。如果沒有可回收的就新建執行緒。

 2.Executors.newFixedThreadPool 定長執行緒池,可以執行緒現成的最大併發數,超出在佇列等待

3.Executors.newSingleThreadExecutor 單執行緒化的執行緒池,用唯一的一個共用執行緒執行任務,保證所有任務按指定順序執行(FIFO、優先順序…)

4.Executors.newScheduledThreadPool 定長執行緒池,支援定時和週期任務執行 這是Executors 建立常用的 4種執行緒池,但是我在上面說了 需要用ThreadPoolExecutor 來建立,那麼怎麼辦呢? 那我們一起看一下原始碼把 1. Executors.newCachedThreadPool 
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

原始碼:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

 2.Executors.newFixedThreadPool 

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

3.Executors.newSingleThreadExecutor 

ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();

原始碼:
  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

4.Executors.newScheduledThreadPool 

 public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {//此處super指的是ThreadPoolExecutor
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  
              new DelayedWorkQueue(), threadFactory);
    }

 ScheduledExecutorService提供了三種方法可以使用: 

 

scheduleAtFixedRate:以指定的速率執行任務 scheduleWithFixedDelay:以指定的延遲執行任務 舉例:

executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        log.warn("schedule run");
    }
}, 1, 3, TimeUnit.SECONDS);//延遲一秒後每隔3秒執行

小擴充套件:延遲執行任務的操作,java中還有Timer類同樣可以實現

Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        log.warn("timer run");
    }
}, new Date(), 5 * 1000);