1. 程式人生 > >自定義執行緒池影響的線上事故

自定義執行緒池影響的線上事故

  作為一個牛逼的程式設計師,相信大家肯定是接觸過多執行緒的概念的。並且可能會在實際的工作中因為一些業務場景需要使用自定義執行緒池來執行批量的任務或對執行緒進行管理。同樣,我們專案中也存在一個兩個場景需要使用執行緒池。而這兩個場景分別為:

1、持續監聽某個外部介面的不間斷的返回資訊,其實就是長連結的阻塞介面,總共12種資源需要監聽,所以就意味需要12個不間斷的執行緒執行阻塞任務。

2、RabbitMQ的消費者,因為需要應用啟動的時候就執行訊息的消費,所以也通過執行緒池中獲取執行緒執行消費任務。

一、先看執行緒池的定義

public class ThreadPoolUtil {

    private static Logger logger = LoggerFactory.getLogger(ThreadPoolUtil.class);

    private static volatile ThreadPoolExecutor threadPoolExecutor = null;
    /**
     * 建立
     * @return
     */
    private static AtomicInteger nextId = new AtomicInteger(0);
    public static ThreadPoolExecutor createExecutor(){

        int corePoolSize = 12; // 核心執行緒12個
        int maxPoolSize = 16; // 最大執行緒數 16個
        int keepAliveSeconds = 60; //閒置存活時間60秒
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue(500); // 臨時佇列500個
        RejectedExecutionHandler rejectedExecutionHandler = (r, executor) -> logger.error("佇列已經滿了{},直接拒絕吧", executor.getTaskCount()); 

    // 同步程式碼塊 synchronized (ThreadPoolUtil.class){ if (threadPoolExecutor != null){ return threadPoolExecutor; }         //  建立單例的執行緒池 threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue, r -> { String fileName = Thread.currentThread().getStackTrace()[5].getFileName(); // 獲取外部使用者層的呼叫棧資訊 String threadName = fileName.substring(0,fileName.indexOf("."))+"-"; // 獲取呼叫棧的名稱,作為執行緒的名稱 Thread thread = new Thread(r, threadName+nextId.incrementAndGet()); return thread; }, rejectedExecutionHandler); } return threadPoolExecutor; } }

  看看上面的執行緒池設計,好像是沒有啥問題的。如果是放在普通的可終結的任務使用當前執行緒池,理論上是沒有太大問題。但是!我們的應用剛好這幾個任務都是阻塞的。阻塞就意味著執行緒是無法回收的,其他的任務使用這個執行緒池之後,就只能先放到佇列中,然後一直得不到釋放的執行緒資源執行。最終佇列積壓,任務被拋棄。

 

二、線上事故描述

  因為在初始化的時候,已經將 12 個監聽都啟動了,並且使用的是當前執行緒池構造工具。啟動完成之後,12個核心執行緒就一直被阻塞佔用。這12個資源的監聽還是比較正常的,並且能夠對監聽資料進行處理和執行。

  因為需要MQ消費端啟動的時候就可以執行消費,所以在啟動的時候,設定了啟動配置類中呼叫上述工具建立執行緒池,相當於用新的執行緒執行訊息監聽的動作。然而MQ卻遲遲不見消費的過程,導致訊息佇列一直積壓。並且無法完成正確的資料處理。

三、問題猜測及理論支撐

  猜測:沒有被消費,應該就是我們的執行緒池中沒有空閒的執行緒進行訊息監聽處理。初始化的時候的消費監聽的任務被直接丟棄到了執行緒池的任務佇列中,而這個執行緒池的任務佇列中數資料只有在兩種情況下才可能被執行。

  第一種:執行緒池中有空閒的執行緒,可以進行執行

  第二種:訊息佇列滿了,開啟了加大了執行緒池的執行緒數以便執行堆積的任務

  而我們的這個一步開啟MQ消費監聽的任務被髮送到執行緒池的時候,因為核心執行緒數就是 12 ,而我們前面的資源監聽介面已經開啟了12個阻塞任務,所以就沒有了可用執行緒。所以被存放到了執行緒池待執行任務佇列中。可怕的是,我們這個執行緒池的佇列大小為500 ,很顯然 1 < 500 ,所以就無法觸發執行緒加大的動作,導致這個佇列的任務“被遺忘”。

 

 理論支撐:

  執行緒池的核心引數包括: coreSize , maxSize, quauaSize,RejectedExecutionHandler

  分別為:核心執行緒數,最大執行緒數,可積壓的任務數,拒絕策略

  當建立執行緒的時候,首先會先建立核心執行緒數等量的執行緒,比如上面就是 12個核心執行緒, 而當我們的核心執行緒都在執行階段的時候,再次加入的任務就會被存放到任務佇列中。當任務不斷的增加並且幅度遠遠大於核心執行緒的處理速度,導致任務佇列存放到最大值,比如上面的500,那麼就需要增加執行緒數,此時就是需要增加執行緒數到最大值,比如上面的16,然而,增大了之後,發現已然不能處理消化任務的投放數量,這個時候就用不同的處理策略,比如上面的  rejectedExecutionHandler 就是直接丟棄。

  

  猜測和理論匹配一下的話就是:核心執行緒是12 ,這12個執行緒被資源監聽的阻塞任務佔用無法釋放,而開啟消費監聽的任務被丟到了待執行的任務佇列中,此時,任務佇列又不滿足益處的條件,所以就沒有增加新的執行緒來處理,以至於,這個建立消費監聽的任務就“被遺忘”了。

 

  如何進行論證呢?使用如下測試程式碼

  

 public static void main(String[] args) {
        ThreadPoolExecutor executor = createExecutor();
      // 臨界值 分別設定12 16 512 518
        for (int i =0; i < $臨界值;i++){

            int finalI = i;
            executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("當前任務序號為:"finalI +" ,活躍執行緒數"+ executor.getActiveCount());
                    Thread.sleep(10000*1000); // 這就當作是持久的任務在執行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    }

   測試結果:

  臨界值為 12, 核心執行緒剛好夠用

    

  臨界值為 16 , 雖然任務數大於了核心執行緒,但是並沒有新建執行緒數。所以驗證任務被放到了佇列中,先使用佇列存放,佇列滿了再開新執行緒

   

  臨界值為 512 任務數量大於  核心執行緒數  所以新任務放到了佇列中,且剛好不會有超出,不觸發新的執行緒建立

    

  臨界值為 516 任務數量大於  ( 核心執行緒數 + 佇列大小 ) 所有活躍執行緒被加到最大

  

  臨界值為 518, 任務數量大於  ( 佇列大小 + 最大執行緒數) 所有產生丟棄

  

 

 

四、如何解決當前事故

  出現這個問題之後,我們直接就增加了核心執行緒的數量,以保證整體大於在阻塞任務的數量。比如我們這個就是重新設定為核心執行緒數量 16 > 12,

  同時,我們將阻塞任務同非阻塞任務所建立的執行緒池進行隔離,以減少共用執行緒池造成的 正常任務被遺忘的可能性。

 

五、如何設定你的執行緒池大小

  那麼在開發中,如何設定i執行緒池的大小?其實這沒有特定的規範,需要結合自己任務的執行時間而考慮,

  但是最好提前考慮好,任務是否為阻塞性任務,如果是的話,建議做好執行緒隔離。

  在我們一般將核心執行緒設定為 n + 1  (n 為核心數量)

  最大執行緒數量設定 2n + 1  (n 為核心數量)