1. 程式人生 > >Java多執行緒程式設計-(7)-使用執行緒池實現執行緒的複用和一些坑的避免

Java多執行緒程式設計-(7)-使用執行緒池實現執行緒的複用和一些坑的避免

原文出自 : https://blog.csdn.net/xlgen157387/article/details/78253096

執行緒複用:執行緒池

首先舉個例子:

假設這裡有一個系統,大概每秒需要處理5萬條資料,這5萬條資料為一個批次,而這沒秒傳送的5萬條資料資料需要經過兩個處理過程,第一步是資料存入資料庫,第二步是對資料進行其他業務的分析,假設第一步我是用的是普通的JDBC插入資料,為了不影響程式的繼續執行,我寫了一個執行緒,讓這個子執行緒不阻塞主執行緒,繼續處理第二步驟的資料,我們知道插入5萬條資料大概需要2至3秒的時間,如果每一批次插入資料庫的時候,就建立一個執行緒進行處理,可想而知,由於插入資料庫的時間較久,不能很快的處理,這樣的話,一段時間之後,系統中就會有很多的這種插入資料的執行緒(PS:只是假設場景,方案設計的可能不合理)。

new Thread(new Runnable() {
    @Override
    public void run() {
        System.out.println("5萬條資料入庫!");
    }
}).start();
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如果,我們使用上述的方式去建立執行緒,使用start()方法啟動執行緒,該執行緒會在run()方法結束後,自動回收該執行緒。雖然如此,在上邊的場景中執行緒中業務的處理速度完全達不到我們的要求,系統中的執行緒會逐漸變大,進而消耗CPU資源,大量的執行緒搶佔寶貴的記憶體資源,可能還會出現OOM,即便沒有出現,大量的執行緒回收也會個GC帶來很大的壓力。

可想而知,雖然多執行緒技術可以充分發揮多核處理器的計算能力,提高生產系統的吞吐量和效能。但是,若不加控制和管理的隨意使用執行緒,對系統的效能反而會產生不利的影響。

還拿上邊的例子說,如果我們使用執行緒池的方式的話,可以實現指定執行緒的數量,這樣的話就算再多的資料需要入庫,只需要排隊等待執行緒池的執行緒即可,就不會出現執行緒池過多而消耗系統資源的情況,當然這只是意見簡單的場景。

執行緒的生命週期流程圖

說到這裡,有人要說了執行緒不是攜帶資源的最小單位,作業系統的書籍中還給我們說了執行緒之間的切換消耗很小嗎?雖然如此,執行緒是一種輕量級的工具(或者稱之為:輕量級程序),但其建立和關閉依然需要花費時間,如果為了一個很簡單的任務就去建立一個執行緒,很有可能出現建立和銷燬執行緒所佔用的時間大於該執行緒真實工作所消耗的時間,反而得不償失。

那麼什麼是執行緒池?

為了避免系統頻繁的建立和銷燬執行緒,我們可以將建立的執行緒進行復用。資料庫中的資料庫連線池也是此意。

這裡寫圖片描述

線上程池中總有那麼幾個活躍的執行緒,也有一定的最大值限制,一個業務使用完執行緒之後,不是立即銷燬而是將其放入到執行緒池中,從而實現執行緒的複用。簡而言之:建立執行緒變成了從執行緒池獲取空閒的執行緒,關閉執行緒變成了向池子中歸還執行緒。

再多的概念,不過多解釋,因為很基礎,也不是本文的重點。

使用執行緒池的好處

Java中的執行緒池是運用場景最多的併發框架,幾乎所有需要非同步或併發執行任務的程式都可以使用執行緒池。在開發過程中,合理地使用執行緒池能夠帶來3個好處:

第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。

第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。

第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一分配、調優和監控。但是,要做到合理利用執行緒池,必須對其實現原理了如指掌。

JDK對執行緒池的支援

JDK提供的Executor框架

JDK提供了Executor框架,可以讓我們有效的管理和控制我們的執行緒,其實質也就是一個執行緒池。Executor下的介面和類繼承關係如下:

這裡寫圖片描述

其中,ExecutorService介面定義如下:

這裡寫圖片描述

如果使用Executor框架的話,Executors類是常用的,其方法如下:

這裡寫圖片描述

其中常用幾類如下:

public static ExecutorService newFixedThreadPool()
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool()

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1、newFixedThreadPool:該方法返回一個固定執行緒數量的執行緒池;

2、newSingleThreadExecutor:該方法返回一個只有一個現成的執行緒池;

3、newCachedThreadPool:返回一個可以根據實際情況調整執行緒數量的執行緒池;

4、newSingleThreadScheduledExecutor:該方法和newSingleThreadExecutor的區別是給定了時間執行某任務的功能,可以進行定時執行等;

5、newScheduledThreadPool:在4的基礎上可以指定執行緒數量。

建立執行緒池實質呼叫的還是ThreadPoolExecutor

在Executors類中,我們拿出來一個方法簡單分析一下:

這裡寫圖片描述

可以看出,類似的其他方法一樣,在Executors內部建立執行緒池的時候,實際建立的都是一個ThreadPoolExecutor物件,只是對ThreadPoolExecutor構造方法,進行了預設值的設定。ThreadPoolExecutor的構造方法如下:

這裡寫圖片描述

引數含義如下:

1、corePoolSize 核心執行緒池大小;
2、maximumPoolSize 執行緒池最大容量大小;
3、keepAliveTime 執行緒池空閒時,執行緒存活的時間;
4、TimeUnit 時間單位;
5、ThreadFactory 執行緒工廠;
6、BlockingQueue任務佇列;
7、RejectedExecutionHandler 執行緒拒絕策略;
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

其中這裡的任務佇列有以下幾種:

Executor框架例項

1、例項一:

public class ThreadPoolDemo {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 10; i++) {
            int index = i;
            executorService.submit(() -> System.out.println("i:" + index +
                    " executorService"));
        }
        executorService.shutdown();
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

submit(Runnable task)方法提交一個執行緒。

但是使用最新的“阿里巴巴編碼規範外掛”檢測一下會發現:

這裡寫圖片描述

執行緒池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式,
這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。 
說明:Executors各個方法的弊端:

1)newFixedThreadPool和newSingleThreadExecutor:
  主要問題是堆積的請求處理佇列可能會耗費非常大的記憶體,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
  主要問題是執行緒數最大數是Integer.MAX_VALUE,可能會建立數量非常多的執行緒,甚至OOM。
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

阿里巴巴編碼規範外掛地址:https://github.com/alibaba/p3c

2、例項二:

遵循阿里巴巴編碼規範的提示,示例如下:

public class ThreadPoolDemo {

    public static void main(String[] args) {

        ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, 
                TimeUnit.MILLISECONDS, 
                new LinkedBlockingQueue<>(10), 
                Executors.defaultThreadFactory(), 
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 10; i++) {
            int index = i;
            executorService.submit(() -> System.out.println("i:" + index + 
                    " executorService"));
        }
        executorService.shutdown();
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

或者這樣:

public class ThreadPoolDemo {

    public static void main(String[] args) {

        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 10; i++) {
            int index = i;
            pool.submit(() -> System.out.println("i:" + index +
                    " executorService"));
        }
        pool.shutdown();
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

3、例項三:

自定義ThreadFactory、自定義執行緒拒絕策略

public static void main(String[] args) {

        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10),
                new ThreadFactory() { //自定義ThreadFactory
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName(r.getClass().getName());
                        return thread;
                    }
                },
                new ThreadPoolExecutor.AbortPolicy()); //自定義執行緒拒絕策略

        for (int i = 0; i < 10; i++) {
            int index = i;
            executorService.submit(() -> System.out.println("i:" + index));
        }

        executorService.shutdown();
    }
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

更多例項程式碼,可參考:https://gitee.com/xuliugen/codes/ta5dbsge0kvhy62qu8li157

使用submit的坑

首先看一下例項:

public class ThreadPoolDemo3 {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 5; i++) {
            int index = i;
            executorService.submit(() -> divTask(100, index));


        }
        executorService.shutdown();
    }

    private static void divTask(int a, int b) {
        double result = a / b;
        System.out.println(result);
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

執行結果:

這裡寫圖片描述

上述程式碼,可以看出執行結果為4個,因該是有5個的,但是當i=0的時候,100/0是會報錯的,但是日誌資訊中沒有任何資訊,是為什麼那?如果使用了submit(Runnable task) 就會出現這種情況,任何的錯誤資訊都出現不了!

這是因為使用submit(Runnable task) 的時候,錯誤的堆疊資訊跑出來的時候會被內部捕獲到,所以打印不出來具體的資訊讓我們檢視,解決的方法有如下兩種:

1、使用execute()代替submit();

public class ThreadPoolDemo3 {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 5; i++) {
            int index = i;
            executorService.execute(() -> divTask(100, index));
        }
        executorService.shutdown();
    }

    private static void divTask(int a, int b) {
        double result = a / b;
        System.out.println(result);
    }
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

執行結果:

這裡寫圖片描述

2、使用Future

public class ThreadPoolDemo3 {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 5; i++) {
            int index = i;
            Future future = executorService.submit(() -> divTask(200, index));
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        executorService.shutdown();
    }

    private static void divTask(int a, int b) {
        double result = a / b;
        System.out.println(result);
    }
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

執行結果:

這裡寫圖片描述

3、execute和submit的區別

(1)execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功。通過以下程式碼可知execute()方法輸入的任務是一個Runnable類的例項。

(2)submit()方法用於提交需要返回值的任務。執行緒池會返回一個future型別的物件,通過這個future物件可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前執行緒一段時間後立即返回,這時候有可能任務沒有執行完。