1. 程式人生 > >執行緒池原始碼剖析

執行緒池原始碼剖析

執行緒池的作用

執行緒池能有效的處理多個執行緒的併發問題,避免大量的執行緒因為互相強佔系統資源導致阻塞現象,能夠有效的降低頻繁建立和銷燬執行緒對效能所帶來的開銷。

執行緒池的真相

真正執行緒池的實現是通過ThreadPoolExecutor,ThreadPoolExecutor通過配置不同的引數配置來建立執行緒池。下面簡單的介紹一下各個執行緒池的區別和用處。

fixThreadPool 固定執行緒池

Executor executor = Executors.newFixedThreadPool(10);
 public static ExecutorService newFixedThreadPool
(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

我的理解這是一個有指定的執行緒數的執行緒池,有核心的執行緒,裡面有固定的執行緒數量,響應的速度快。固定的執行緒數由系統資源設定。

注:核心執行緒是沒有超時機制的,佇列大小沒有限制,除非執行緒池關閉了核心執行緒才會被回收。

CacheThreadPool 快取執行緒池

    Executor executor =  Executors.newCachedThreadPool();
 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new
SynchronousQueue<Runnable>()); }

構造方法解釋:只有非核心執行緒,最大執行緒數很大(Int.Max(values)),它會為每一個任務新增一個新的執行緒,這邊有一個超時機制,當空閒的執行緒超過60s內沒有用到的話,就會被回收。缺點就是沒有考慮到系統的實際記憶體大小。

SingleThreadPool 單執行緒執行緒池

Executor executor = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }

看這個名字就知道這個傢伙是隻有一個核心執行緒,底層還進行了封裝。但其實就是隻是將執行緒池的應用加了final關鍵字

ScheduledThreadPool

Executors.newScheduledThreadPool(10);
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

這個執行緒池就厲害了,是唯一一個有延遲執行和週期重複執行的執行緒池。它的核心執行緒池固定,非核心執行緒的數量沒有限制,但是閒置時會立即會被回收。

執行緒池最核心方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    }

這個方法是所有執行緒池呼叫的最底層方法。總共有7個引數(面試曾問過引數的含義)

corePoolSize

核心池的大小,,執行緒池預設建立的執行緒數。除非執行緒池銷燬,否則就不會銷燬

maximumPoolSize

執行緒池最大執行緒數,它表示線上程池中最多能建立多少個執行緒;

keepAliveTime

表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0;

unit

引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
- TimeUnit.DAYS; //天
- TimeUnit.HOURS; //小時
- TimeUnit.SECONDS; //秒
- TimeUnit.MILLISECONDS; //毫秒
- TimeUnit.MICROSECONDS; //微妙
- TimeUnit.NANOSECONDS; //納秒

workQueue

一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,這裡的阻塞佇列有以下幾種選擇:
- ArrayBlockingQueue:基於陣列的先進先出佇列,此佇列建立時必須指定大小;
- LinkedBlockingQueue:基於連結串列的先進先出佇列,如果建立時沒有指定此佇列大小,則預設為Integer.MAX_VALUE;
- synchronousQueue:這個佇列比較特殊,它不會儲存提交的任務,而是將直接新建一個執行緒來執行新來的任務。

ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。

threadFactory

執行緒工廠,主要用來建立執行緒;

handler

表示當拒絕處理任務時的策略,有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

執行緒池家族譜

ExecutorService extend Executor
    AbstractExecutorService implements ExecutorService
    ThreadPoolExecutor extends AbstractExecutorService

Executor是一個頂層介面,在它裡面只聲明瞭一個方法execute(Runnable),返回值為void,引數為Runnable型別,從字面意思可以理解,就是用來執行傳進去的任務的;

然後ExecutorService介面繼承了Executor介面,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象類AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中宣告的所有方法;

然後ThreadPoolExecutor繼承了類AbstractExecutorService。

執行緒池的狀態

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

值分別是。-1,0,1,2,3,
當建立執行緒池後,初始時,執行緒池處於RUNNING狀態;

如果呼叫了shutdown()方法,則執行緒池處於SHUTDOWN狀態,此時執行緒池不能夠接受新的任務,它會等待所有任務執行完畢;

如果呼叫了shutdownNow()方法,則執行緒池處於STOP狀態,此時執行緒池不能接受新的任務,並且會去嘗試終止正在執行的任務;

當執行緒池處於SHUTDOWN或STOP狀態,並且所有工作執行緒已經銷燬,任務快取佇列已經清空或執行結束後,執行緒池被設定為TERMINATED狀態。

執行緒池的重要變數

private final BlockingQueue<Runnable> workQueue;              //任務快取佇列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock();   //執行緒池的主要狀態鎖,對執行緒池狀態(比如執行緒池大小
                                                              //、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工作集

private volatile long  keepAliveTime;    //執行緒存活時間   
private volatile boolean allowCoreThreadTimeOut;   //是否允許為核心執行緒設定存活時間
private volatile int   corePoolSize;     //核心池的大小(即執行緒池中的執行緒數目大於這個引數時,提交的任務會被放進任務快取佇列)
private volatile int   maximumPoolSize;   //執行緒池最大能容忍的執行緒數

private volatile int   poolSize;       //執行緒池中當前的執行緒數

private volatile RejectedExecutionHandler handler; //任務拒絕策略

private volatile ThreadFactory threadFactory;   //執行緒工廠,用來建立執行緒

private int largestPoolSize;   //用來記錄執行緒池中曾經出現過的最大執行緒數

private long completedTaskCount;   //用來記錄已經執行完畢的任務個數