1. 程式人生 > >線程池異常處理之重啟線程處理任務

線程池異常處理之重啟線程處理任務

啟動 環長 topic 過程 pre 設置 消費 execution 記錄

線程池異常處理之重啟線程處理任務

本文記錄一下在使用線程池過程中,如何處理 while(true)循環長期運行的任務,在業務處理邏輯中,如果拋出了運行時異常時怎樣重新提交任務。

這種情形在Kafka消費者中遇到,當為每個Consumer開啟一個線程時, 在線程的run方法中會有while(true)循環中消費Topic數據。

本文會借助Google Guava包中的com.google.common.util.concurrent.ThreadFactoryBuilder類創建線程工廠,因為它能不僅很方便地為線程池設置一個易讀的名稱,而且很方便地設置線程執行過程中出現異常時 用來處理異常的 異常處理器

,示例如下:

 MyExceptionHandler exceptionHandler = new MyExceptionHandler();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
                .setUncaughtExceptionHandler(exceptionHandler).build();

當線程執行過程中出現了異常,MyExceptionHandler#uncaughtException(...)方法就會由JVM調用。在java.lang.ThreadGroup#uncaughtException

方法註釋提到:由於每個線程都隸屬於某個線程組,如果該線程所屬的線程組有父線程組,則調用父線程組中指定的異常處理器;若沒有父線程組,則判斷 有沒有 為線程自定義 異常處理器,而在本文中,定義了自己的異常處理器:MyExceptionHandler,因此線程執行異常時就會調用MyExceptionHandler#uncaughtException(...)

創建好了線程工廠,接下來就是創建線程池了。

CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);

CustomThreadPoolExecutor 繼承ThreadPoolExecutor擴展線程池的功能:若線程執行某任務失敗時 需要重新提交該任務,可以重寫CustomThreadPoolExecutor#afterExecute方法,在該方法中實現提交任務。

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        //若線程執行某任務失敗了,重新提交該任務
        if (t != null) {
            Runnable task =  r;
            System.out.println("restart task...");
            execute(task);
        }
    }
}

如果在new ThreadPoolExecutor時未傳入 ThreadFactory參數,如下:

BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue);

其實是調用Executors.defaultThreadFactory()創建默認的ThreadFactory:

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

它為每個創建的線程設置了名字:"pool-xxx-thread-xxx"。而采用默認的ThreadFactory時相應的默認的異常處理器執行邏輯是由java.lang.ThreadGroup#uncaughtException方法來處理的,其中處理異常的相關源碼如下:

else if (!(e instanceof ThreadDeath)) {
                System.err.print("Exception in thread \""
                                 + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }

如果線程執行過程中拋出的錯誤 不是 ThreadDeath對象,那麽只是簡單地:打印線程名稱,並將堆棧信息記錄到控制臺中,任務結束。如果是一個ThreadDeath對象,看ThreadDeath類的源碼註釋可知:異常處理器不會被調用,程序不會輸出任何日誌信息。(有木有碰到這種情況,線程池中的線程不知不覺地消失了……)

The ThreadGroup#uncaughtException top-level error handler does not print out a message if ThreadDeath is never caught.

在本文的示例程序CustomThreadPoolExecutorTest.java中,為了模擬在while(true)循環中拋出異常,定義一個 Boolean 變量 stop 使得線程執行一段時間拋出一個異常:也即先讓test線程運行一段時間,然後主線程設置 stop 變量的值,使得test線程拋出運行時異常。(完整代碼可參考文末)

if (stop) {
    throw new RuntimeException("running encounter exception");
 }

線程池提交 while(true)循環任務:

        threadPoolExecutor.execute(()->{
            //提交的是一個while(true)任務,正常運行時這類任務不會結束
            while (true) {
                System.out.println("start processing");
                try {
                    //模擬任務每次執行耗時1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //ignore
                }
                System.out.println("finish processing");

                if (stop) {
                    throw new RuntimeException("running encounter exception");
                }
            }
        });

threadPoolExecutor.execute提交了一個任務,這會耗費一個線程來執行該任務,由於任務是個while(true)循環,正常情況下該任務不會終止。換句話說,這個任務會"永久"占用線程池中的一個線程。因此,對於while(true)循環的任務需要註意:

創建線程池new ThreadPoolExecutor(...)時,指定的 corePoolSize 不能小於 需要提交的任務個數,否則有些任務不能立即啟動,線程池需要增加線程(最大增加到maximumPoolSize 個線程)來處理。如果 maximumPoolSize 小於需要提交的任務個數,由於每個任務永久地占用一個線程執行,那麽有些任務就只能一直堆積在taskQueue 任務隊列中了

而在本示例中,main 線程通過設置 stop 變量讓 test 線程拋出異常,自定義的異常處理器MyExceptionHandler就會處理該異常,並且在該任務執行“完成”後,JVM會調用線程池的afterExecute(...)方法,又重新提交該任務。

總結

這篇文章總結了本人在使用JAVA線程池中的一些理解,寫代碼以線程池方式提交任務,程序跑一段時間,沒有數據輸出了,好像暫停了,看堆棧信息線程莫名其妙地消失了,或者阻塞在任務隊列上拿不到Task了……因此需要明白線程池底層執行的機制。

  1. 在實現Kafka消費者過程中,每個消費者一個線程,使用線程池來管理線程、提交任務。但總過一段時間後Kafka Broker Rebalance,看後臺日誌是Kafka Consumer在解析一些消息時拋出了運行時異常。這樣線程池就結束了這個任務,由於沒有重寫afterExecute()方法 當任務出現異常時重新提交任務。因此,這意味著永久丟失了一個消費者線程。而少了一個消費者,Kafka就發生了Rebalance。
  2. 盡量使用線程池來管理線程,而不是自己 new Thread(),一方面是采用線程池可方便地為每個線程設置合理的名稱,這樣便於debug。另一方面,通過 implements Thread.UncaughtExceptionHandler自定義線程運行時異常處理器,可方便地打印出線程異常日誌。
  3. 可繼承ThreadPoolExecutor擴展線程池功能,比如在任務執行完成後,執行一些額外的操作。關於如何擴展線程池,ElasticSearch源碼中線程池模塊很值得借鑒。
  4. 上文中提到的異常處理器 和 向線程池提交任務的拒絕策略RejectedExecutionHandler是兩回事。另外,為了圖方便,直接在main方法中創建線程池了,實際應用中肯定不能這樣。這裏給出的代碼只是Examples。

最後給出一個思考問題:針對需要長期運行的任務,比如每隔一段時間從Redis中讀取若幹條數據。是提交一個Runnable任務,這個Runnable任務裏是個while(true)循環讀取數據:

        executor.execute(()->{
            while (true) {
                //讀若幹條數據
                read();
                sleep(1000);
            }
        });

還是:在一個外部while循環中,不斷地向 taskQueue 任務隊列中提交任務呢?

        while (true) {
            executor.execute(()->{
                read();
            });
            sleep(1000);
        }

CustomThreadPoolExecutorTest.java 完整代碼:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutorTest {
    private static volatile boolean stop = false;
    public static void main(String[] args)throws InterruptedException {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
        //定義 線程執行過程中出現異常時的 異常處理器
        MyExceptionHandler exceptionHandler = new MyExceptionHandler();
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
                .setUncaughtExceptionHandler(exceptionHandler).build();
        CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);

        threadPoolExecutor.execute(()->{
            //提交的是一個while(true)任務,正常運行時這類任務不會結束
            while (true) {
                System.out.println("start processing");
                try {
                    //模擬任務每次執行耗時1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //ignore
                }
                System.out.println("finish processing");

                if (stop) {
                    throw new RuntimeException("running encounter exception");
                }
            }
        });

        Thread.sleep(2000);
        //模擬 test- 線程 在執行任務過程中拋出異常
        stop = true;
        Thread.sleep(1000);
        stop = false;
    }

    private static class MyExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            System.out.println(String.format("thread name %s, msg %s", t.getName(), e.getMessage()));
        }
    }
}

ThreadPoolExecutorTest.java 測試線程在執行過程中拋出ThreadDeath對象:

import java.util.concurrent.*;
public class ThreadPoolExecutorTest {
    private static volatile boolean stop = false;
    public static void main(String[] args) throws InterruptedException{
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, taskQueue);
        executor.execute(()->{
            while (true) {
                System.out.println("start processing");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //ignore
                }
                System.out.println("finish processing");
                if (stop) {
                    throw new ThreadDeath();
//                    throw new RuntimeException("runtime exception");
                }
            }
        });
        Thread.sleep(3000);
        stop = true;
        Thread.sleep(2000);

        executor.execute(()->{
            //能夠繼續提交任務執行
            System.out.println("continue submit runnable task,is All thread in thread pool dead?");
        });
    }
}

參考資料:

  • Reexecute task from within UncaughtExceptionHandler?

原文:https://www.cnblogs.com/hapjin/p/10240863.html

線程池異常處理之重啟線程處理任務