1. 程式人生 > >從零開始學多執行緒之自定義配置執行緒池(七)

從零開始學多執行緒之自定義配置執行緒池(七)

等待其他資源,可能會產生執行緒飢餓死鎖

線上程池中如果一個任務依賴於其它任務的執行,就可能產生死鎖.在一個單執行緒化的Executor中,提交兩個任務,任務二滯留在工作佇列中等待第一個任務完成,但是第一個任務不會完成,因為它在等待第二個任務的完成(需要第二個任務執行的結果進行運算),這就會發生死鎖.

在一個大的執行緒池中,如果所有執行緒執行的任務都阻塞線上程池中,等待著仍然處於同一工作佇列中的其它任務,那麼會發生同樣的問題.這被稱作執行緒飢餓死鎖(thread starvation deadlock)

產生死鎖的情況: 只要池任務開始了無限期的阻塞,其目的是等待一些資源或條件,此時只有另一個池任務的活動才能使那些條件成立,比如等待返回值.除非你能保證這個池足夠大,否則會產生執行緒飢餓死鎖.

池任務等待另一個池任務的結果,可能會發生死鎖:

public class ThreadDeadLock {
     ExecutorService exec = Executors.newSingleThreadExecutor();



   public  class Task implements Callable{

        @Override
        public Object call() throws Exception {
            //等待另一個池任務的結果
            Future<String> future1 = exec.submit(new LockTask());
            Future<String> future2 = exec.submit(new LockTask());
            //可能發生死鎖
            return future1.get()+future2.get();

        }
    }

    
無論何時,你提交了一個非獨立的Executor任務,要明確出現執行緒飢餓死鎖的可能性,並且,在程式碼或者配置檔案以及其他可以配置Executor的地方,任何有關池的大小和配置約束都要寫入文件

耗時的任務,設定超時時間

如果你的執行緒池不夠大,又有很多耗時的任務,這會影響服務的響應性.這時候你可以限定任務等待資源的時間,而不是無限制地等下去.

耗時的任務可能會死鎖或者響應的很慢

大多數平臺類庫中的阻塞方法,都有限時和非限時兩個版本.例如Blocking.put.如果超時你可以把任務標記為失敗,終止或者把他重新返回佇列,準備之後執行.這樣無論每個任務的最終結果是成功還是失敗,都保證了任務會向前發展,這樣可以更快地將執行緒從任務中解放出來

.(如果執行緒池頻頻被阻塞的任務充滿,這同樣可能是池太小的一個訊號).

定製執行緒池的大小

不要硬編碼執行緒池的大小

執行緒池合理的長度取決於未來提交的任務型別和所部屬系統的特徵.池的長度應該由某種配置機制來提供,或者利用Runtime.availableProcessors(獲取你電腦的處理器數量),動態進行計算

執行緒池過大&過小的壞處

執行緒池過大: 執行緒對稀缺的CPU和記憶體資源的競爭,會導致記憶體的高使用量.執行緒間切換也會消耗資源

執行緒池過小:由於存在很多可用的處理器資源沒用,會對吞吐量造成損失

制定執行緒池大小依據的內容

正確的定製執行緒池的長度,需要理解你的計算環境、資源預算和任務的自身特性.

部署系統中安裝了多少個CPU?多少記憶體?任務主要執行的是計算、I/O還是一些混合操作?它們是否需要像JDBC Connection 這樣的稀缺資源?

如果你有不同類別的任務,它們擁有差別很大行為,那麼請考慮使用多個執行緒池,這樣每個執行緒池可以根據不同任務的工作負載進行調節.

計算密集型和I/O密集型的執行緒選擇

計算密集型:一直在計算,cpu利用率高,過多的執行緒沒有意義,反而切換執行緒會消耗額外的資源.

I/O密集型:例如查詢資料庫,等待資料造成的阻塞,CPU利用率低,多個執行緒可以提高響應速度.

對於計算密集型的任務,一個有N個處理器的系統通常使用一個N+1個執行緒的執行緒池來獲得最優的利用率(計算密集型的執行緒恰好在某時因為發生一個頁錯誤或者因其它原因而暫停,剛好有一個"額外"的執行緒,可以確保在這種情況下CPU週期不會中斷工作).

對於包含了I/O和其他阻塞操作的任務,不是所有的執行緒都會在所有的時間被排程,因此你需要一個更大的池.

在一個基準負載下,可以使用不同大小的執行緒池執行你的應用程式,並觀察CPU利用率的水平.

計算執行緒池大小的公式

N = CPU的數量

U = 目標CPU的使用率,介於0-1之間

W/C = 等待時間(wait)和計算時間(calculate)的比率

為保持處理器到達期望的使用率,最優的池的大小等於:

num(執行緒數) = N * U * (1 + W/C);

你可以使用Runtime來獲得CPU的數目:

int nCpus = Runtime.getRuntime().availableProcessors();

簡單的例子

通過Runtime.getRuntime().availableProcessors();得到我的電腦cpu數是4, 我期望cpu的使用率是100%,假設等待時間是10秒,計算時間是1秒.那麼我最優的池大小就是:

4 * 100% * (1+10/1) = 44

執行緒池的長度和資源池的長度互相影響

當任務需要使用池化的資源時,比如資料庫連結,執行緒池的長度和資源池的長度會互相影響.

如果每一個任務都需要一個數據庫連結,那麼連線池的大小就限制了執行緒池的有效大小;類似地,當執行緒池中的任務是連線池的唯一消費者時,那麼執行緒池的大小反而又會限制了連線池的有效大小.

配置ThreadPoolExecutor

靈活配置ThreadPoolExecutor

使用Executors工廠方法可以建立各種型別的執行緒池,newCachedThreadPool、newFixedThreadPool和newScheduledThreadExecutor等.如果這些執行策略不能滿足你的需求,你可以 new ThreadPoolExecutor(傳遞各種引數)來配置.

ThreadPoolExecutor有很多建構函式 image

最後一個建構函式是功能最全的,也是最常用的,原始碼:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

它有五個引數分別是:

  • corePoolSize 核心池大小
  • maximumPoolSize 最大池大小
  • keepAliveTime 存活時間
  • TimeUnit 時間單元
  • BlockingQueue 工作佇列
  • ThreadFactory 執行緒工廠
  • RejectedExecutionHandler 拒絕執行處理器

注意原始碼,限定了設定這幾個值的範圍,不滿足就會報非法引數異常,當時博主就是將核心池的值設定比最大池的值大,報了這個異常,看了原始碼才曉得:

 if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();

corePoolSize(核心池大小)、maximum pool size(最大池的大小)和存活時間(keep-alive time)共同管理著執行緒的建立與銷燬.

corePoolSize: 執行緒池的實現試圖維護池的大小(Eexcutors.newSingleThreadExecutor就是一種實現);即時沒有執行任務,池的大小也等於核心池的大小,工作佇列充滿後會建立更多的執行緒.(Executors.newCacheThreadPool池的大小就是不固定的,隨著任務增減執行緒的數量).

maximumPoolSize:最大池的大小制約著執行緒池可以同時執行的的最大執行緒數(限制併發數量),如果一個執行緒閒置的時間超過了存活時間就會被回收.並且同時執行的執行緒的數量不能超過核心池大小,執行緒池會終止超過的數量.

keep alive time & TimeUniht: 存活時間保證了空閒的執行緒會被回收,這樣釋放了這部分被佔有的資源可以做更有用的事情.

兩種特殊的執行緒池實現

Executors.newFixedThreadPool和newCachedThreadPool是兩種特殊的實現.

newFixedThreadPool設定了核心池和最大池的大小,而且永遠不會超時(不會回收執行緒).

newCachedThreadPool將最大池設定為了Integer.MAX_VALUE,核心池設定為0,超時設定為一分鐘,這樣創建出來的可無限擴大的執行緒池,會在需求量減少的情況下減少執行緒數量.

不要將核心池大小設定為0

我們自己手動建立ThreadPoolExecutor的時候,不要將corePoolSize設定為0,除非你用的是SynchronousQueue佇列(newCachedThreadPool用的就是),否則佇列不充滿,就不會執行.

管理任務佇列

用定長的執行緒池去替代為每個任務都建立一個執行緒的方式,在高負載的情況下,使得程式更不容易崩潰了,暫時沒時間處理的任務會放進阻塞佇列裡,這是一個更優的方案,但是如果傳遞進來的任務超過處理的速速,程式仍然有可能崩潰.

過多的請求會使程式崩潰或者響應很慢

過多的請求會導致兩個問題:

  1. 耗盡記憶體
  2. 響應速度很慢(後請求的使用者需要等到前面的請求執行完)

佇列可以緩和上述問題.

佇列有助於緩和瞬時的任務激增,但是最終你還需要遏制請求的達到速率.

ThreadPoolExecutor的第五個引數是一個BlockingQueue阻塞佇列.佇列有三種:無限佇列、有限佇列和同步移交.佇列的選擇和很多其他的配置引數都有關係,比如池的大小.

newFixedThreadPool 和 newSingleThreadPool預設使用的是無界的LinkedBlockingQueue.任務無限多,佇列無限長.最後就是程式崩潰.

所以我們最終的選擇是使用有界佇列,例如ArrayBlockingQueue或者有限的LinkedBlockingQueue以及PriorityBlockingQueue(自定義優先順序的佇列),有界佇列滿了以後有飽和策略可以處理那些沒放進佇列中的請求.

池的大小和佇列的大小相輔相成

一個大佇列加一個小池,可以控制對記憶體和CPU的使用,可以減少上下文切換,不過會影響吞吐量.

SynchronousQueue

對於龐大或者無限的池可以使用SynchronousQueue,這不是一個真正的佇列,原來的佇列可以理解為把任務存在一個容器裡,執行緒放進去,執行緒取出來,而SynchronousQueue相當於是一個手遞手傳遞.

兩種情況可以使用SynchronousQueue:

  • 任務可以被拒絕
  • 池無限大

newCachedThreadPool使用的就是這個佇列.

newCachedThreadPool和定長執行緒池之間的選擇

如果不會因為負載過大導致程式崩潰就是用newCachedThreadPool,因為它的佇列用的是SynchronousQueue,它有更佳的吞吐量.

還有一點要特別注意,如果任務之間相互依賴(一個任務依賴於另一個任務的結果,像這樣的)最好使用newCachedThreadPool,否則可能產生執行緒飢餓死鎖.

反之,像網際網路應用程式還是應該使用定長的執行緒池.

飽和策略:處理未放入佇列的任務

ThreadPoolExecutor的第七個引數, 可以用過呼叫setRejectedExecutionHandler來修改(如果執行緒池關閉了也會用到飽和策略).

Java提供了幾種RejectedExecutionHandler實現:

  • AbortPolicy
  • CallerRunsPolicy
  • DiscardPolicy
  • DiscardOldestPolicy

預設的AbortPolicy會引起execute丟擲未檢查的RejectedExecutionException;呼叫者可以捕獲這個異常,然後編寫滿足自己需求的處理程式碼(例如:持久化這個任務,一會再執行).

DiscardPolicy策略會預設放棄這個任務;

DiscardOldestPolicy會放棄本該接下來執行的任務,同時還會嘗試去重新提交新任務.(無法和優先順序佇列同時使用,遺棄的任務是優先順序最高的)

CallerRunsPolicy(呼叫者執行)既不會丟棄任務也不會丟擲異常,它會把任務推送會呼叫者那裡,以減緩新任務流,它不會再池執行緒中執行最新提交的任務,會在呼叫執行緒池execute或submit方法的執行緒中執行.

public class ThreadPool {

    private static class  Worker implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" is running");
        }
    }


    public static void main(String [] args){
        //Executors.newSingleThreadExecutor()
        ExecutorService executorService = new ThreadPoolExecutor(2, 2, 1,
                TimeUnit.MICROSECONDS, new ArrayBlockingQueue<Runnable>(2), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        },new ThreadPoolExecutor.CallerRunsPolicy());


        for (int i = 0; i < 5; i++) {
            executorService.submit(new Worker());
        }

    }
}

列印輸出:

main is running

Thread-1 is running

Thread-0 is running

Thread-0 is running

Thread-1 is running

證明在main方法的執行緒中執行了最新的任務.

執行100次程式碼中的迴圈會迴圈輸出,main 和 Thread-1 Thread-0,相當於加上主執行緒三個執行緒併發執行.

當所有的池執行緒都被佔用,而且工作佇列已充滿後,下一個任務會在主執行緒中執行.主執行緒執行任務的時候會花費一些時間,這時候主執行緒是不能提交任何任務的,所以這也給工作者執行緒一些時間來追趕進度.

這期間主執行緒不會呼叫accept接受新的請求,而會在TCP層的佇列中等候.如果持續高負載的話,最終會由TCP層判斷它的連結請求佇列是否已經排滿,如果已滿就開始丟棄請求任務.

當伺服器過載的時候,首先執行緒池裡的所有執行緒都忙碌了起來,然後阻塞佇列滿了,接著TCP層滿了,最終就是使用者請求失敗.

這使得伺服器在高負載下可以平緩地劣化(graceful degradation).

執行緒工廠

ThreadPoolExecutor構造方法的第六個引數.

ThreadFactory介面只有一個方法 newThread,線上程池需要建立一個新執行緒時使用的.

定製這個工廠的用處:

  • 為池執行緒指明一個UncaughtExceptionHandler(上篇部落格有解釋)
  • 例項化一個定製Thread類的例項(例如給執行緒新增一個新的名稱)
  • 修改執行緒的優先順序(不要這樣做)
  • 修改後臺狀態(不要這樣做)

自定義執行緒池:

public class CustomThreadFactory implements ThreadFactory {

    private String poolName;

    public CustomThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new MyThread(r,poolName);
    }
}

自定義執行緒:

public class MyThread extends Thread {

    private final Logger log = Logger.getAnonymousLogger();

    public MyThread(Runnable target,String name) {
        super(target,name);
        //執行緒異常終止的時候會得到記錄.
        setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.info("Uncaught in thread"+t.getName()+e);
            }
        });
    }

    @Override
    public void run(){
        //可以做一些額外的日誌記錄
        super.run();

    }
}

重新設定執行緒池的引數

ThreadPoolExecutor的屬性可以在建立後,通過setters方法重新設定.

如果執行緒池是通過Executors工廠方法建立的(除newSingleThreadExecutor以外),可以先轉型為ThreadPoolExecutor,然後在設定.

public class UpdateExecutor {
    public static void main(String [] args){
        Executor executor = Executors.newCachedThreadPool();
        ((ThreadPoolExecutor)executor).setCorePoolSize(111);
        
    }
}

如果你不想你的執行緒池被修改可以使用Executors.unconfigurableExecutorService()方法.使執行緒池無法被修改

Executor executor = Executors.unconfigurableExecutorService(Executors.newCachedThreadPool());

擴充套件ThreadPoolExecutor

ThreadPoolExecutor提供了幾個函式讓子類去覆寫來擴充套件ThreadPoolExecutor

  • beforeExecute
  • afterExecute
  • terminate

beforeExecute在任務執行前呼叫,afterExecute在任務執行後呼叫,可以用它們來寫日誌.

無論任務是正常地從run返回,還是丟擲一個異常,afterExecutor都會被呼叫(如果任務完成後丟擲一個Error,afterExecute不會被呼叫).如果beforeExecutor丟擲一個RuntimeException,任務不會被執行,afterExecutor也不會被呼叫.

terminated會線上程池關閉後呼叫.也就是當所有任務都已完成並且所有工作者執行緒都已經關閉後,會執行terminated.

示例:

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("beforeExecute方法執行了");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("afterExecute方法執行了");
    }


    @Override
    protected void terminated() {
        System.out.println("terminated方法執行了");
    }

    public static void main(String[] args) throws InterruptedException {
        CustomThreadPoolExecutor customThreadPoolExecutor = new CustomThreadPoolExecutor(1,
                1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));

                customThreadPoolExecutor.submit(()-> System.out.println("任務執行了"));

                customThreadPoolExecutor.shutdown();
                customThreadPoolExecutor.awaitTermination(1,TimeUnit.SECONDS);
    }

}

輸出:
beforeExecute方法執行了
任務執行了
afterExecute方法執行了
terminated方法執行了

總結

對於併發執行的任務,Executor框架是強大且靈活的.它提供了大量可調節的選項,比如建立和關閉執行緒的策略,處理佇列任務的策略,並且提供了幾個鉤子函式用於擴充套件它的行為.然而,和大多數的框架一樣,草率地將一些設定組合在一起,並不能很好地工作;一些型別的任務需要特定的執行策略,而一些引數組合在一起後可能產生意外的後果.

下一篇會更新關於死鎖的部落格.

再見.