1. 程式人生 > >Spring執行緒池配置模板設計(基於Springboot)

Spring執行緒池配置模板設計(基於Springboot)

目錄

執行緒池配置模板

springboot給我們提供了一個執行緒池的實現,它的底層是由我們傳統執行緒池ThreadPoolTaskExecutor來實現的。並對它進行了一些功能的增強,比如對執行緒狀態的監聽,在我們在使用的時候更加的方便。在這裡給各位同學一個配置模板,簡單的講解下Spring執行緒池的底層原理(在最後的原始碼章節)。

基礎的註解解釋

@Configuration:這是 Spring 3.0 新增的一個註解,用來代替 applicationContext.xml 配置檔案,所有這個配置檔案裡面能做到的事情都可以通過這個註解所在類來進行註冊。

@Bean:用來代替 XML 配置檔案裡面的 <bean ...> 配置。

常用配置引數

  • corePoolSize :執行緒池的核心池大小,在建立執行緒池之後,執行緒池預設沒有任何執行緒。

執行緒池建立之後,執行緒池中的執行緒數為0,當任務過來就會建立一個執行緒去執行,直到執行緒數達到corePoolSize 之後,就會被到達的任務放在佇列中。換句更精煉的話:corePoolSize 表示允許執行緒池中允許同時執行的最大執行緒數。

如果執行了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有核心執行緒。

  • maximumPoolSize :執行緒池允許的最大執行緒數,他表示最大能建立多少個執行緒。maximumPoolSize肯定是大於等於corePoolSize。
  • keepAliveTime:表示執行緒沒有任務時最多保持多久然後停止。預設情況下,只有執行緒池中執行緒數大於corePoolSize 時,keepAliveTime 才會起作用。換句話說,當執行緒池中的執行緒數大於corePoolSize,並且一個執行緒空閒時間達到了keepAliveTime,那麼就是shutdown。如果配置了 allowCoreThreadTimeOut=true,那麼核心執行緒池也會參與到超時的計時中。
  • Unit:keepAliveTime 的單位。
  • workQueue :一個阻塞佇列,用來儲存等待執行的任務,當執行緒池中的執行緒數超過它的corePoolSize的時候,執行緒會進入阻塞佇列進行阻塞等待。通過workQueue,執行緒池實現了阻塞功能
  • threadFactory :執行緒工廠,用來建立執行緒。
  • handler :表示當拒絕處理任務時的策略。
    • AbortPolicy:丟棄任務並丟擲RejectedExecutionException
    • CallerRunsPolicy:只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是,任務提交執行緒的效能極有可能會急劇下降。
    • DiscardOldestPolicy:丟棄佇列中最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。
    • DiscardPolicy:丟棄任務,不做任何處理。
  • allowCoreThreadTimeOut:設定為true則執行緒池會回收核心執行緒池的執行緒,false則只會回收超過核心執行緒池的執行緒。預設為false。

配置類設計

這是博主自己寫的一個關於Springboot執行緒池的配置類,參考了一些文章的規範,可以直接使用。

@EnableAsync
@Configuration
public class LogThreadPoolConfig {

  @Bean(name = "logThreadPool")
  public ThreadPoolTaskExecutor LogThreadPoolTask() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    LogThreadPoolProperties properties = this.logThreadPoolProperties();

    executor.setCorePoolSize(properties.getCorePoolSize());
    executor.setMaxPoolSize(properties.getMaxPoolSize());
    executor.setQueueCapacity(properties.getQueueCapacity());
    executor.setKeepAliveSeconds(properties.getKeepAliveSeconds());
    executor.setThreadNamePrefix(properties.getThreadName());
    switch (properties.getRejectedExecutionHandler()) {
      case "abortPolicy":
        executor.setRejectedExecutionHandler(new AbortPolicy());
        break;
      case "callerRunsPolicy":
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        break;
      case "discardOldestPolicy":
        executor.setRejectedExecutionHandler(new DiscardOldestPolicy());
        break;
      case "discardPolicy":
        executor.setRejectedExecutionHandler(new DiscardOldestPolicy());
        break;
      default:
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        break;
    }
    executor.initialize();
    return executor;
  }


  @Bean
  @ConfigurationProperties(prefix = "threadpool.log")
  public LogThreadPoolProperties logThreadPoolProperties() {
    return new LogThreadPoolProperties();
  }


  //@Getter lombok提供的getset方法生成註解
  //@Setter
  @Configuration
  public static class LogThreadPoolProperties {

    /**
     * 執行緒字首名
     */
    private String threadName;
    /**
     * 核心執行緒池大小
     */
    private int corePoolSize;
    /**
     * 最大執行緒數
     */
    private int maxPoolSize;
    /**
     * 佇列大小
     */
    private int queueCapacity;
    /**
     * 執行緒池維護空閒執行緒存在時間
     */
    private int keepAliveSeconds;
    /**
     * 拒絕策略
     */
    private String rejectedExecutionHandler;

  }
}

這樣就可以在yml檔案中配置引數了:

threadpool:
  log:
    threadName: ThreadPool-log- # 執行緒池字首名
    corePoolSize: 8             # 核心執行緒池數:IO型推薦設定為cpu核心數*2;cpu型推薦設定為cpu數+1
    maxPoolSize: 16             # 最大執行緒池數
    queueCapacity: 1000         # 執行緒池阻塞佇列容量
    keepAliveSeconds: 60        # 允許執行緒空閒時間
    # 拒絕策略 abortPolicy callerRunsPolicy discardOldestPolicy discardPolicy
    rejectedExecutionHandler: callerRunsPolicy

執行緒池使用

Spring提供了註解方式來方便我們使用執行緒池,只需要在要非同步處理的方法上加 @Async("你配置的執行緒池名字")就可以了,注意這個類需要被spring掃描並納入管理,所以要加@Service、@Component等註解。

@Service
public class ServiceImpl implements Service {

  @Override
  @Async("logThreadPool")
  public void addOperationLog(BaseLog baseLog) {
    //你要非同步執行的邏輯
  }
}

具體的非同步效果可以自測一下

ThreadPoolTaskExecutor原始碼

springboot給我們提供了一個執行緒池的實現,它的底層是由我們傳統執行緒池ThreadPoolTaskExecutor來實現的。

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
  private final Object poolSizeMonitor = new Object();
  private int corePoolSize = 1;
  private int maxPoolSize = 2147483647;
  private int keepAliveSeconds = 60;
  private int queueCapacity = 2147483647;
  private boolean allowCoreThreadTimeOut = false;
  private TaskDecorator taskDecorator;
  /**
   * 在這可以看到,其底層封裝了我們熟悉的threadPoolExecutor,這是JDK提供給我們的執行緒池實現
   */
  private ThreadPoolExecutor threadPoolExecutor;

  public ThreadPoolTaskExecutor() {
  }

  /**
   * 這些都是些get/set
   */
  public void setCorePoolSize(int corePoolSize) {
    Object var2 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      this.corePoolSize = corePoolSize;
      if (this.threadPoolExecutor != null) {
        this.threadPoolExecutor.setCorePoolSize(corePoolSize);
      }

    }
  }
  public int getCorePoolSize() {
    Object var1 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      return this.corePoolSize;
    }
  }
  public void setMaxPoolSize(int maxPoolSize) {
    Object var2 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      this.maxPoolSize = maxPoolSize;
      if (this.threadPoolExecutor != null) {
        this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
      }
    }
  }
  public int getMaxPoolSize() {
    Object var1 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      return this.maxPoolSize;
    }
  }
  public void setKeepAliveSeconds(int keepAliveSeconds) {
    Object var2 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      this.keepAliveSeconds = keepAliveSeconds;
      if (this.threadPoolExecutor != null) {
        this.threadPoolExecutor.setKeepAliveTime((long)keepAliveSeconds, TimeUnit.SECONDS);
      }
    }
  }
  public int getKeepAliveSeconds() {
    Object var1 = this.poolSizeMonitor;
    synchronized(this.poolSizeMonitor) {
      return this.keepAliveSeconds;
    }
  }
  public void setQueueCapacity(int queueCapacity) {
    this.queueCapacity = queueCapacity;
  }

  public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
    this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
  }

  public void setTaskDecorator(TaskDecorator taskDecorator) {
    this.taskDecorator = taskDecorator;
  }

  /**
   * 這是初始化方法,可以在這裡把JDK提供的ThreadPoolExecutor初始化了
   */
  protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
    ThreadPoolExecutor executor;
    if (this.taskDecorator != null) {
      executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
        public void execute(Runnable command) {
          super.execute(ThreadPoolTaskExecutor.this.taskDecorator.decorate(command));
        }
      };
    } else {
      executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
    }

    if (this.allowCoreThreadTimeOut) {
      executor.allowCoreThreadTimeOut(true);
    }

    this.threadPoolExecutor = executor;
    return executor;
  }

  protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
    return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue());
  }

  public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
    Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
    return this.threadPoolExecutor;
  }

  public int getPoolSize() {
    return this.threadPoolExecutor == null ? this.corePoolSize : this.threadPoolExecutor.getPoolSize();
  }

  public int getActiveCount() {
    return this.threadPoolExecutor == null ? 0 : this.threadPoolExecutor.getActiveCount();
  }

  public void execute(Runnable task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      executor.execute(task);
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  public void execute(Runnable task, long startTimeout) {
    this.execute(task);
  }

  public Future<?> submit(Runnable task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      return executor.submit(task);
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  public <T> Future<T> submit(Callable<T> task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      return executor.submit(task);
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  //這些都是些Spring對執行緒池的功能增強,一般用不到
  public ListenableFuture<?> submitListenable(Runnable task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      ListenableFutureTask<Object> future = new ListenableFutureTask(task, (Object)null);
      executor.execute(future);
      return future;
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
    ThreadPoolExecutor executor = this.getThreadPoolExecutor();

    try {
      ListenableFutureTask<T> future = new ListenableFutureTask(task);
      executor.execute(future);
      return future;
    } catch (RejectedExecutionException var4) {
      throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
    }
  }

  public boolean prefersShortLivedTasks() {
    return true;
  }
}

ThreadPoolTaskExecutor 繼承了 ExecutorConfigurationSupport,其實它主要是完成執行緒池的初始化的:

public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean {
  protected final Log logger = LogFactory.getLog(this.getClass());
  private ThreadFactory threadFactory = this;
  private boolean threadNamePrefixSet = false;
  private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy();
  private boolean waitForTasksToCompleteOnShutdown = false;
  private int awaitTerminationSeconds = 0;
  private String beanName;
  private ExecutorService executor;

  public ExecutorConfigurationSupport() {
  }

  public void setThreadFactory(ThreadFactory threadFactory) {
    this.threadFactory = (ThreadFactory)(threadFactory != null ? threadFactory : this);
  }

  public void setThreadNamePrefix(String threadNamePrefix) {
    super.setThreadNamePrefix(threadNamePrefix);
    this.threadNamePrefixSet = true;
  }

  public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
    this.rejectedExecutionHandler = (RejectedExecutionHandler)(rejectedExecutionHandler != null ? rejectedExecutionHandler : new AbortPolicy());
  }

  public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
    this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
  }

  public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
    this.awaitTerminationSeconds = awaitTerminationSeconds;
  }

  public void setBeanName(String name) {
    this.beanName = name;
  }

  /**
  * 這裡就是在bean初始化完後呼叫執行緒池的初始化方法生成執行緒池例項
  * 並被Spring容器管理
  */
  public void afterPropertiesSet() {
    this.initialize();
  }

  public void initialize() {
    if (this.logger.isInfoEnabled()) {
      this.logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }

    if (!this.threadNamePrefixSet && this.beanName != null) {
      this.setThreadNamePrefix(this.beanName + "-");
    }

    this.executor = this.initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
  }

  protected abstract ExecutorService initializeExecutor(ThreadFactory var1, RejectedExecutionHandler var2);

  public void destroy() {
    this.shutdown();
  }

  public void shutdown() {
    if (this.logger.isInfoEnabled()) {
      this.logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }

    if (this.waitForTasksToCompleteOnShutdown) {
      this.executor.shutdown();
    } else {
      this.executor.shutdownNow();
    }

    this.awaitTerminationIfNecessary();
  }

  private void awaitTerminationIfNecessary() {
    if (this.awaitTerminationSeconds > 0) {
      try {
        if (!this.executor.awaitTermination((long)this.awaitTerminationSeconds, TimeUnit.SECONDS) && this.logger.isWarnEnabled()) {
          this.logger.warn("Timed out while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
        }
      } catch (InterruptedException var2) {
        if (this.logger.isWarnEnabled()) {
          this.logger.warn("Interrupted while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
        }

        Thread.currentThread().interrupt();
      }
    }
  }
}

上述好多的引數其實都是JDK執行緒池需要的,具體他們的功能可以看執行緒池原始碼來了解它的作用。執行緒池原始碼解析