1. 程式人生 > >J.U.C之線程池-ThreadPoolExecutor

J.U.C之線程池-ThreadPoolExecutor

perm problems sum 工廠方法 clas ble mic 行處理 linked

ThreadPoolExecutor

JDK1.8中對與ThreadPoolExecutor是這麽定義的:

/**
* An {@link ExecutorService} that executes each submitted task using
* one of possibly several pooled threads, normally configured
* using {@link Executors} factory methods.
*
* <p>Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.
*/

ThreadPoolExecutor通常使用工廠方法(Executors)來配置執行實例,使用線程池中的線程來執行每一個提交的任務。ThreadPoolExecutor提供了兩個主要功能:減少調用每個線程的開銷,提高性能;提供了一系列方法來管理資源,監控執行。

接下來將會基於這兩點和ThreadPoolExecutor的源碼,對ThreadPoolExecutor進行解析:

  • 線程池的內部狀態
  • ThreadPoolExecutor的構造方法和參數說明
  • 工廠方法的實現和線程池的類型
  • 線程池任務的提交、執行、中斷和停止
  • 阻塞隊列的選擇和任務執行的策略
  • RejectedExecutionHandler任務的拒絕策略

線程池的內部狀態

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//高3位111 能夠接收新任務,並對新添加的任務進行處理 private static final int SHUTDOWN = 0 << COUNT_BITS;//高3位000 不接受新的任務,但是可以對已經添加的任務進行處理 private static final int STOP = 1 << COUNT_BITS;//高3位001 不接收新的任務,不處理已經添加的任務,並且會中斷正在處理的任務 private static final int TIDYING = 2 << COUNT_BITS;//高3位010 當前所有的任務已經終止,然後會執行鉤子函數terminated() private static final int TERMINATED = 3 << COUNT_BITS;//高3位011 線程池徹底中止 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } }

變量ctl定義為AtomicInteger類型,共32位大小,記錄了“線程池中的任務數量”和“線程池的狀態”兩個信息,其中高3位表示"線程池狀態",低29位表示"線程池中的任務數量。

構造方法

ThreadPoolExecutor提供了四個重載的構造函數,先看最全的一個:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

這個構造函數一共有7個參數,每個參數的含義如下:

  • corePoolSize

線程池中核心線程的數量。在提交任務時如果線程池中核心線程的數量小於corePoolSize,線程池會新建一個線程來執行任務,直到當前線程數等於corePoolSize。

如果調用了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有基本線程。

  • maximumPoolSize

線程池中允許的最大線程池數。在提交任務如果線程池的阻塞隊列已滿且當前線程池中核心線程的數量小於maximumPoolSize,則會新建一個線程來執行任務。

另外,如果線程池所用的BlockingQueue是無界的LinkedBlockingQueue,那麽該參數將沒有實際意義。

  • keepAliveTime

線程池所允許的空閑時間。線程的創建和銷毀是需要代價的。線程執行完任務後不會立即銷毀,而是繼續存活一段時間,這段時間由keepAliveTime和unit一起決定。默認情況下,該參數只有在線程數大於corePoolSize時才會生效。

  • unit(TimeUnit)
  • workQueue(BlockingQueue<Runable>)

用來保存等待執行的任務的阻塞隊列,等待的任務必須實現Runnable接口。BlockingQueue有七種實現和一種內部實現類,具體分析可以參考 https://www.cnblogs.com/CHMaple/p/9284583.html。

  • threadFactory

用於設置創建線程的工廠方法,默認使用Executors.defaultThreadFactory()方法,該方法返回了一個DefaultThreadFactory的實例。通過newThread()方法提供創建線程的功能,newThread()方法創建的線程都是“非守護線程”而且“線程優先級都是Thread.NORM_PRIORITY”。

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

Executors工廠方法也提供了另一個線程工廠方法privilegedThreadFactory,該方法返回了一個PrivilegedThreadFactory的實例,該方法創建的新線程和當前線程具有相同的權限。

   static class PrivilegedThreadFactory extends DefaultThreadFactory {
        private final AccessControlContext acc;
        private final ClassLoader ccl;

        PrivilegedThreadFactory() {
            super();
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) {
                // Calls to getContextClassLoader from this class
                // never trigger a security check, but we check
                // whether our callers have this permission anyways.
                sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

                // Fail fast
                sm.checkPermission(new RuntimePermission("setContextClassLoader"));
            }
            this.acc = AccessController.getContext();
            this.ccl = Thread.currentThread().getContextClassLoader();
        }

        public Thread newThread(final Runnable r) {
            return super.newThread(new Runnable() {
                public void run() {
                    AccessController.doPrivileged(new PrivilegedAction<Void>() {
                        public Void run() {
                            Thread.currentThread().setContextClassLoader(ccl);
                            r.run();
                            return null;
                        }
                    }, acc);
                }
            });
        }
    }

  • handler(RejectedExecutionHandler)

線程池的拒絕執行策略,是指將任務添加到線程池而線程池拒絕該任務時所要采取的執行策略。如果線程池的線程已經達到最大數量且阻塞隊列也滿了,那麽線程池會根據構造函數的拒絕策略參數執行拒絕操作。

線程池默認提供了四種拒絕策略:AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy。

本文最後將會依次介紹每個策略的執行方式。

工廠方法的實現

工廠類Executors提供了三種默認的線程池:FixedThreadPool、SingleThreadPool、CachedThreadPool。

  • FixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

  • SingleThreadPool
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

作為單一worker線程的線程池,SingleThreadExecutor把corePool和maximumPoolSize均被設置為1,和FixedThreadPool一樣使用的是無界隊列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣。

  • CachedThreadPool
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool的核心線程數是0,最大線程數是Integer的最大值,同時BlockingQueue使用的是SynchronousQueue,這個阻塞隊列的特點是沒有容量,每一個put操作必須要等待一個take操作,否則不能添加元素。

所以當新的任務到來的時候,CachedThreadPool會重用一個空閑的線程執行任務或者創建一個新的線程來執行任務。keepAliveTime和unit的值分別是60和秒,意味著空閑的進程超過60秒就會被終止。

CachedThreadPool的優點是在線程資源和系統資源充足的情況下會盡可能快的完成任務,但是缺點也很明顯,當線程的處理速度跟不上主線程提交任務的速度的時候,CachedThreadPool會不斷創建新的線程來執行任務,最終可能導致系統耗盡CPU和內存資源。

所以使用CachedThreadPool的時候一定要註意控制並發的任務數,否則高並發的情況下可能會導致嚴重的性能問題。

J.U.C之線程池-ThreadPoolExecutor