1. 程式人生 > >Java執行緒池實現

Java執行緒池實現

電腦的CPU資源是有限的,任務的處理速度與執行緒數量之間並不是正相關。當執行緒數量過多,CPU要頻繁的在不同執行緒切換,反而會引起處理效能的下降。執行緒池中最大的執行緒數,是考慮多種因素來事先設定的,比如硬體的條件,業務的型別等等。

當我們向一個固定大小的的執行緒池中請求一個執行緒時,當執行緒池中沒有空閒資源了,這個時候執行緒池如何處理這個請求?是拒絕請求還是排隊請求?各種策略又是如何實現的呢?

實際上,這些問題的處理並不複雜,底層的資料結構,就是佇列(queue)。

一、Java執行緒池介紹

1,執行緒池的作用

限制系統中執行執行緒的數量。
減少了建立和銷燬執行緒的次數,重複利用執行緒。

2,主要的類

Executor:執行執行緒的介面
ExecutorSerivce: 執行緒池介面
ThreadPoolExecutor :執行緒池類
Executors:常用執行緒池工廠

3,常用的執行緒池

配置執行緒池是比較複雜的過程,所有可以使用現有的執行緒池工廠生成常用的執行緒池:

  1. newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。為了合理利用資源,我們通常把定長池的長度設定為當前PC機獲取cpu核心數:Runtime.getRuntime().availableProcessors():獲取當前CPU核心數;
  2. newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒;
  3. newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行;
  4. newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MyThreadPool {

    public static void main(String [] args){
        int num = Runtime.getRuntime().availableProcessors();
        Executor executor = Executors.newFixedThreadPool(num);
        for (int i = 0 ; i<num ; i++){
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("我是一個子執行緒!!");
                }
            });
        }
    }
}

我們再來看Executors.newFixedThreadPool(num),點進去,會發現就是new了一個LinkedBlockingQueue:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

二、執行緒池和佇列結合實現一個日誌處理

JDK自己的執行緒池底層不光是用佇列實現的,我們也可以使用執行緒池和佇列相結合,來實現一些功能。

通常我們會把要執行的任務放入一個佇列中,由執行緒池來執行,比如爬蟲、日誌。我們先來看一個執行緒池和佇列結合實現日誌記錄的例子。

import com.swagger.demo.Entity.LogContentEntity;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

@Configuration
@Aspect
@Component
public class AopLogConfig implements Runnable {

    @Autowired
    private HttpServletRequest request;

    private LinkedBlockingQueue<LogContentEntity> logQueue;

    public AopLogConfig() {
        //Spring啟動後,該物件建立時。初始化佇列以及執行緒池。
        logQueue = new LinkedBlockingQueue<LogContentEntity>(3000);
        int num = Runtime.getRuntime().availableProcessors();
        ExecutorService  executor = Executors.newFixedThreadPool(num);
        for (int i = 0 ;i<num ;i++){
            executor.execute(this);
        }
    }

    @Before("execution(public * com.swagger.demo.controller..*.*(..))")
    public void doBefore(JoinPoint joinPoint) throws Exception{

        //日誌記錄的資訊可自行修改
        LogContentEntity Log = new LogContentEntity();
        String method = request.getMethod();
        Log.setHttpMethod(method);
        String url = request.getRequestURL().toString();
        Log.setUrl(url);
        String ip = request.getRemoteAddr();
        Log.setIp(ip);
        Log.setContent("test Log Content");
        //將需要記錄的日誌物件放到佇列中等待執行緒非同步執行。
        logQueue.put(Log);
    }

    @Override
    public void run() {
        try{
            while(true){
                //如果佇列裡沒有,則會阻塞;
                LogContentEntity take = logQueue.take();
                //日誌處理邏輯可自行修改;
                System.out.println(take.toString());
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

三、執行緒池+佇列以優先順序方式執行佇列任務

import java.util.concurrent.TimeUnit;

public class MyPriorityTask implements Runnable, Comparable<MyPriorityTask> {
    private int priority;
    private String name;

    public MyPriorityTask(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    public void run() {
        System.out.printf("MyPriorityTask: %s Priority :%d\n", name, priority);
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int compareTo(MyPriorityTask o) {
        if (this.getPriority() < o.getPriority()) {
            return 1;
        }
        if (this.getPriority() > o.getPriority()) {
            return -1;
        }
        return 0;
    }

    public int getPriority() {
        return priority;
    }
}

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
        for (int i = 0; i < 100; i++) {
            MyPriorityTask task = new MyPriorityTask("Task " + i, 0);
            executor.execute(task);
            System.out.println(executor.getTaskCount());
        }
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 101; i < 8; i++) {
            MyPriorityTask task = new MyPriorityTask("Task " + i, 1);
            executor.execute(task);
            System.out.println(executor.getTaskCount());
        }
        try {
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("Main: End of the program.\n");

    }
}

四、使用執行緒池的一些陷阱

儘管執行緒池對於構建多執行緒應用是個很強大的機制,但它也不是沒有缺點的。使用執行緒池構建的應用會面臨其他多執行緒應用所面對的一樣的併發風險,比如同步錯誤和死鎖,此外執行緒池還有其他的一些特有缺陷,比如 執行緒池-關聯 死鎖,資源不足,還有執行緒洩漏。

1.死鎖

任何多執行緒應用都會面臨死鎖的風險。彼此雙方都在等待一個事件,而這個事件只能有對方提供,這樣一對程序或者執行緒我們稱之為死鎖。死鎖最簡單的情況是執行緒 A 持有了物件 X 的獨佔鎖,執行緒 A 在等待物件 Y 的鎖,而執行緒 B 恰恰持有了物件 Y 的獨佔鎖,執行緒 B 在等待物件 X 的鎖。除非有某種辦法能夠打破這種鎖等待(Java 鎖機制不能支援這個),否則的話這一對死鎖執行緒將會永久地等待下去。
既然死鎖是所有多執行緒程式設計都將面臨的風險,執行緒池為我們引入了另一種死鎖:執行緒池中所有執行緒都在阻塞等待佇列中另一個任務的執行結果,但是另一個任務無法得到執行,因為池中壓根兒就沒用空閒的可用執行緒。這種情況線上程池用於一些相互影響物件的模擬實現中可能會出現,這些模擬物件彼此傳送查詢然後作為任務佇列進行執行,發起查詢的物件同步等待響應。

2.資源不足

執行緒池的優點之一是他們在大多數情況下比其他的排程機制具備更好的效能,比如我們上面所討論的那幾種。但這個取決於你有沒有恰當地配置了執行緒池大小。執行緒佔用大量的資源,包括記憶體和其他系統資源。除了執行緒物件所必須的記憶體之外,每個執行緒還需要兩個執行呼叫棧,這個棧可能會很大。此外,JVM 可能還會為每個 Java 執行緒建立一個本地執行緒,這樣將會佔用額外的系統資源。最後,雖然執行緒之間切換的排程開銷很小,大量的執行緒上下文切換也會影響到你的應用效能。
如果執行緒池過大的話,這些眾多執行緒所消耗的資源將會明顯影響到系統性能。時間會浪費線上程之間的切換上,配置有比你實際需要更多的執行緒會引起資源不足的問題,因為池中執行緒所佔用的資源如果用在其他任務上可能會更高效。除了這些執行緒本身所使用的資源之外,服務請求時所做的工作可能會需要額外資源,比如 JDBC 連線,套接字,或者檔案。這些也是有限的資源,而且對它們進行過高併發請求的話可能會導致失效,比如無法分配一個 JDBC 連線。

3.併發錯誤

執行緒池以及其他佇列機制依賴於 wait() 和 notify() 方法的使用,這可能會變得很棘手。如果編碼不當的話,很可能會導致通知丟失,結果就是池中的執行緒都處於一個空閒的狀態,而實際上佇列中有任務需要處理。在使用這些工具的時候要打起十二萬分的精神;即便是專家在用它們的時候也經常會失誤。幸運的是,可以使用一些現成的實現,這些實現久經考驗,比如下文將會討論到的 你無須自行編碼 實現的 java.util.concurrent 包。

4.執行緒洩漏

各種各樣的執行緒池中存在的一個重大的危險就是執行緒洩漏,當一個執行緒被從執行緒池中移除去執行一個任務,任務執行結束之後卻沒有返還給執行緒池的時候,就會出現這種危險。出現這種情況的一種方式是當任務丟擲一個 RuntimeException 或一個 Error 時。如果執行緒池類沒有捕捉到這些,該執行緒將會傻傻地存在於執行緒池之中,而執行緒池的執行緒數量則會被永久地減一。當這種情況發生的次數足夠多的時候,執行緒池最終將為空(無可用執行緒),而系統則會癱瘓,因為已經沒有執行緒來處理任務了。
癱瘓的任務,比如那些永久等待不保證可用資源或者等待已經回家了的使用者輸入的任務,也可以造成相等於執行緒洩漏一樣的後果。如果一個執行緒永久地被這樣一個任務所佔用了的話,它造成的影響和從池中移除是一樣的。像這樣的任務應該要麼給它們一個執行緒池之外的執行緒,要麼控制一下它們的等待時間。

5.請求過載

伺服器很可能會被鋪天蓋地而來的請求所淹沒。這種情況下,我們可能並不想讓每個進來的請求都放進我們的工作佇列,因為等待執行的任務佇列也可能會佔用過多系統資源並導致資源不足。這時候要做什麼就取決於你的決定了,比如你可以通過一個表示伺服器暫時太忙的響應來拒絕這些請求。

五、高效執行緒池使用指南

你只需要遵循一些簡單的指導方針,執行緒池就可以成為你構建服務應用的一個非常有效的方法:

  • 不要把同步等待其他任務執行結果的任務放進任務佇列。這將導致上文所描述那種死鎖,池中所有執行緒都在等待一個任務的執行結果,而佇列中的這個任務無法得到執行因為所有執行緒都在使用中。
  • 可能長時間操作的任務放入執行緒池的時候要慎重。如果程式必須要等待一個資源,比如一個 I/O 的完成,定義一個最長等待時間,然後失敗或稍後重新執行。這就保證了通過將一個執行緒從一個可能會完成的任務中釋放出來而最終一些其他任務得到成功執行。
  • 理解你的任務。想要有效地調整執行緒池大小,你需要理解佇列中那些任務要做的事情。它們是 CPU 密集型操作嗎?它們會長時間佔用 I/O 嗎?你的答案會影響到你對你的應用的配置。如果這些任務來自不同的類、有著截然不同的特徵,為不同型別的任務定製不同的工作佇列也許更行得通,這樣每個池都能夠得到有據配置。

執行緒池大小配置

調整執行緒池的大小在很大程度上是一件避免兩個錯誤的事情:擁有過多或過少的執行緒。幸運的是,對於大多數應用而言太多或太少之間的中間地帶還是很寬廣的。
回顧應用中使用執行緒的兩個主要優點:在等待一個諸如 I/O 之類的慢操作的時候程序能夠繼續進行,利用多個處理器的可用性。在一個 N 處理器主機上執行一個計算密集型的應用,通過設定執行緒數量為 N 增加額外的執行緒可能會提高吞吐量,但新增的額外執行緒超過 N 的話就沒有什麼好處了。確實,過多的執行緒甚至會降低效能因為會帶來額外的上下文切換開銷。
執行緒池最佳大小取決於可用處理器的數量和工作佇列中任務的性質。對於在一個 N-處理器 系統中一個的將持有完全計算密集型任務的工作佇列,通常獲得 CPU 最大利用率的話是配置執行緒池大小為 N 或 N + 1 個執行緒。
對於可能要等待 I/O 完成的任務,比如,一個從 socket 中讀取一個 HTTP 請求的任務 - 你需要增加執行緒池的執行緒的數量超出可用處理器的數量,因為所有的執行緒都在同一時間工作。通過分析,你可以為一個典型的請求估算出等待時間(WT)和服務時間(ST)之間的比率。比如我們稱這個比率為 WT/ST,對於一個 N-處理器系統,你需要大約 N * (1 + WT/ST) 個執行緒來保持處理器得到充分利用。
處理器利用率並非配置執行緒池大小的唯一依據。因為線上程池增長的時候,你可能會遇到排程器的侷限性,記憶體可用性,或者其他系統資源,比如 socket 的數量,開啟檔案的處理,或者資料庫連線等問題。

六、總結

  1. 使用JDK的方法建立會產生OOM情況,主要原因是用LinkedBlockingQueue佇列,該佇列可以導致OOM。
  2. 執行緒可以使用用阿里巴巴推薦的方法,但是因為定執行緒數量,並且佇列用的是ArrayBlockingQueue,所以效率較低,不過可以保證記憶體不會OOM。
  3. 無需自行編碼。Doug Lea 寫了一個傑出的開源併發工具包,java.util.concurrent,包含了互斥,信合,能夠在併發訪問下效能表現良好的集合類諸如佇列和雜湊表,以及一些工作佇列的實現。這個包裡的 PooledExecutor 類是一個高效的、被廣泛使用的、基於工作佇列的一個執行緒池的正確實現。不用再嘗試著自己去寫程式碼實現了,那樣很容易出錯,你可以考慮使用 java.util.concurrent 包裡的一些工具。
  4. 執行緒池是構建伺服器應用的很有用的一個工具。它的概念很簡單,但在實現或者使用的時候需要注意一些問題,比如死鎖,資源不足,以及 wait() 和 notify() 的複雜性。如果你發現自己的應用需要一個執行緒池,考慮一下使用 java.util.concurrent 包裡的某個 Executor 類,比如 PooledExecutor,不要去從頭寫一個。如果你發現你在建立一些要處理簡短任務的執行緒,你就應該考慮使用執行緒池了。

我的微信公眾號:架構真經(id:gentoo666),分享Java乾貨,高併發程式設計,熱門技術教程,微服務及分散式技術,架構設計,區塊鏈技術,人工智慧,大資料,Java面試題,以及前沿熱門資訊等。每日更新哦!

參考資料:

  1. https://blog.csdn.net/weixin_39770927/article/details/81360511
  2. https://blog.csdn.net/zhangqinfu/article/details/52931530
  3. https://blog.csdn.net/every__day/article/details/83900109
  4. https://blog.csdn.net/wwp231/article/details/52504687
  5. https://blog.csdn.net/qq360694660/article/details/78296919
  6. https://blog.csdn.net/defonds/article/details/43796951