1. 程式人生 > >Java並發編程(7)- 線程調度 - 線程池

Java並發編程(7)- 線程調度 - 線程池

car 並且執行 創建線程 atomic 介紹 狀態 edr 關閉 imp

線程池

平時有接觸過多線程開發的小夥伴們應該都或多或少都有了解、使用過線程池,而《阿裏巴巴 Java 手冊》裏也有一條規範:
技術分享圖片

由此可見線程池的重要性,線程池對於限制應用程序中同一時刻運行的線程數很有用。因為每啟動一個新線程都會有相應的性能開銷,每個線程都需要給棧分配一些內存等等。

我們可以把並發執行的任務傳遞給一個線程池,來替代為每個並發執行的任務都啟動一個新的線程。只要池裏有空閑的線程,任務就會分配給一個線程執行。在線程池的內部,任務被插入一個阻塞隊列(Blocking Queue ),線程池裏的線程會去取這個隊列裏的任務。當一個新任務插入隊列時,一個空閑線程就會成功的從隊列中取出任務並且執行它。

線程池經常應用在多線程服務器上。每個通過網絡到達服務器的連接都被包裝成一個任務並且傳遞給線程池。線程池的線程會並發的處理連接上的請求。

簡單來說使用線程池有以下幾個目的:

  • 線程是稀缺資源,不能頻繁的創建。應當將其放入一個池子中,可以給其他任務進行復用,減少對象創建、消亡的開銷,性能好
  • 解耦作用;線程的創建於執行完全分開,方便維護。
  • 線程池可有效控制最大並發線程數,提高系統資源利用率,同時可以避免過多資源競爭,避免阻塞
  • 線程池可提供定時執行、定期執行、單線程以及並發數控制等功能

直接new Thread的弊端:

  • 每次new Thread 新建對象,性能差
  • 線程缺乏統一管理,可能無限制的新建線程,相互競爭,常常會出現占用過多的系統資源導致死機或者發生OOM(out of memory 內存溢出),這種問題的原因不是因為單純的new一個Thread,而是可能因為程序的bug或者設計上的缺陷導致不斷new Thread造成的。
  • 缺少更多功能,如更多執行、定期執行、線程中斷等

線程池原理:

  • 談到線程池就會想到池化技術,其中最核心的思想就是把寶貴的資源放到一個池子中;每次使用都從裏面獲取,用完之後又放回池子供其他人使用,有點吃大鍋飯的意思。

線程池類圖:
技術分享圖片

在上邊的類圖中,最上層就是Executor框架,它是一個根據一組執行策略的調用調度執行和控制異步任務的框架,目的是提供一種將任務提交與任務如何運行分離開的機制。它包含了三個executor接口:

  • Executor:運行新任務的簡單接口
  • ExecutorService:擴展了Executor,添加了用來管理執行器生命周期和任務生命周期的方法
  • ScheduledExecutorService:擴展了ExecutorService,支持Future和定期執行任務

在類圖中,我們最常使用的是ThreadPoolExecutor和Executors,這兩個類都可以創建線程池,其中ThreadPoolExecutor是可定制化的去創建線程池,而Executors則屬於是工具類,該類中已經封裝好了一些創建線程池的方法,直接調用相應的方法即可創建線程。

但《阿裏巴巴 Java 手冊》裏有一條規範指明不允許使用Executors創建線程池,具體如下:
技術分享圖片

可以說線程池體系裏最為核心的類是ThreadPoolExecutor,也是功能最強的,ThreadPoolExecutor共有四個構造函數,如下:
技術分享圖片

線程池參數

其中最多可傳入七個參數,這七個參數配合起來,構成了線程池強大的功能。參數說明:

corePoolSize:核心線程數量

maximumPoolSize:線程最大線程數

workQueue:阻塞隊列,存儲等待執行的任務,很重要,會對線程池運行過程產生重大影響

keepAliveTime:線程沒有任務執行時最多保持多久時間終止(當線程中的線程數量大於corePoolSize的時候,如果這時沒有新的任務提交核心線程外的線程不會立即銷毀,而是等待,直到等待的時間超過keepAliveTime)

unit:keepAliveTime的時間單位

threadFactory:線程工廠,用來創建線程,若不設置則使用默認的工廠來創建線程,這樣新創建出來的線程會具有相同的優先級,並且是非守護的線程,同時也會設置好名稱

rejectHandler:當拒絕處理任務時(阻塞隊列滿)的策略(AbortPolicy默認策略直接拋出異常、CallerRunsPolicy用調用者所在的線程執行任務、DiscardOldestPolicy丟棄隊列中最靠前的任務並執行當前任務、DiscardPolicy直接丟棄當前任務)

拒絕策略的實現類都在TreadPoolExecutor中:
技術分享圖片

我們來說一下其中corePoolSize、maximumPoolSize、workQueue 這三個參數的關系:

如果運行的線程數量小於corePoolSize的時候,直接創建新線程來處理任務。即使線程池中的其他線程是空閑的。如果線程池中的線程數量大於corePoolSize且小於maximumPoolSize時,那麽只有當workQueue滿的時候才創建新的線程去處理任務。如果corePoolSize與maximumPoolSize是相同的,那麽創建的線程池大小是固定的。這時如果有新任務提交,且workQueue未滿時,就把請求放入workQueue中,等待空閑線程從workQueue取出任務進行處理。如果需要運行的線程數量大於maximumPoolSize時,並且此時workQueue也滿了,那麽就使用rejectHandler參數所指定的拒絕策略去進行處理。

然後我們來具體介紹一下 workQueue, 它是保存待執行任務的一個阻塞隊列,當我們提交一個新的任務到線程池後,線程池會根據當前池中正在運行的線程數量來決定該任務的處理方式。處理方式總共有三種:

1、直接切換(SynchronusQueue)

2、×××隊列(LinkedBlockingQueue),若使用該隊列,那麽線程池中能夠創建的最大線程數為corePoolSize,這時maximumPoolSize就不會起作用了。當線程池中所有的核心線程都是運行狀態的時候,新的任務提交就會放入等待隊列中。

3、有界隊列(ArrayBlockingQueue),使用該隊列可以將線程池中的最大線程數量限制為maximumPoolSize參數所指定的值,這種方式能夠降低資源消耗,但是這種方式使得線程池對線程調度變的更困難。因為此時線程池與隊列容量都是有限的了,所以想讓線程池處理任務的吞吐率達到一個合理的範圍,又想使我們的線程調度相對簡單,並且還盡可能降低線程池對資源的消耗,那麽我們就需要合理的設置corePoolSize和maximumPoolSize這兩個參數的值

分配技巧: 如果想降低資源的消耗包括降低cpu使用率、操作系統資源的消耗、上下文切換的開銷等等,可以設置一個較大的隊列容量和較小的線程池容量,這樣會降低線程池處理任務的吞吐量。如果我們提交的任務經常發生阻塞,我們可以考慮調用相關方法調整maximumPoolSize參數的值。如果我們的隊列容量較小,通常需要把線程池的容量設置得大一些,這樣cpu的使用率相對來說會高一些。但是如果線程池的容量設置的過大,提高任務的數量過多的時候,並發量會增加,那麽線程之間的調度就是一個需要考慮的問題,這樣反而可能會降低處理任務的吞吐量。


線程池狀態

線程池有五種狀態,線程池狀態轉換過程圖如下:
技術分享圖片

  • running:運行狀態,能接受新提交的任務,也能處理阻塞隊列中的任務
  • shutdown:關閉狀態,不能處理新的任務,但卻可以繼續處理阻塞隊列中已保存的任務。在線程池處於 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程中也會調用shutdown()方法進入該狀態);
  • stop:停止狀態,不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態;
  • tidying:如果所有的任務都已終止了,workerCount (有效線程數) 為0,線程池進入該狀態後會調用 terminated() 方法進入TERMINATED 狀態。
  • terminated:最終狀態,在terminated() 方法執行完後進入該狀態,默認terminated()方法中什麽也沒有做。

線程池常用方法:

方法名 描述
execute() 提交任務,交給線程池執行
submit() 提交任務,能夠返回執行結果 execute+Future
shutdown() 關閉線程池,等待任務都執行完
shutdownNow() 立刻關閉線程池,不等待任務執行完
getTaskCount() 線程池已執行和未執行的任務總數
getCompleteTaskCount() 已完成的任務數量
getPoolSize() 線程池當前的線程數量
getActiveCount() 當前線程池中正在執行任務的線程數量

使用Executors創建線程池

上文中我們提到了可以使用Executors工具類方便的創建線程,該類中提供了四種創建線程池的方法,如下:

方法名 描述
newCachedThreadPool 創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行

newCachedThreadPool使用示例:

@Slf4j
public class ThreadPoolExample1 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 若需使用ThreadPoolExecutor裏的方法,則需要進行強轉
//        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> log.info("task: {}", index));
        }
        executorService.shutdown();
    }
}

newFixedThreadPool使用示例:

@Slf4j
public class ThreadPoolExample2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> log.info("task: {}", index));
        }
        executorService.shutdown();
    }
}

newSingleThreadExecutor使用示例:

@Slf4j
public class ThreadPoolExample3 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> log.info("task: {}", index));
        }
        executorService.shutdown();
    }
}

newScheduledThreadPool使用示例:

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

        // 延遲3秒執行
        executorService.schedule(() -> log.info("Scheduled run"), 3, TimeUnit.SECONDS);

        // 以指定的速率執行任務,這裏是每隔3秒執行一次任務
        executorService.scheduleAtFixedRate(() -> log.info("Scheduled run"), 1, 3, TimeUnit.SECONDS);

        // 以指定的延遲執行任務,這裏是延遲3秒執行一次任務,使用起來和scheduleAtFixedRate基本一樣
        executorService.scheduleWithFixedDelay(() -> log.info("Scheduled run"), 1, 3, TimeUnit.SECONDS);

        executorService.shutdown();
    }
}

關於延遲執行任務的操作,在Java中還可以使用Timer類進行實現,如下:

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        Timer timer = new Timer();
        // 每隔3秒執行一次任務
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("timer task run");
            }
        }, new Date(), 3000);
    }
}

雖然可行,但是並不建議這麽使用,在多線程並行處理定時任務時,Timer運行多個TimeTask的話,只要其中之一沒有捕獲拋出的異常,其它任務便會自動終止運行,使用ScheduledExecutorService則沒有這個問題。


使用ThreadPoolExecutor創建線程池

之前我們提到了,不建議使用Executors來創建線程池,而是使用ThreadPoolExecutor進行創建。實際上Executors裏創建的也就是ThreadPoolExecutor的實例,具體的看一下Executors類的源碼就知道了。

接下來用一個例子演示一下如何通過ThreadPoolExecutor來創建線程池,這裏使用7個參數的構造函數,示例代碼如下:

package org.zero.concurrency.demo.example.threadpool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.lang.NonNull;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @program: concurrency-demo
 * @description: ThreadPoolExecutor使用示例
 * @author: 01
 * @create: 2018-10-20 16:35
 **/
@Slf4j
public class ThreadPoolExample6 {
    public static void main(String[] args) {
        // 使用ArrayBlockingQueue作為其等待隊列
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(5);
        // 使用自定義的ThreadFactory,目的是設置有意義的的線程名字,方便出錯時回溯
        ThreadFactory namedThreadFactory = new MyThreadFactory("test-thread");

        // 創建線程池
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                3, 5, 1, TimeUnit.MINUTES, blockingQueue, namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());

        // 執行任務
        poolExecutor.execute(() -> log.info("thread run"));

        // 關閉線程池
        poolExecutor.shutdown();
    }

    private static class MyThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        private MyThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix + "-";
        }

        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            if (t.isDaemon()) {
                t.setDaemon(true);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

線程池的創建先介紹到這,其實大部分的創建方式可以參考Executors類的源碼,所以這裏就不贅述了。

線程池的合理配置:

  • CPU密集型任務,就需要盡量壓榨CPU,參考值可以設置為NCPU+1,即CPU核心數量+1
  • IO密集型任務,參考值可以設置為2*NCPU,即CPU核心數量的2倍

最後需要說一句,線程池雖好但並非放之四海皆準,我們應當結合實際業務場景去考慮是否使用線程池。例如當線程池內需要執行的任務很小,小到執行任務的時間和任務調度的時間很接近,這時若使用線程池反而會更慢,因為任務調度和任務管理是需要耗時的。

Java並發編程(7)- 線程調度 - 線程池