1. 程式人生 > >java 執行緒池初步涉足

java 執行緒池初步涉足

   先看以下程式碼,是我們建立執行緒池的一種方式:

ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

  可以進入Executors類看一下,java建立執行緒池的四種方式,分別有以下四個大類:

(1)newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒(空閒執行緒超過存活時間的可以回收),若無可回收,則新建執行緒。

(2)newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。

(3)newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。

(4)newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

    當然在Executors中創先以上四種執行緒池的方式還有很多,一會有所涉及。

    我們現在進入Executors類看看建立上述四種執行緒池有哪些方式:

(1)newFixedThreadPool

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue, using the provided
     * ThreadFactory to create new threads when needed.  At any point,
     * at most {@code nThreads} threads will be active processing
     * tasks.  If additional tasks are submitted when all threads are
     * active, they will wait in the queue until a thread is
     * available.  If any thread terminates due to a failure during
     * execution prior to shutdown, a new one will take its place if
     * needed to execute subsequent tasks.  The threads in the pool will
     * exist until it is explicitly {@link ExecutorService#shutdown
     * shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

     這邊有兩種方式是用來建立newFixedThreadPool執行緒池的,一種是直接傳入執行緒的個數,另外一種是傳進執行緒的個數,還有執行緒工廠(ThreadFactory)。有些任務需求對於如何建立執行緒有自己安排,所以需要傳入自己實現ThreadFactory介面的執行緒工廠類。仔細看,其實這兩個方法都使用了類ThreadPoolExecutor。可以去類ThreadPoolExecutor中看看裡面建構函式中引數的意思,其建構函式程式碼如下:可以看到他呼叫了this(....)建構函式,我順便把他底層的建構函式都寫在後面,方便檢視。

       public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    } 
   /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

這裡面一共有六個引數,分別如下:

corePoolSize是執行緒池中所維持的執行緒池大小。這些執行緒即使是空閒狀態,也會存在於執行緒池中,不會被銷燬。這個執行緒池的核心執行緒數也可以設定為0,像newCachedThreadPool中就是將核心執行緒數設定為0的。

maxPoolSize是執行緒池的最大執行緒數。這個最大執行緒數是對於或等於核心執行緒數的,當你用newFixedThreadPool和newSingleThreadPool時,核心執行緒數和最大執行緒數是相等的,這一點你可以在這兩種執行緒池的建構函式中得到驗證。

keepAliveTime是用來約束某種執行緒的生存時間的,這個某種執行緒是什麼意思呢,當執行緒池所使用的執行緒數設為n,這個某種執行緒是一類執行緒,就是n-corePoolSize的執行緒,就是超過核心執行緒數的執行緒。當執行緒池所建立的執行緒數大於核心執行緒數時,此時會計算當前的這些執行緒中哪些執行緒是空閒的,當空閒時間超過這個設定的keepAliveTime時,該執行緒就會被回收。

unit就是時間單位

workQueue:工作佇列,該佇列是當核心執行緒數已經使用完了,之後提交的任務就將放入這個工作佇列,尤其需要注意的是,只有當你用execute()方法提交的任務才會放在這個工作佇列中。一般到佇列滿的時候,才會建立先的執行緒來執行任務,當然要保證執行緒池的所有執行緒數小於或等於最大執行緒數。

threadFactory:執行緒工廠用於建立執行緒的。有些需求可能需要自己建立的執行緒。

handler:是拒絕策略,也叫飽和策略,當執行緒池中的執行緒數達到maxPoolSize,並且workQueue(有界佇列)已經滿了,就需要採用飽和策略了,對於新提交的任務是直接拒絕還是怎麼樣,是在這邊設定的。

      這邊瞭解了底層建立執行緒池的引數,回過頭來看看newFixedThreadPool是怎麼建立執行緒池的:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newFixedThreadPool是是有一個引數,就是執行緒數nThread。這個引數用於設定核心執行緒數和最大執行緒數,其中的工作佇列是LinkedBlockingQueue,這個佇列是連結串列實現的,也就是說該工作佇列不會滿,除非你限制他的長度。而且該執行緒池中的最大執行緒數就是核心執行緒數。

接著我們看看其他幾個執行緒池的構造方式。

(2)newSingleThreadExecutor

看看建構函式,核心執行緒數和最大執行緒數都是1,生存時間0,也是LinkedBlockingQueue

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

(3)newCachedThreadPool

這邊有兩個建構函式,就是傳入承諾書不一樣,一個啥也沒傳,一個穿了執行緒工廠。最大執行緒數是Integer的最大值,核心執行緒數為0,執行緒最大空閒時間為60秒。工作佇列是SynchronousQueue,這個queue是即是交付的,就是任務一到就建立執行緒去執行。整個工作機制如下:來一個任務就建立執行緒去執行,當然有空閒執行緒讓這些空閒執行緒去執行,空閒執行緒有一個60s生存時間。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

(4)newScheuledThreadPool

   /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @param threadFactory the factory to use when the executor
     * creates a new thread
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

這個定時執行緒池底層使用了ScheuledThreadPoolExecytor類,那我們去看看這個類是怎麼構造的

   /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

該類繼承 extends ThreadPoolExecutor,所使用的引數就是執行緒池核心執行緒數,最大執行緒數是Integer的最大值,佇列用了延遲佇列,沒有生存時間這一說法。