1. 程式人生 > >執行緒池工作機制與原理

執行緒池工作機制與原理

書接上文,Java執行緒池
接下來記錄一下執行緒池的工作機制和原理

執行緒池的兩個核心佇列:

  • 執行緒等待池,即執行緒佇列BlockingQueue。
  • 任務處理池(PoolWorker),即正在工作的Thread列表(HashSet<Worker>)。

執行緒池的核心引數:

  • 核心池大小(corePoolSize),即固定大小,設定好之後,執行緒池的穩定峰值,達到這個值之後池的執行緒數大小不會釋放。
  • 最大處理執行緒池數(maximumPoolSize),當執行緒池裡面的執行緒數超過corePoolSize,小於maximumPoolSize時會動態建立與回收執行緒池裡面的執行緒池資源。

執行緒池的執行機制:
舉個栗子。假如有一個工廠,工廠裡面有10個人,每個工人同時只能做一件事情。因此只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人做;當10個工人都有任務時,如果還來任務,就把任務進行排隊等待。
如果說新任務數目增長的速度遠遠大於工作做任務的速度,那麼此時工廠的主管可能就需要採取補救措施了,比如重新招4個工人進來;然後就將任務分配給這4個剛招進來的工人處理。
如果說這14個工人做任務的速度還是不夠,此時工廠主管就要考慮不再接受新的任務或者拋棄前面的一些任務了。當這14個工人當中有人空閒時,而新任務增長的速度又比較緩慢,工廠主管就要考慮辭掉4個臨時工了,只保持原來10個工人,比較額外的工人是需要花費的。
而這個栗子中永遠等待幹活的10個工人機制就是workerQueue。這個栗子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。也就是說corePoolSize就是執行緒池的大小,maximumPoolSize在我看來就是一種執行緒池任務超過負荷的一種補救措施,即任務量突然過大時的一種補救措施。再看看下面圖好好理解一下。工人永遠在等待幹活,就像workerQueue永遠在迴圈幹活一樣,除非,整個執行緒池停止了。


執行緒池原理圖

執行緒池裡面的執行緒的時序圖如下圖所示:


執行緒的時序圖

自定義執行緒池與ExecutorService

自定義執行緒池需要用到ThreadFactory,本節將通過建立一個執行緒的例子對ExecutorService及其引數進行詳細講解。

1.認識ExecutorService家族

ExecutorService家族成員如下所示:


ExecutorService家族

使用startUML畫的,我是UML菜鳥,所以湊合著看下。

上圖中主要元素說明如下:
Executor:執行緒池的頂級介面,但是嚴格意義上講Executor並不是一個執行緒池,而只是一個執行執行緒的工具。
ExecutorService:真正執行緒池介面。這個介面繼承了Executor介面,並聲明瞭一些方法:
submit、invokeAll、invokeAny以及shutDown等。
AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中宣告的所有方法。
ThreadPoolExecutor:ExecutorService的預設實現,繼承了類AbstractExecutorService。
ScheduledExecutorService:與Timer/TimerTask類似,解決那些需要任務重複執行的問題。
ScheduledThreadPoolExecutor:繼承ThreadPoolExecutor的ScheduledExecutorService介面實現,週期性任務排程的類實現。
Executors是個執行緒工廠類,方便我們快速地建立執行緒池。

2.利用ThreadFactory建立一個執行緒

java.util.concurrent.ThreadFactory提供了一個建立執行緒的工廠的介面。
ThreadFactory原始碼如下:

public interface ThreadFactory{
  @override
  public Thread newThread(Runnable r);
}

我們可以看到上面的介面類中有一個newThread()的方法,為此我們自己手動定義一個執行緒工廠類,有木有激動啊,呵呵,下面我們就手動寫一個自己的執行緒工廠類吧!
MyThreadFactory.java

public class MyThreadFactory implements ThreadFactory{
  @Override
  public Thread newThread(Runnable r){
        return new Thread(r);
  }
}

上面已經建立好了我們自己的執行緒工廠類,但是啥都沒有做,就是直接new了一個Thread就返回回去了,我們一般在建立執行緒的時候,都需要定義其執行緒的名字,因為我們在定義了執行緒的名字之後就能在出現問題的時候根據監視工具來查詢錯誤的來源,所以我們來看下官方實現的ThreadFactory吧!
這個類在java.util.concurrent.Executors類中的靜態類中DefaultThreadFactory

/**
*  The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory{
  private static final AtomicInteger poolNumber=new AtomicInteger(1);
  private final ThreadGroup group;
  private final AtomicInteger threadNumber=new AtomicInteger(1);
  private final String namePrefix;

  DefaultThreadFactory(){
    SecurityManager s=System.getSecurityManager();
    group=(s!=null)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
    namePrefix="pool-"+poolNumber.getAndIncrement()+"-thread-";
  }
  public Thread newThread(Runnable r){
      Thread t=new Thread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
      if((t.isDaemon())
          t.setDaemon(false);
      if(t.getPriority()!=Thread.NORM_PRIORITY)
          t.setPriority(Thread.NORM_PRIORITY);
      return t;
  }
}

3.瞭解執行緒池的拒絕策略(RejectExecutionHandler)

當呼叫ThreadPoolExecutor的execute方法時,而此時執行緒池處於一個飽和的狀態,並且任務佇列也已經滿了那麼就需要做丟棄處理,RejectExecutionHandler就是這樣的一個處理介面類。
RejectExecutionHandler.java

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

在JDK裡面有4中拒絕策略,如下圖所示:


執行緒池拒絕策略
  • AbortPolicy:一言不合就拋異常(預設使用策略)。
  • CallerRunsPolicy:只用呼叫者所線上程來執行任務。
  • DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
  • DiscardPolicy:不處理,直接丟棄。

來看下原始碼吧:
AbortPolicy : 一言不合就拋異常的

   /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

CallerRunsPolicy:呼叫者所線上程來執行任務

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

DiscardOldestPolicy :丟棄佇列裡面最近的一個任務,並執行當前任務

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

DiscardPolicy : 不處理,直接丟棄

/**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

思考問題:
為什麼有任務拒絕的情況發生呢:
這裡先假設有一個前提:執行緒池裡面有一個任務佇列,用於快取所有待處理的任務,正在處理的任務將從任務佇列中移除。因此,在任務佇列長度有限的情況下,就會出現現任務的拒絕情況,需要一種策略來處理髮生這種已滿無法加入的情況。另外,線上程池關閉的時候,也需要對任務加入佇列操作進行額外的協調處理。

4.ThreadPoolExecutor詳解

ThreadPoolExecutor類是執行緒池中最核心的一個類,因此如果要想透徹的瞭解Java執行緒池,必須先了解這個大BOSS,下面來看下其原始碼:
4種構造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

通過原始碼我們清楚的看到,最終建構函式呼叫了最後一個建構函式,後面的那個建構函式才是真正的建構函式,接下來研究一下引數。

  • int corePoolSize:核心池大小,這個引數跟後面講的執行緒池原理有很大的關係。在建立了執行緒池之後,預設情況下,執行緒池中並沒有任何執行緒,而是等待所有的任務到來之時才進行建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法 ,從這個兩個方法的名字可以知道是預建立執行緒的意思,即在沒有任務來臨之前先建立好corePoolSize個執行緒或者一個執行緒。預設情況下,在建立好執行緒池之後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數量達到corePoolSize後,就會把達到的任務放到快取佇列中去。
  • int maximumPoolSize:執行緒池最大執行緒數量,這是個非常重要的引數,它表示線上程池中最多能建立執行緒的數量;在corePoolSize和maximumPoolSize的執行緒數會被自動釋放,而小於corePoolSize的則不會。
  • long keepAliveTime:表示執行緒沒有執行任務時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會生效,直到執行緒池數量不大於corePoolSize,即只有當執行緒池數量大於corePoolSize數量,超出這個數量的執行緒一旦到達keepAliveTime就會終止。但是如果呼叫了allowCoreThreadTimeout(boolean)方法,即使執行緒池的執行緒數量不大於corePoolSize,執行緒也會在keepAliveTime之後就終止,知道執行緒池的數量為0為止。
  • TimeUnit unit:引數keepAliveTime的時間單位,一個時間單位列舉類。
  • BlockingQueue workQueue:一個阻塞佇列,用來儲存等待執行任務的佇列,這個引數選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,這裡的阻塞佇列就是(ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue;)。
  • ThreadFactory ThreadFactory:執行緒工廠,主要用來建立執行緒;可以是一個自定義的執行緒工廠,預設就是Executors.defaultThreadFactory()。用來線上程池中建立執行緒。
  • RejectedExecutionHandler handler:表示當拒絕處理任務時的策略,也是可以自定義的,預設是我們前面的4種取值:
  • ThreadPoolExecutor.AbortPolicy(預設的,一言不合即拋異常的)
  • ThreadPoolExecutor.DiscardPolicy(一言不合就丟棄任務)
  • ThreadPoolExecutor.DiscardOldestPolicy(一言不合就把最近的任務給拋棄,然後執行當前任務)
  • ThreadPoolExecutor.CallerRunsPolicy(由呼叫者所線上程來執行任務)

所以想自定義執行緒池就可以從上面的幾個引數入手。接下來具體看下程式碼,瞭解一下實現原理:

   // 預設異常處理機制
   private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
   //任務快取佇列,用來存放等待執行的任務
   private final BlockingQueue<Runnable> workQueue;
   //執行緒池的主要狀態鎖,對執行緒狀態(比如執行緒大小、runState等)的改變都需要這個鎖
   private final ReentrantLock mainLock = new ReentrantLock();
   //用來存放工作集
   private final HashSet<Worker> workers = new HashSet<Worker>();
   //volatile 可變變數關鍵字,寫的時候用mainLock做鎖,讀的時候無鎖,高效能
   private volatile long keepAliveTime;
   //是否允許核心執行緒超時
   private volatile boolean allowCoreThreadTimeOut;
   //核心執行緒數量
   private volatile int corePoolSize;
   //執行緒最大執行緒數量
   private volatile int maximumPoolSize;
   //任務拒絕策略
   private volatile RejectedExcutionHandler handler;

結合之前的知識,大概就能猜出裡面是怎麼實現的了,具體可以參考一下JDK的原始碼,這樣我們就能做到了解原理又會用了。

5.自定義實現一個簡單的Web請求連線池

我們來自定義一個簡單的Web請求執行緒池。模仿Web服務的需求場景說明如下:

  • 伺服器可容納的最小請求數是多少。
  • 可以動態擴充的請求數大小是多少。
  • 多久回收多餘執行緒數即請求數。
  • 使用者訪問量打了怎麼處理。
  • 執行緒佇列機制採取有優先順序的排隊的執行機制。
    根據上面的場景,看下這個執行緒池如何編寫?
    public class MyExecutors extends Executors{
      //利用預設執行緒工廠和PriorityBlockingQueue佇列機制,當然了,我們還可以自定義ThreadFactory和繼承queue進行自定義擴充套件
     public static ExecutorService newMyWebThreadPool(int minSpareThreads,int maxThreads,int maxIdleTime){
      return new ThreadPoolExecutor(minSpareThread,maxThreads,maxIdleTime,TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>());
    }
    }

6.執行緒池在工作中的錯誤使用

  • (1)分不清楚執行緒是單例還是多物件。
  • (2)執行緒池數量設定很大。
  • (3)注意死鎖問題