【小家Java】一次Java執行緒池誤用(newFixedThreadPool)引發的線上血案和總結
相關閱讀
【小家java】java5新特性(簡述十大新特性) 重要一躍
【小家java】java6新特性(簡述十大新特性) 雞肋升級
【小家java】java7新特性(簡述八大新特性) 不溫不火
【小家java】java8新特性(簡述十大新特性) 飽受讚譽
【小家java】java9新特性(簡述十大新特性) 褒貶不一
【小家java】java10新特性(簡述十大新特性) 小步迭代
【小家java】java11新特性(簡述八大新特性) 首個重磅LTS版本
【小家java】Java中的執行緒池,你真的用對了嗎?(教你用正確的姿勢使用執行緒池)
小家Java】一次Java執行緒池誤用(newFixedThreadPool)引發的線上血案和總結
【小家java】BlockingQueue阻塞佇列詳解以及5大實現(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue…)
【小家java】用 ThreadPoolExecutor/ThreadPoolTaskExecutor 執行緒池技術提高系統吞吐量(附帶執行緒池引數詳解和使用注意事項)
這是一個十分嚴重的線上問題
自從最近的某年某月某天起,線上服務開始變得不那麼穩定(軟病)。在高峰期,時常有幾臺機器的記憶體持續飆升,並且無法回收,導致服務不可用。
給出監控中GC的取樣曲線:
記憶體使用曲線如下:
如上兩張圖顯示:18:50-19:00的這10分鐘階段裡,服務已經處於不可用的狀態了。這就導致了:上游服務的超時異常會增加,該臺機器會觸發熔斷。
熔斷觸發後,這臺機器的流量會打到其他機器,其他機器發生類似的情況的可能性會提高,極端情況會引起所有服務宕機,造成雪崩,曲線掉底。
問題分析和猜想
結合我們的業務情況,我們監控到在那段時間裡,訪問量是最高的,屬於一個高峰情況,因此我們初步斷定,這個和流量高併發有密不可分個的關係。
1、因為線上記憶體過大,如果採用 jmap dump的方式,這個任務可能需要很久才可以執行完,同時把這麼大的檔案存放起來匯入工具也是一件很難的事情
2、再看JVM啟動引數,也很久沒有變更過 Xms, Xmx, -XX:NewRatio, -XX:SurvivorRatio, 雖然沒有仔細分析程式使用記憶體情況,但看起來也無大礙。
3、於是開始找程式碼,某年某天某月~ 嗯,注意到一段這樣的程式碼提交:
private static ExecutorService executor = Executors.newFixedThreadPool(15);
public static void push2Kafka(Object msg) {
executor.execute(new WriteTask(msg, false));
}
這段程式碼的功能是:每次線上呼叫,都會把計算結果的日誌打到 Kafka,Kafka消費方再繼續後續的邏輯。
看這塊程式碼的問題:咋一看,好像沒什麼問題,但深入分析,問題就出現在
Executors.newFixedThreadPool(15)
這段程式碼上。
因為使用了 newFixedThreadPool 執行緒池,而它的工作機制是,固定了N個執行緒,而提交給執行緒池的任務佇列是不限制大小的,如果Kafka發訊息被阻塞或者變慢,那麼顯然佇列裡面的內容會越來越多,也就會導致這樣的問題。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
如上,採用的是LinkedBlockingQueue,而它預設是一個無界佇列。因此若使用不當,講很快導致記憶體被打滿,需要謹慎啊。
驗證猜想
為了驗證這個想法,做了個小實驗,把 newFixedThreadPool 執行緒池的執行緒個數調小一點,然後自己模擬壓測一下:
測試程式碼如下:
/**
* @author [email protected]
* @description
* @date 2018-11-04 10:13
*/
public class Main {
//建立一個固定執行緒池
private static ExecutorService executor = Executors.newFixedThreadPool(1);
//向kafka裡推送消費
public static void push2Kafka(Object msg) {
executor.execute(() -> {
try {
//模擬 佔用的記憶體大小
Byte[] bytes = new Byte[1024 * 1000 * 1000];
System.out.println(Thread.currentThread().getName() + "-->任務放到執行緒池:" + msg);
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
public static void main(String[] args) {
//模擬高併發環境下 一直向執行緒池裡面不停的塞任務
for (int i = 0; i < Integer.MAX_VALUE; i++) {
System.out.println("塞任務start..." + i);
push2Kafka(i);
System.out.println("塞任務end..." + i);
}
}
}
開啟JConsole檢視JVM的CPU、記憶體相關使用情況:
記憶體情況逐漸攀升,最終可以看出程式近乎停止。最終丟擲記憶體異常
Exception in thread "pool-1-thread-295" java.lang.OutOfMemoryError: Java heap space
然而,電腦本機的實體記憶體,也是幾乎會被佔滿:
下面是程式啟用和停止的記憶體情況:
綜上所訴,我們的猜想是正確的。如果消費的速度小於生產的速度,記憶體隨著時間的堆積,很快就能被打滿了。
解決方案
問題根源找到了,解決的方法其實就非常的簡單了,採取了自定義執行緒池引數。
在我們的修復方案中,選擇的就是有界佇列
,雖然會有部分任務被丟失,但是我們線上是排序日誌蒐集任務,所以對部分對丟失是可以容忍的。
Java提供的四種常用執行緒池解析 Executors
既然樓主踩坑就是使用了 JDK 的預設實現,那麼再來看看這些預設實現到底幹了什麼,封裝了哪些引數。簡而言之 Executors 工廠方法Executors.newCachedThreadPool() 提供了無界執行緒池,可以進行自動執行緒回收;Executors.newFixedThreadPool(int) 提供了固定大小執行緒池,內部使用無界佇列;Executors.newSingleThreadExecutor() 提供了單個後臺執行緒。
newCachedThreadPool:可快取執行緒池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
這種型別的執行緒池特點是:
- 工作執行緒的建立數量幾乎沒有限制(其實也有限制的,數目為Interger. MAX_VALUE), 這樣可靈活的往執行緒池中新增執行緒。
- 如果長時間沒有往執行緒池中提交任務,即如果工作執行緒空閒了指定的時間(預設為1分鐘),則該工作執行緒將自動終止。終止後,如果你又提交了新的任務,則執行緒池重新建立一個工作執行緒。
- 在使用CachedThreadPool時,一定要注意控制任務的數量,否則,由於大量執行緒同時執行,很有會造成系統癱瘓。
public class Main {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 100);
} catch (Exception e) {
e.printStackTrace();
}
cachedThreadPool.execute(() -> System.out.println(index + "當前執行緒" + Thread.currentThread().getName()));
}
}
}
輸出:
0當前執行緒pool-1-thread-1
1當前執行緒pool-1-thread-1
2當前執行緒pool-1-thread-1
3當前執行緒pool-1-thread-1
4當前執行緒pool-1-thread-1
5當前執行緒pool-1-thread-1
6當前執行緒pool-1-thread-1
7當前執行緒pool-1-thread-1
8當前執行緒pool-1-thread-1
9當前執行緒pool-1-thread-1
發現10個執行緒都是使用的執行緒1,執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
看程式碼一目瞭然了,執行緒數量固定,使用無限大的佇列。再次強調,樓主就是踩的這個無限大佇列的坑。
newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
在來看看ScheduledThreadPoolExecutor()的建構函式:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor的父類即ThreadPoolExecutor,因此這裡各引數含義和上面一樣。值得關心的是DelayedWorkQueue這個阻塞對列。
它作為靜態內部類就在ScheduledThreadPoolExecutor中進行了實現。簡單的說,DelayedWorkQueue是一個無界佇列,它能按一定的順序對工作佇列中的元素進行排列。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
注意:該靜態方法,禁止使用,因為裡面有不少坑,這裡不做過多解釋
關於執行緒池的阻塞佇列的各種用法,請參見博文:
【小家java】BlockingQueue阻塞佇列詳解以及5大實現(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue…)
結束語
雖然之前學習了不少相關知識,但是隻有在實踐中踩坑才能印象深刻吧
可以通過Executors靜態工廠構建執行緒池,但一般不建議這樣使用。
附:ThreadFactory簡單介紹
ThreadFactory是一個執行緒工廠。用來建立執行緒。這裡為什麼要使用執行緒工廠呢?其實就是為了統一在建立執行緒時設定一些引數,如是否守護執行緒。執行緒一些特性等,如優先順序。通過這個TreadFactory創建出來的執行緒能保證有相同的特性。它首先是一個介面類,而且方法只有一個。就是建立一個執行緒。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
所以我們可以自己實現這個工廠,然後定製屬於我們自己的一類執行緒
class MyThreadFactory implements ThreadFactory {
private int counter;
private String name;
private List<String> stats;
public MyThreadFactory(String name) {
counter = 0;
this.name = name;
stats = new ArrayList<String>();
}
@Override
public Thread newThread(Runnable run) {
Thread t = new Thread(run, name + "-Thread-" + counter);
counter++;
stats.add(String.format("Created thread %d with name %s on%s\n",t.getId(), t.getName(), new Date()));
return t;
}
public String getStas() {
StringBuffer buffer = new StringBuffer();
Iterator<String> it = stats.iterator();
while (it.hasNext()) {
buffer.append(it.next());
buffer.append("\n");
}
return buffer.toString();
}
}
//使用:
MyThreadFactory factory = new MyThreadFactory("MyThreadFactory");
Thread thread = factory.newThread(new MyTask(i));
thread.start();