1. 程式人生 > >高並發第十四彈:線程池的介紹及使用

高並發第十四彈:線程池的介紹及使用

簡單 idt brush nano 減少 線程池大小 core mit rmi

單線程就不說了因為簡單,並且 在實際的生產環境中一般必須來說 線程資源都是由線程池提供線程資源的。

線程池的好處

  • 重用存在的線程,減少對象創建、消亡的開銷,性能好
  • 可有效控制最大並發線程數,提高系統資源利用率,同時可以避免過多資源競爭,避免阻塞。
  • 提供定時執行、定期執行、單線程、並發數控制等功能。

技術分享圖片

在線程池的類圖中,我們最常使用的是最下邊的Executors,用它來創建線程池使用線程。那麽在上邊的類圖中,包含了一個Executor框架,它是一個根據一組執行策略的調用調度執行和控制異步任務的框架,目的是提供一種將任務提交與任務如何運行分離開的機制。它包含了三個executor接口:

  • Executor:運行新任務的簡單接口
  • ExecutorService:擴展了Executor,添加了用來管理執行器生命周期和任務生命周期的方法
  • ScheduleExcutorService:擴展了ExecutorService,支持Future和定期執行任務

來說一下實際應用:

【強制】線程池不允許使用 Executors去創建,而是通過 ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。說明:Executors返回的線程池對象的弊端如下:

1)FixedThreadPool和 SingleThreadPool:

  允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

2)CachedThreadPool和 ScheduledThreadPool:

  允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM

線程池核心類-ThreadPoolExecutor

參數說明:ThreadPoolExecutor一共有七個參數,這七個參數配合起來,構成了線程池強大的功能。

  • corePoolSize:核心線程數量
  • maximumPoolSize:線程最大線程數
  • workQueue:阻塞隊列,存儲等待執行的任務,很重要,會對線程池運行過程產生重大影響
當我們提交一個新的任務到線程池,線程池會根據當前池中正在運行的線程數量來決定該任務的處理方式。處理方式有三種:
1、直接切換(SynchronusQueue)
2、無界隊列(LinkedBlockingQueue)能夠創建的最大線程數為corePoolSize,這時maximumPoolSize就不會起作用了。當線程池中所有的核心線程都是運行狀態的時候,新的任務提交就會放入等待隊列中。
3、有界隊列(ArrayBlockingQueue)最大maximumPoolSize,能夠降低資源消耗,但是這種方式使得線程池對線程調度變的更困難。因為線程池與隊列容量都是有限的。所以想讓線程池的吞吐率和處理任務達到一個合理的範圍,又想使我們的線程調度相對簡單,並且還盡可能降低資源的消耗,我們就需要合理的限制這兩個數量
分配技巧: [如果想降低資源的消耗包括降低cpu使用率、操作系統資源的消耗、上下文切換的開銷等等,可以設置一個較大的隊列容量和較小的線程池容量,這樣會降低線程池的吞吐量。如果我們提交的任務經常發生阻塞,我們可以調整maximumPoolSize。如果我們的隊列容量較小,我們需要把線程池大小設置的大一些,這樣cpu的使用率相對來說會高一些。但是如果線程池的容量設置的過大,提高任務的數量過多的時候,並發量會增加,那麽線程之間的調度就是一個需要考慮的問題。這樣反而可能會降低處理任務的吞吐量。
  • keepAliveTime:線程沒有任務執行時最多保持多久時間終止(當線程中的線程數量大於corePoolSize的時候,如果這時沒有新的任務提交核心線程外的線程不會立即銷毀,而是等待,直到超過keepAliveTime)
  • unit:keepAliveTime的時間單位
  • threadFactory:線程工廠,用來創建線程,有一個默認的工場來創建線程,這樣新創建出來的線程有相同的優先級,是非守護線程、設置好了名稱)技術分享圖片
corePoolSize、maximumPoolSize、workQueue 三者關系:如果運行的線程數小於corePoolSize的時候,直接創建新線程來處理任務。即使線程池中的其他線程是空閑的。如果運行中的線程數大於corePoolSize且小於maximumPoolSize時,那麽只有當workQueue滿的時候才創建新的線程去處理任務。如果corePoolSize與maximumPoolSize是相同的,那麽創建的線程池大小是固定的。這時有新任務提交,當workQueue未滿時,就把請求放入workQueue中。等待空線程從workQueue取出任務。如果workQueue此時也滿了,那麽就使用另外的拒絕策略參數去執行拒絕策略。
初始化方法:由七個參數組合成四個初始化方法 技術分享圖片 其他方法: 1 execute() 提交任務,交給線程池執行 2 submit() 提交任務,能夠返回執行結果 execute+Future 3 shutdown() 關閉線程池,等待任務都執行完 4 shutdownNow() 關閉線程池,不等待任務執行完 5 getTaskCount() 線程池已執行和未執行的任務總數 6 getCompleteTaskCount() 已完成的任務數量 7 getPoolSize() 線程池當前的線程數量 8 getActiveCount() 當前線程池中正在執行任務的線程數量 技術分享圖片
  • running:能接受新提交的任務,也能處理阻塞隊列中的任務
  • shutdown:不能處理新的任務,但是能繼續處理阻塞隊列中任務
  • stop:不能接收新的任務,也不處理隊列中的任務
  • tidying:如果所有的任務都已經終止了,這時有效線程數為0
  • terminated:最終狀態
技術分享圖片

4種線程池

1. Executors.newCachedThreadPool
創建一個可緩存的線程池,如果線程池的長度超過了處理的需要,可以靈活回收空閑線程。如果沒有可回收的就新建線程。

2.Executors.newFixedThreadPool
定長線程池,可以線程現成的最大並發數,超出在隊列等待

3.Executors.newSingleThreadExecutor
單線程化的線程池,用唯一的一個共用線程執行任務,保證所有任務按指定順序執行(FIFO、優先級…)

4.Executors.newScheduledThreadPool
定長線程池,支持定時和周期任務執行 這是Executors 創建常用的 4種線程池,但是我在上面說了 需要用ThreadPoolExecutor 來創建,那麽怎麽辦呢? 那我們一起看一下源碼把 1. Executors.newCachedThreadPool
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

源碼:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

2.Executors.newFixedThreadPool

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

3.Executors.newSingleThreadExecutor

ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();

源碼:
  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

4.Executors.newScheduledThreadPool

 public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
//此處super指的是ThreadPoolExecutor super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }

 ScheduledExecutorService提供了三種方法可以使用:

技術分享圖片 

scheduleAtFixedRate:以指定的速率執行任務
scheduleWithFixedDelay:以指定的延遲執行任務
舉例:

executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        log.warn("schedule run");
    }
}, 1, 3, TimeUnit.SECONDS);//延遲一秒後每隔3秒執行

小擴展:延遲執行任務的操作,java中還有Timer類同樣可以實現

Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        log.warn("timer run");
    }
}, new Date(), 5 * 1000);

高並發第十四彈:線程池的介紹及使用