1. 程式人生 > >執行緒池的優雅關閉實踐

執行緒池的優雅關閉實踐

原文:https://www.jianshu.com/p/bdf06e2c1541

平時開發中,大家更多的關注的是執行緒池的建立、任務的提交和執行。往往會忽略執行緒池的關閉,甚至忘記呼叫shutdown()方法,導致記憶體溢位。大多知道需要呼叫shutdown()關閉執行緒池,也少研究其真正的關閉過程。

首先看原始碼中的一句註釋:

A pool that is no longer referenced in a program and has no remaining threads will be shutdown automatically.
如果程式中不再持有執行緒池的引用,並且執行緒池中沒有執行緒時,執行緒池將會自動關閉。

執行緒池自動關閉的兩個條件:1、執行緒池的引用不可達;2、執行緒池中沒有執行緒;

這裡對於條件2解釋一下,執行緒池中沒有執行緒是指執行緒池中的所有執行緒都已執行完自動消亡。然而我們常用的FixedThreadPool的核心執行緒沒有超時策略,所以並不會自動關閉。

展示兩種不同執行緒池 不關閉 的情況:

1、FixedThreadPool 示例

public static void main(String[] args) {
    while(true) {
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        executorService.execute(() -> System.out.println("running"));
        executorService = null;
    }
}

輸出結果:

running
......
running
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
    at test.PoolTest.main(PoolTest.java:29)

因為FixedThreadPool的核心執行緒不會自動超時關閉,使用時必須在適當的時候呼叫shutdown()方法。

2、 CachedThreadPool 示例

public static void main(String[] args) {
    while(true) {
        // 預設keepAliveTime為 60s
        ExecutorService executorService = Executors.newCachedThreadPool(); 
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
        // 為了更好的模擬,動態修改為1納秒
        threadPoolExecutor.setKeepAliveTime(1, TimeUnit.NANOSECONDS);
        threadPoolExecutor.execute(() -> System.out.println("running"));
    }
}

輸出結果:

running
running
running
running
running
......

CachedThreadPool 的執行緒 keepAliveTime 預設為 60s ,核心執行緒數量為 0 ,所以不會有核心執行緒存活阻止執行緒池自動關閉。 詳見 執行緒池之ThreadPoolExecutor構造 ,為了更快的模擬,構造後將 keepAliveTime 修改為1納秒,相當於執行緒執行完馬上會消亡,所以執行緒池可以被回收。實際開發中,如果CachedThreadPool 確實忘記關閉,在一定時間後是可以被回收的。但仍然建議顯示關閉。

然而,執行緒池關閉的意義不僅僅在於結束執行緒執行,避免記憶體溢位,因為大多使用的場景並非上述示例那樣 朝生夕死。執行緒池一般是持續工作的全域性場景,如資料庫連線池。

本文更多要討論的是當執行緒池呼叫shutdown方法後,會經歷些什麼?思考一下幾個問題:

  1. 是否可以繼續接受新任務?繼續提交新任務會怎樣?
  2. 等待佇列裡的任務是否還會執行?
  3. 正在執行的任務是否會立即中斷?

問題1:是否可以繼續接受新任務?繼續提交新任務會怎樣?

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    executor.execute(() -> System.out.println("before shutdown"));
    executor.shutdown();
    executor.execute(() -> System.out.println("after shutdown"));
}

輸出結果如下:

before shutdown
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task PoolTest$$Lambda$2/[email protected] rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at PoolTest.main(PoolTest.java:12)

當執行緒池關閉後,繼續提交新任務會丟擲異常。這句話也不夠準確,不一定是丟擲異常,而是執行拒絕策略,預設的拒絕策略是丟擲異常。可參見 執行緒池之ThreadPoolExecutor構造 裡面自定義執行緒池的例子,自定義了忽略策略,但被拒絕時並沒有丟擲異常。

問題2:等待佇列裡的任務是否還會執行?

public class WaitqueueTest {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        for(int i = 1; i <= 100 ; i++){
            workQueue.add(new Task(String.valueOf(i)));
        }
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, workQueue);
        executor.execute(new Task("0"));
        executor.shutdown();
        System.out.println("workQueue size = " + workQueue.size() + " after shutdown");
    }
    
    static class Task implements Runnable{
        String name;
        
        public Task(String name) {
            this.name = name;
        }
        
        @Override
        public void run() {
            for(int i = 1; i <= 10; i++){
                System.out.println("task " + name + " is running");
            }
            System.out.println("task " + name + " is over");
        }
    }
}

這個demo解釋一下,我們用LinkedBlockingQueue構造了一個執行緒池,線上程池啟動前,我們先將工作佇列填充100個任務,然後執行task 0 後立即shutdown()執行緒池,來驗證執行緒池關閉佇列的任務執行狀態。
輸出結果如下:

......
task 0 is running
task 0 is over
workQueue size = 100 after shutdown //表示執行緒池關閉後,佇列任然有100個任務
task 1 is running
......
task 100 is running
task 100 is over

從結果中我們可以看到,執行緒池雖然關閉,但是佇列中的任務任然繼續執行,所以用 shutdown()方式關閉執行緒池時需要考慮是否是你想要的效果。

如果你希望執行緒池中的等待佇列中的任務不繼續執行,可以使用shutdownNow()方法,將上述程式碼進行調整,如下:

public class WaitqueueTest {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        for(int i = 1; i <= 100 ; i++){
            workQueue.add(new Task(String.valueOf(i)));
        }
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, workQueue);
        executor.execute(new Task("0"));
        // shutdownNow有返回值,返回被拋棄的任務list
        List<Runnable> dropList = executor.shutdownNow();
        System.out.println("workQueue size = " + workQueue.size() + " after shutdown");
        System.out.println("dropList size = " + dropList.size());
    }
    
    static class Task implements Runnable{
        String name;
        
        public Task(String name) {
            this.name = name;
        }
        
        @Override
        public void run() {
            for(int i = 1; i <= 10; i++){
                System.out.println("task " + name + " is running");
            }
            System.out.println("task " + name + " is over");
        }
    }
}

輸出結果如下:

task 0 is running
workQueue size = 0 after shutdown
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
task 0 is running
dropList size = 100
task 0 is over

從上述輸出可以看到,只有任務0執行完畢,其他任務都被drop掉了,dropList的size為100。通過dropList我們可以對未處理的任務進行進一步的處理,如log記錄,轉發等;

問題3:正在執行的任務是否會立即中斷?

要驗證這個問題,需要對執行緒的 interrupt 方法有一定了解。


推薦閱讀 ——執行緒中斷機制
關於 interrupt 方法:
首先,一個執行緒不應該由其他執行緒來強制中斷或停止,而是應該由執行緒自己自行停止。
所以,Thread.stop, Thread.suspend, Thread.resume 都已經被廢棄了。
而 Thread.interrupt 的作用其實也不是中斷執行緒,而是「通知執行緒應該中斷了」,具體到底中斷還是繼續執行,應該由被通知的執行緒自己處理。
具體來說,當對一個執行緒,呼叫 interrupt() 時,
① 如果執行緒處於被阻塞狀態(例如處於sleep, wait, join 等狀態),那麼執行緒將立即退出被阻塞狀態,並丟擲一個InterruptedException異常。僅此而已。
② 如果執行緒處於正常活動狀態,那麼會將該執行緒的中斷標誌設定為 true,僅此而已。被設定中斷標誌的執行緒將繼續正常執行,不受影響。
interrupt() 並不能真正的中斷執行緒,需要被呼叫的執行緒自己進行配合才行。也就是說,一個執行緒如果有被中斷的需求,那麼就可以這樣做。
① 在正常執行任務時,經常檢查本執行緒的中斷標誌位,如果被設定了中斷標誌就自行停止執行緒。
② 在呼叫阻塞方法時正確處理InterruptedException異常。(例如,catch異常後就結束執行緒。)


public class InteruptTest {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        executor.execute(new Task("0"));
        Thread.sleep(1);
        executor.shutdown();
        System.out.println("executor has been shutdown");
    }

    static class Task implements Runnable {
        String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            
            for (int i = 1; i <= 100 && !Thread.interrupted(); i++) {
                Thread.yield();
                System.out.println("task " + name + " is running, round " + i);
            }
            
        }
    }
}

輸出結果如下:

task 0 is running, round 1
task 0 is running, round 2
task 0 is running, round 3
......
task 0 is running, round 28
executor has been shutdown
......
task 0 is running, round 99
task 0 is running, round 100

為了體現在任務執行中打斷,在主執行緒進行短暫 sleep , task 中 呼叫 Thread.yield() ,出讓時間片。從結果中可以看到,執行緒池被關閉後,正則執行的任務沒有被 interrupt。說明shutdown()方法不會 interrupt 執行中執行緒。再將其改修改為shutdownNow() 後輸出結果如下:

task 0 is running, round 1
task 0 is running, round 2
......
task 0 is running, round 56
task 0 is running, round 57
task 0 is running, round 58
task 0 is running, round 59
executor has been shutdown

修改為shutdownNow() 後,task任務沒有執行完,執行到中間的時候就被 interrupt 後沒有繼續執行了。

總結,想要正確的關閉執行緒池,並不是簡單的呼叫shutdown方法那麼簡單,要考慮到應用場景的需求,如何拒絕新來的請求任務?如何處理等待佇列中的任務?如何處理正在執行的任務?想好這幾個問題,在確定如何優雅而正確的關閉執行緒池。

PS:執行緒被 interrupt 後,需要再run方法中單獨處理 interrupted 狀態,interrupt 更類似一個標誌位,不會直接打斷執行緒的執行。



作者:徐志毅
連結:https://www.jianshu.com/p/bdf06e2c1541
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。