1. 程式人生 > >【小家Java】一次Java執行緒池誤用(newFixedThreadPool)引發的線上血案和總結

【小家Java】一次Java執行緒池誤用(newFixedThreadPool)引發的線上血案和總結

相關閱讀

【小家java】java5新特性(簡述十大新特性) 重要一躍
【小家java】java6新特性(簡述十大新特性) 雞肋升級
【小家java】java7新特性(簡述八大新特性) 不溫不火
【小家java】java8新特性(簡述十大新特性) 飽受讚譽
【小家java】java9新特性(簡述十大新特性) 褒貶不一
【小家java】java10新特性(簡述十大新特性) 小步迭代
【小家java】java11新特性(簡述八大新特性) 首個重磅LTS版本


【小家java】Java中的執行緒池,你真的用對了嗎?(教你用正確的姿勢使用執行緒池)
小家Java】一次Java執行緒池誤用(newFixedThreadPool)引發的線上血案和總結


【小家java】BlockingQueue阻塞佇列詳解以及5大實現(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue…)
【小家java】用 ThreadPoolExecutor/ThreadPoolTaskExecutor 執行緒池技術提高系統吞吐量(附帶執行緒池引數詳解和使用注意事項)


這是一個十分嚴重的線上問題

自從最近的某年某月某天起,線上服務開始變得不那麼穩定(軟病)。在高峰期,時常有幾臺機器的記憶體持續飆升,並且無法回收,導致服務不可用。

給出監控中GC的取樣曲線:
在這裡插入圖片描述
記憶體使用曲線如下:
在這裡插入圖片描述

如上兩張圖顯示:18:50-19:00的這10分鐘階段裡,服務已經處於不可用的狀態了。這就導致了:上游服務的超時異常會增加,該臺機器會觸發熔斷。

熔斷觸發後,這臺機器的流量會打到其他機器,其他機器發生類似的情況的可能性會提高,極端情況會引起所有服務宕機,造成雪崩,曲線掉底。

問題分析和猜想

結合我們的業務情況,我們監控到在那段時間裡,訪問量是最高的,屬於一個高峰情況,因此我們初步斷定,這個和流量高併發有密不可分個的關係。

1、因為線上記憶體過大,如果採用 jmap dump的方式,這個任務可能需要很久才可以執行完,同時把這麼大的檔案存放起來匯入工具也是一件很難的事情

2、再看JVM啟動引數,也很久沒有變更過 Xms, Xmx, -XX:NewRatio, -XX:SurvivorRatio, 雖然沒有仔細分析程式使用記憶體情況,但看起來也無大礙。

3、於是開始找程式碼,某年某天某月~ 嗯,注意到一段這樣的程式碼提交:

private static ExecutorService executor = Executors.newFixedThreadPool(15);
public static void push2Kafka(Object msg) {
    executor.execute(new WriteTask(msg,  false));    
}

這段程式碼的功能是:每次線上呼叫,都會把計算結果的日誌打到 Kafka,Kafka消費方再繼續後續的邏輯。

看這塊程式碼的問題:咋一看,好像沒什麼問題,但深入分析,問題就出現在
Executors.newFixedThreadPool(15)這段程式碼上。

因為使用了 newFixedThreadPool 執行緒池,而它的工作機制是,固定了N個執行緒,而提交給執行緒池的任務佇列是不限制大小的,如果Kafka發訊息被阻塞或者變慢,那麼顯然佇列裡面的內容會越來越多,也就會導致這樣的問題。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

如上,採用的是LinkedBlockingQueue,而它預設是一個無界佇列。因此若使用不當,講很快導致記憶體被打滿,需要謹慎啊。

驗證猜想

為了驗證這個想法,做了個小實驗,把 newFixedThreadPool 執行緒池的執行緒個數調小一點,然後自己模擬壓測一下:
測試程式碼如下:

/**
 * @author [email protected]
 * @description
 * @date 2018-11-04 10:13
 */
public class Main {

    //建立一個固定執行緒池
    private static ExecutorService executor = Executors.newFixedThreadPool(1);


    //向kafka裡推送消費
    public static void push2Kafka(Object msg) {
        executor.execute(() -> {
            try {
                //模擬 佔用的記憶體大小
                Byte[] bytes = new Byte[1024 * 1000 * 1000];
                System.out.println(Thread.currentThread().getName() + "-->任務放到執行緒池:" + msg);
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }


    public static void main(String[] args) {

        //模擬高併發環境下  一直向執行緒池裡面不停的塞任務
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            System.out.println("塞任務start..." + i);
            push2Kafka(i);
            System.out.println("塞任務end..." + i);
        }

    }
}

開啟JConsole檢視JVM的CPU、記憶體相關使用情況:
在這裡插入圖片描述
在這裡插入圖片描述
記憶體情況逐漸攀升,最終可以看出程式近乎停止。最終丟擲記憶體異常

Exception in thread "pool-1-thread-295" java.lang.OutOfMemoryError: Java heap space

然而,電腦本機的實體記憶體,也是幾乎會被佔滿:
在這裡插入圖片描述

下面是程式啟用和停止的記憶體情況:
在這裡插入圖片描述
在這裡插入圖片描述

綜上所訴,我們的猜想是正確的。如果消費的速度小於生產的速度,記憶體隨著時間的堆積,很快就能被打滿了。

解決方案

問題根源找到了,解決的方法其實就非常的簡單了,採取了自定義執行緒池引數。

在我們的修復方案中,選擇的就是有界佇列,雖然會有部分任務被丟失,但是我們線上是排序日誌蒐集任務,所以對部分對丟失是可以容忍的。

Java提供的四種常用執行緒池解析 Executors

既然樓主踩坑就是使用了 JDK 的預設實現,那麼再來看看這些預設實現到底幹了什麼,封裝了哪些引數。簡而言之 Executors 工廠方法Executors.newCachedThreadPool() 提供了無界執行緒池,可以進行自動執行緒回收;Executors.newFixedThreadPool(int) 提供了固定大小執行緒池,內部使用無界佇列;Executors.newSingleThreadExecutor() 提供了單個後臺執行緒。

newCachedThreadPool:可快取執行緒池
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
} 

這種型別的執行緒池特點是:

  • 工作執行緒的建立數量幾乎沒有限制(其實也有限制的,數目為Interger. MAX_VALUE), 這樣可靈活的往執行緒池中新增執行緒。
  • 如果長時間沒有往執行緒池中提交任務,即如果工作執行緒空閒了指定的時間(預設為1分鐘),則該工作執行緒將自動終止。終止後,如果你又提交了新的任務,則執行緒池重新建立一個工作執行緒。
  • 在使用CachedThreadPool時,一定要注意控制任務的數量,否則,由於大量執行緒同時執行,很有會造成系統癱瘓。
public class Main {

    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            try {
                Thread.sleep(index * 100);
            } catch (Exception e) {
                e.printStackTrace();
            }

            cachedThreadPool.execute(() -> System.out.println(index + "當前執行緒" + Thread.currentThread().getName()));
        }

    }
}
輸出:
0當前執行緒pool-1-thread-1
1當前執行緒pool-1-thread-1
2當前執行緒pool-1-thread-1
3當前執行緒pool-1-thread-1
4當前執行緒pool-1-thread-1
5當前執行緒pool-1-thread-1
6當前執行緒pool-1-thread-1
7當前執行緒pool-1-thread-1
8當前執行緒pool-1-thread-1
9當前執行緒pool-1-thread-1

發現10個執行緒都是使用的執行緒1,執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。

newFixedThreadPool
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
 }

看程式碼一目瞭然了,執行緒數量固定,使用無限大的佇列。再次強調,樓主就是踩的這個無限大佇列的坑。

newScheduledThreadPool

建立一個定長執行緒池,支援定時及週期性任務執行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

在來看看ScheduledThreadPoolExecutor()的建構函式:

 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    } 

ScheduledThreadPoolExecutor的父類即ThreadPoolExecutor,因此這裡各引數含義和上面一樣。值得關心的是DelayedWorkQueue這個阻塞對列。

它作為靜態內部類就在ScheduledThreadPoolExecutor中進行了實現。簡單的說,DelayedWorkQueue是一個無界佇列,它能按一定的順序對工作佇列中的元素進行排列。

newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

注意:該靜態方法,禁止使用,因為裡面有不少坑,這裡不做過多解釋

關於執行緒池的阻塞佇列的各種用法,請參見博文:
【小家java】BlockingQueue阻塞佇列詳解以及5大實現(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue…)

結束語

雖然之前學習了不少相關知識,但是隻有在實踐中踩坑才能印象深刻吧

可以通過Executors靜態工廠構建執行緒池,但一般不建議這樣使用。

附:ThreadFactory簡單介紹

ThreadFactory是一個執行緒工廠。用來建立執行緒。這裡為什麼要使用執行緒工廠呢?其實就是為了統一在建立執行緒時設定一些引數,如是否守護執行緒。執行緒一些特性等,如優先順序。通過這個TreadFactory創建出來的執行緒能保證有相同的特性。它首先是一個介面類,而且方法只有一個。就是建立一個執行緒。

public interface ThreadFactory {    
    Thread newThread(Runnable r);  
}  

所以我們可以自己實現這個工廠,然後定製屬於我們自己的一類執行緒

class MyThreadFactory implements ThreadFactory {

        private int counter;
        private String name;
        private List<String> stats;

        public MyThreadFactory(String name) {
            counter = 0;
            this.name = name;
            stats = new ArrayList<String>();
        }

        @Override
        public Thread newThread(Runnable run) {
            Thread t = new Thread(run, name + "-Thread-" + counter);
            counter++;
            stats.add(String.format("Created thread %d with name %s on%s\n",t.getId(), t.getName(), new Date()));
            return t;
        }

        public String getStas() {
            StringBuffer buffer = new StringBuffer();
            Iterator<String> it = stats.iterator();
            while (it.hasNext()) {
                buffer.append(it.next());
                buffer.append("\n");
            }
            return buffer.toString();
        }

    }
//使用:
MyThreadFactory factory = new MyThreadFactory("MyThreadFactory");
Thread thread = factory.newThread(new MyTask(i));  
thread.start();