1. 程式人生 > >Java線程池之ThreadPoolExecutor

Java線程池之ThreadPoolExecutor

一次 ray 線程的創建 代碼 記錄日誌 turn throwable 限制 dex

前言

  線程池可以提高程序的並發性能(當然是合適的情況下),因為對於沒有線程的情況下,我們每一次提交任務都新建一個線程,這種方法存在不少缺陷:

1. 線程的創建和銷毀的開銷非常高,線程的創建需要時間,會延遲任務的執行,會消耗大量的系統資源。

2. 活躍的線程會消耗系統資源,而大量的空閑線程會占用許多內存,給垃圾回收器帶來很大的壓力,而大量線程在競爭CPU資源的時間還會產生氣體的性能開銷。

3. 系統在可創建的線程上存在一個限制,如果超過了這個限制,很可能拋出OOM。

  我們不難發現,在一定範圍下,增加線程能夠提高系統的吞吐量,而當線程數超過合理值後,增加的線程反而會降低程序的執行速度。

Executor的引入

  在JDK1.5中,我們引入了一個線程池框架,Executor框架,它能夠分解任務的創建和執行過程。它包括Executor、ExecutorService、Callable等接口和Executors、ThreadPoolExecutor實現類等。

註意點

  當然了看待事物需要辯證,是否使用了Executor框架就能很好地將復雜的任務執行解耦開來的。這邊我們其實需要限制一下它的使用範圍。

  • 對於依賴型的任務而言,不是很適合使用線程池去操作,容易引發死鎖,因為這種情況下我們需要小心維持這些任務的執行順序,以保證不會觸發死鎖。
  • 對於通過線程封閉實現線程安全的任務而言,使用單線程的Executor能夠保證更安全的並發。
  • 使用ThreadLocal的任務,因為線程池會復用線程,這將導致任務的ThreadLocal值失去意義(除非線程本地值受限於任務的生命周期)。
  • 對響應時間敏感的任務,假設我們將一個執行時間很長的任務,或者多個執行時間很長的任務放到一個單線程的Executor中或者一個包含少量線程的線程池中,都會降低程序的響應速度。

合適的線程數

  線程池在對處理同一類型的任務且相互獨立的時候,能達到性能上的最佳,否則任務時長不一致很容易引起擁塞或是饑餓。

那線程池的線程數以多少為合適呢,對於計算密集型的任務而言,我們最好設置的線程數 = CPU數+1;(+1是為了保證當某個線程因為缺頁故障或其他原因而暫停時,這個+1的線程能夠確保CPU的時鐘周期不會被浪費);而對於包含IO操作或者其他阻塞操作的任務時,由於線程不會一直執行,所以線程池的規模應該更大點。

線程池的使用 

 在實際使用過程中,我們一般借助於ThreadPoolExecutor來完成線程池的創建。ThreadPoolExecutor具有極好的擴展性,除了系統提供的四種常用的線程池,如CachedThreadPoolExecutor,FixedThreadPoolExecutor,SingleThreadPoolExecutor,ScheduledThreadExecutor。我們可以自定義線程池的構造函數,如線程池的基本線程數、最大線程數、線程池的超時時間(時間+時間單位),線程池的任務隊列,線程池的線程工廠,線程池的飽和策略。

當然,我們在使用系統提供的四種線程池的時候,同樣可以在後來修改線程池的配置。

線程的任務隊列

LinkedBlockQueue:無界 CachedThreadPoolExecutor FixedThreadPoolExecutor的默認任務隊列

ArrayBlockQueue:有界

PriorityQueue:優先級隊列,有界 ScheduledThreadPoolExecutor的默認任務隊列

SynchronousQueue:同步移交,隊列容量為0,僅當有線程準備好時才會將任務放到隊列中。

飽和策略

飽和策略的設置

  對於有界隊列而言,當有界隊列被填滿後,這時候我們需要用到線程的飽和策略,前面提到我們可以在後面配置線程池的設置,而飽和策略的修改就是通過ThreadPoolExecutor的setRejectedExecutionHandler方法來進行修改的(當某個任務唄提交到一個已經被關閉的Executor中,也會用到飽和策略)

飽和策略主要有以下幾種:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。

  1. AbortPolicy:中止策略,該策略將拋出未受檢的RejectedExecution,調用者可以捕獲這個異常根據實際需要編寫直接的處理代碼。
  2. CallerRunsPolicy:調用者運行,該策略不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者中,從而降低新任務的流量。它不會在線程池的某個線程中執行新提交的任務,而是調用了execute的線程中執行任務,當線程池的任務隊列被填滿後,下一個任務會在調用execute時在主線程中執行,由於執行任務需要一定時間,這段時間內主線程顯然不能提交任務,從而保證線程池在這段時間內處理現有任務。
  3. DiscardPolicy:拋棄策略,舍棄任務。
  4. DiscardOldestPolicy:拋棄下一個將被執行的任務,然後嘗試重新提交新的任務,(不適用於和優先隊列合用,因為這樣將拋棄的將是優先級最高的等待任務)

當然,我們可以使用Semaphore(信號量)來控制任務的提交速率。

線程工廠 

  我們可以通過自定義線程工廠,來實現我們自己的線程。具體示例如下所示:

  自定義的線程工廠,記錄了線程池的名字。

import java.util.concurrent.ThreadFactory;

/**
 * Created by DB on 2017/9/1.
 */
public class MyThreadFactory implements ThreadFactory {
    private final String poolName;

    public MyThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new MyAPPThread(r,poolName);
    }
}

自定義的線程類:實現了指定線程的名字,設置自定義UncaughtExecptionHandler。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Created by DB on 2017/9/1.
 */
public class MyAPPThread extends Thread {
    public static  final  String DEFALUT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAPPThread(Runnable r){
        this(r,DEFALUT_NAME);
    }
    public MyAPPThread(Runnable r,String name){
        super(r,name +"-"+created.incrementAndGet());
        setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.log(Level.SEVERE,"Uncaught in thread"+t.getName(),e);
            }
        });
    }

    public void run(){
        boolean debug = debugLifecycle;
        if(debug){
            log.log(Level.FINE,"Created" +getName());
        }
        try {
            alive.incrementAndGet();
            super.run();
        }finally {
            alive.decrementAndGet();
            if(debug){
                log.log(Level.FINE,"Exiting"+getName());
            }
        }
    }
    public static int getThreadsCreated(){
        return  created.get();
    }
    public static int getThreadsAlive(){
        return alive.get();
    }
    public static boolean getDebug(){
        return  debugLifecycle;
    }
    public static  void setDebug(Boolean b){
        debugLifecycle=b;
    }
}

擴展ThreadPoolExecutor

  ThreadPoolExecutor是可擴展的,它提供了幾個在子類中可以改寫的方法:beforeExecute、afterExecute和terminated,通過實現這些方法我們可以實現擴展。

在執行任務的線程中將調用beforeExecute和afterExecute方法,在這些方法中我們可以添加日誌、計時。監視或者統計信息收集的功能。無論任務是從run中正常返回還是拋出一個異常而返回,afterExecute都會被調用。而beforeExecute拋出RuntimeException時,任務將不被執行,afterExecute當然也不會被調用。

在線程池完成關閉操作時,會調用terminated,也就在所有任務都已經完成並且所有工作者線程也已經關閉後。在這個方法中我們可以實現Executor在其生命周期中分配的各種資源,還有執行發送通知、記錄日誌或者收集finalize統計信息等操作。

  具體的框架如下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by DB on 2017/9/1.
 */
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
    }
}

Java線程池之ThreadPoolExecutor