1. 程式人生 > >危險的Hystrix執行緒池

危險的Hystrix執行緒池

本文介紹Hystrix執行緒池的工作原理和引數配置,指出存在的問題並提供規避方案,閱讀本文需要對Hystrix有一定的瞭解。

文字討論的內容,基於hystrix 1.5.18:

    <dependency>
      <groupId>com.netflix.hystrix</groupId>
      <artifactId>hystrix-core</artifactId>
      <version>1.5.18</version>
    </dependency>

執行緒池和Hystrix Command之間的關係

當hystrix command的隔離策略配置為執行緒,也就是execution.isolation.strategy設定為THREAD時,command中的程式碼會放到執行緒池裡執行,跟發起command呼叫的執行緒隔離開。摘要官方wiki如下:

execution.isolation.strategy

This property indicates which isolation strategy HystrixCommand.run() executes with, one of the following two choices:

THREAD — it executes on a separate thread and concurrent requests are limited by the number of threads in the thread-pool
SEMAPHORE — it executes on the calling thread and concurrent requests are limited by the semaphore count

一個線上的服務,往往會有很多hystrix command分別用來管理不同的外部依賴。 但會有幾個hystrix執行緒池存在呢,這些command跟執行緒池的對應關係又是怎樣的呢,是一對一嗎?

答案是不一定,command跟執行緒池可以做到一對一,但通常不是,受到HystrixThreadPoolKey和HystrixCommandGroupKey這兩項配置的影響。

優先採用HystrixThreadPoolKey來標識執行緒池,如果沒有配置HystrixThreadPoolKey那麼就使用HystrixCommandGroupKey來標識。command跟執行緒池的對應關係,就看HystrixCommandKey、HystrixThreadPoolKey、HystrixCommandGroupKey這三個引數的配置。

獲取執行緒池標識的程式碼如下,可以看到跟我的描述是一致的:

    /*
     * ThreadPoolKey
     *
     * This defines which thread-pool this command should run on.
     *
     * It uses the HystrixThreadPoolKey if provided, then defaults to use HystrixCommandGroup.
     *
     * It can then be overridden by a property if defined so it can be changed at runtime.
     */
    private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
        if (threadPoolKeyOverride == null) {
            // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
            if (threadPoolKey == null) {
                /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
                return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
            } else {
                return threadPoolKey;
            }
        } else {
            // we have a property defining the thread-pool so use it instead
            return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
        }
    }

Hystrix會保證同一個執行緒池標識只會建立一個執行緒池:

    /*
     * Use the String from HystrixThreadPoolKey.name() instead of the HystrixThreadPoolKey instance as it's just an interface and we can't ensure the object
     * we receive implements hashcode/equals correctly and do not want the default hashcode/equals which would create a new threadpool for every object we get even if the name is the same
     */
    /* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

    /**
     * Get the {@link HystrixThreadPool} instance for a given {@link HystrixThreadPoolKey}.
     * <p>
     * This is thread-safe and ensures only 1 {@link HystrixThreadPool} per {@link HystrixThreadPoolKey}.
     *
     * @return {@link HystrixThreadPool} instance
     */
    /* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
        // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
        String key = threadPoolKey.name();

        // this should find it for all but the first time
        HystrixThreadPool previouslyCached = threadPools.get(key);
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // if we get here this is the first time so we need to initialize
        synchronized (HystrixThreadPool.class) {
            if (!threadPools.containsKey(key)) {
                threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
            }
        }
        return threadPools.get(key);
    }

Hystrix執行緒池引數一覽

  • coreSize 核心執行緒數量
  • maximumSize 最大執行緒數量
  • allowMaximumSizeToDivergeFromCoreSize 允許maximumSize大於coreSize,只有配了這個值coreSize才有意義
  • keepAliveTimeMinutes 超過這個時間多於coreSize數量的執行緒會被回收,只有maximumsize大於coreSize,這個值才有意義
  • maxQueueSize 任務佇列的最大大小,當執行緒池的執行緒執行緒都在工作,也不能建立新的執行緒的時候,新的任務會進到佇列裡等待
  • queueSizeRejectionThreshold 任務佇列中儲存的任務數量超過這個值,執行緒池拒絕新的任務。這跟maxQueueSize本來是一回事,只是受限於hystrix的實現方式maxQueueSize不能動態配置,所以有了這個配置。

根據給定的執行緒池引數猜測執行緒池表現

可以看到hystrix的執行緒池引數跟JDK執行緒池ThreadPoolExecutor引數很像但又不一樣,即便是完整地看了文件,仍然讓人迷惑。不過無妨,先來猜猜幾種配置下的表現。

coreSize = 2; maxQueueSize = 10

執行緒池中常駐2個執行緒。新任務提交到執行緒池,有空閒執行緒則直接執行,否則入隊等候。等待佇列中的任務數=10時,拒絕接受新任務。

coreSize = 2; maximumSize = 5; maxQueueSize = -1

執行緒池中常駐2個執行緒。新任務提交到執行緒池,有空閒執行緒則直接執行,沒有空閒執行緒時,如果當前執行緒數小於5則建立1個新的執行緒用來執行任務,否則拒絕任務。

coreSize = 2; maximumSize = 5; maxQueueSize = 10

這種配置下從官方文件中已經看不出來實際表現會是怎樣的。猜測有如下兩種可能:

  • 可能一。執行緒池中常駐2個執行緒。新任務提交到執行緒池,2個執行緒中有空閒則直接執行,否則入隊等候。當2個執行緒都在工作且等待佇列中的任務數=10時,開始為新任務建立執行緒,直到執行緒數量為5,此時開始拒絕新任務。這樣的話,對資源敏感型的任務比較友好,這也是JDK執行緒池ThreadPoolExecutor的行為。

  • 可能二。執行緒池中常駐2個執行緒。新任務提交到執行緒池,有空閒執行緒則直接執行,沒有空閒執行緒時,如果當前執行緒數小於5則建立1個新的執行緒用來執行任務。當執行緒數量達到5個且都在工作時,任務入隊等候。等待佇列中的任務數=10時,拒絕接受新任務。這樣的話,對延遲敏感型的任務比較友好。

兩種情況都有可能,從文件中無法確定究竟如何。

併發情況下Hystrix執行緒池的真正表現

本節中,通過測試來看看執行緒池的行為究竟會怎樣。

還是這個配置:

coreSize = 2; maximumSize = 5; maxQueueSize = 10

我們通過不斷提交任務到hystrix執行緒池,並且在任務的執行程式碼中使用CountDownLatch佔住執行緒來模擬測試,程式碼如下:

public class HystrixThreadPoolTest {

  public static void main(String[] args) throws InterruptedException {
    final int coreSize = 2, maximumSize = 5, maxQueueSize = 10;
    final String commandName = "TestThreadPoolCommand";

    final HystrixCommand.Setter commandConfig = HystrixCommand.Setter
        .withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandName))
        .andCommandKey(HystrixCommandKey.Factory.asKey(commandName))
        .andCommandPropertiesDefaults(
            HystrixCommandProperties.Setter()
                .withExecutionTimeoutEnabled(false))
        .andThreadPoolPropertiesDefaults(
            HystrixThreadPoolProperties.Setter()
                .withCoreSize(coreSize)
                .withMaximumSize(maximumSize)
                .withAllowMaximumSizeToDivergeFromCoreSize(true)
                .withMaxQueueSize(maxQueueSize)
                .withQueueSizeRejectionThreshold(maxQueueSize));

    // Run command once, so we can get metrics.
    HystrixCommand<Void> command = new HystrixCommand<Void>(commandConfig) {
      @Override protected Void run() throws Exception {
        return null;
      }
    };
    command.execute();
    Thread.sleep(100);

    final CountDownLatch stopLatch = new CountDownLatch(1);
    List<Thread> threads = new ArrayList<Thread>();

    for (int i = 0; i < coreSize + maximumSize + maxQueueSize; i++) {
      final int fi = i + 1;

      Thread thread = new Thread(new Runnable() {
        public void run() {
          try {
            HystrixCommand<Void> command = new HystrixCommand<Void>(commandConfig) {
              @Override protected Void run() throws Exception {
                stopLatch.await();
                return null;
              }
            };
            command.execute();
          } catch (HystrixRuntimeException e) {
            System.out.println("Started Jobs: " + fi);
            System.out.println("Job:" + fi + " got rejected.");
            printThreadPoolStatus();
            System.out.println();
          }
        }
      });
      threads.add(thread);
      thread.start();
      Thread.sleep(200);

      if(fi == coreSize || fi == coreSize + maximumSize || fi == coreSize + maxQueueSize ) {
        System.out.println("Started Jobs: " + fi);
        printThreadPoolStatus();
        System.out.println();
      }
    }

    stopLatch.countDown();

    for (Thread thread : threads) {
      thread.join();
    }

  }

  static void printThreadPoolStatus() {
    for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) {
      String name = threadPoolMetrics.getThreadPoolKey().name();
      Number poolSize = threadPoolMetrics.getCurrentPoolSize();
      Number queueSize = threadPoolMetrics.getCurrentQueueSize();
      System.out.println("ThreadPoolKey: " + name + ", PoolSize: " + poolSize + ", QueueSize: " + queueSize);
    }

  }

}

執行程式碼得到如下輸出:

// 任務數 = coreSize。此時coreSize個執行緒在工作
Started Jobs: 2
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 0

// 任務數 > coreSize。此時仍然只有coreSize個執行緒,多於coreSize的任務進入等候佇列,沒有建立新的執行緒  
Started Jobs: 7
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 5

// 任務數 = coreSize + maxQueueSize。此時仍然只有coreSize個執行緒,多於coreSize的任務進入等候佇列,沒有建立新的執行緒  
Started Jobs: 12
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

// 任務數 > coreSize + maxQueueSize。此時仍然只有coreSize個執行緒,等候佇列已滿,新增任務被拒絕 
Started Jobs: 13
Job:13 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

Started Jobs: 14
Job:14 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

Started Jobs: 15
Job:15 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

Started Jobs: 16
Job:16 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

Started Jobs: 17
Job:17 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

完整的測試程式碼,參見這裡

可以看到Hystrix執行緒池的實際表現,跟之前的兩種猜測都不同,跟JDK執行緒池的表現不同,跟另一種合理猜測也不通。當maxSize > coreSize && maxQueueSize != -1的時候,maxSize這個引數根本就不起作用,執行緒數量永遠不會超過coreSize,對於的任務入隊等候,佇列滿了,就直接拒絕新任務。

不得不說,這是一種讓人疑惑的,非常危險的,容易配置錯誤的執行緒池表現。

JDK執行緒池ThreadPoolExecutor

繼續分析Hystrix執行緒池的原理之前,先來複習一下JDK中的執行緒池。

只說跟本文討論的內容相關的引數:

  • corePoolSize核心執行緒數,maximumPoolSize最大執行緒數。這個兩個引數跟hystrix執行緒池的coreSize和maximumSize含義是一致的。
  • workQueue任務等候佇列。跟hystrix不同,jdk執行緒池的等候佇列不是指定大小,而是需要使用方提供一個BlockingQueue。
  • handler當執行緒池無法接受任務時的處理器。hystrix是直接拒絕,jdk執行緒池可以定製。

可以看到,jdk的執行緒池使用起來更加靈活。配置引數的含義也十分清晰,沒有hystrx執行緒池裡面allowMaximumSizeToDivergeFromCoreSize、queueSizeRejectionThreshold這種奇奇怪怪讓人疑惑的引數。

關於jdk執行緒池的引數配置,參加如下jdk原始碼:


    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

那麼在跟hystrix執行緒池對應的引數配置下,jdk執行緒池的表現會怎樣呢?

corePoolSize = 2; maximumPoolSize = 5; workQueue = new ArrayBlockingQueue(10); handler = new ThreadPoolExecutor.DiscardPolicy()

這裡不再測試了,直接給出答案。執行緒池中常駐2個執行緒。新任務提交到執行緒池,2個執行緒中有空閒則直接執行,否則入隊等候。當2個執行緒都在工作且等待佇列中的任務數=10時,開始為新任務建立執行緒,直到執行緒數量為5,此時開始拒絕新任務。

相關邏輯涉及的原始碼貼在下面。值得一提的是,jdk執行緒池並不根據等候任務的數量來判斷等候佇列是否已滿,而是直接呼叫workQueue的offer方法,如果workQueue接受了那就入隊等候,否則執行拒絕策略。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

可以看到hystrix執行緒池的配置引數跟jdk執行緒池是非常像的,從名字到含義,都基本一致。

為什麼

事實上hystrix的執行緒池,就是在jdk執行緒池的基礎上實現的。相關程式碼如下:


    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }

    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        /*
         * We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
         * <p>
         * SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
         * <p>
         * Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
         * and rejecting is the preferred solution.
         */
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<Runnable>();
        } else {
            return new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
    }

既然hystrix執行緒池基於jdk執行緒池實現,為什麼在如下兩個基本一致的配置上,行為卻不一樣呢?

//hystrix
coreSize = 2; maximumSize = 5; maxQueueSize = 10

//jdk
corePoolSize = 2; maximumPoolSize = 5; workQueue = new ArrayBlockingQueue(10); handler = new ThreadPoolExecutor.DiscardPolicy()

jdk在佇列滿了之後會建立執行緒執行新任務直到執行緒數量達到maximumPoolSize,而hystrix在佇列滿了之後直接拒絕新任務,maximumSize這項配置成了擺設。

原因就在於hystrix判斷佇列是否滿是否要拒絕新任務,沒有通過jdk執行緒池在判斷,而是自己判斷的。參見如下hystrix原始碼:

    public boolean isQueueSpaceAvailable() {
        if (queueSize <= 0) {
            // we don't have a queue so we won't look for space but instead
            // let the thread-pool reject or not
            return true;
        } else {
            return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
        }
    }

    public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
        if (threadPool != null) {
            if (!threadPool.isQueueSpaceAvailable()) {
                throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
            }
        }
        return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
    }

可以看到hystrix在佇列大小達到maxQueueSize時,根本不會往底層的ThreadPoolExecutor提交任務。ThreadPoolExecutor也就沒有機會判斷workQueue能不能offer,更不能建立新的執行緒了。

怎麼辦

對用慣了jdk的ThreadPoolExecutor的人來說,再用hystrix的確容易出錯,筆者就曾在多個重要線上服務的程式碼裡看到過錯誤的配置,稱一聲危險的hystrix執行緒池不為過。

那怎麼辦呢?

配置的時候規避問題

同時配置maximumSize > coreSize,maxQueueSize > 0,像下面這樣,是不行了。

coreSize = 2; maximumSize = 5; maxQueueSize = 10

妥協一下,如果對延遲比較看重,配置maximumSize > coreSize,maxQueueSize = -1。這樣在任務多的時候,不會有等候佇列,直接建立新執行緒執行任務。

coreSize = 2; maximumSize = 5; maxQueueSize = -1

如果對資源比較看重, 不希望建立過多執行緒,配置maximumSize = coreSize,maxQueueSize > 0。這樣在任務多的時候,會進等候佇列,直到有執行緒空閒或者超時。

coreSize = 2; maximumSize = 5; maxQueueSize = -1

在hystrix上修復這個問題

技術上是可行的,有很多方案可以做到。但Netflix已經宣佈不再維護hystrix了,這條路也就不通了,除非維護自己的hystrix分支版本。

Reference

https://github.com/Netflix/Hystrix/wiki/Configuration
https://github.com/Netflix/Hystrix/issues/1589
https://github.com/Netflix/Hystrix/pull/1