1. 程式人生 > >Spring Boot Async異步執行

Spring Boot Async異步執行

system 策略 finall all handler rop 任務 trace stp

異步調用就是不用等待結果的返回就執行後面的邏輯,同步調用則需要等帶結果再執行後面的邏輯。

通常我們使用異步操作都會去創建一個線程執行一段邏輯,然後把這個線程丟到線程池中去執行,代碼如下:

ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(() -> {
    try {
        // 業務邏輯
    } catch (Exception e) {
        e.printStackTrace();
    } finally
{ } });

這樣的方式看起來沒那麽優雅,盡管用了java的lambda。在Spring Boot中有一種更簡單的方式來執行異步操作,只需要一個@Async註解即可。

@Async
public void saveLog() {
    System.err.println(Thread.currentThread().getName());
}

我們可以直接在Controller中調用這個業務方法,它就是異步執行的,會在默認的線程池中去執行。需要註意的是一定要在外部的類中去調用這個方法,如果在本類調用是不起作用的,比如this.saveLog()。 最後在啟動類上開啟異步任務的執行,添加@EnableAsync即可。

另外關於執行異步任務的線程池我們也可以自定義,首先我們定義一個線程池的配置類,用來配置一些參數,具體代碼如下:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;  

/**
 * 異步任務線程池配置
 * 
 * @author yinjihuan
 */
@Configuration
@ConfigurationProperties(prefix 
= "spring.task.pool") public class TaskThreadPoolConfig { //核心線程數 private int corePoolSize = 5; //最大線程數 private int maxPoolSize = 50; //線程池維護線程所允許的空閑時間 private int keepAliveSeconds = 60; //隊列長度 private int queueCapacity = 10000; //線程名稱前綴 private String threadNamePrefix = "FSH-AsyncTask-"; public String getThreadNamePrefix() { return threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public int getCorePoolSize() { return corePoolSize; } public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } public int getMaxPoolSize() { return maxPoolSize; } public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; } public int getKeepAliveSeconds() { return keepAliveSeconds; } public void setKeepAliveSeconds(int keepAliveSeconds) { this.keepAliveSeconds = keepAliveSeconds; } public int getQueueCapacity() { return queueCapacity; } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } }

然後我們重新定義線程池的配置:

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration  
public class AsyncTaskExecutePool implements AsyncConfigurer {    
    private Logger logger = LoggerFactory.getLogger(AsyncTaskExecutePool.class);

    @Autowired    
    private TaskThreadPoolConfig config;

    @Override  
    public Executor getAsyncExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(config.getCorePoolSize());    
        executor.setMaxPoolSize(config.getMaxPoolSize());    
        executor.setQueueCapacity(config.getQueueCapacity());    
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());    
        executor.setThreadNamePrefix(config.getThreadNamePrefix());
        //線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy
        //AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
        //CallerRunsPolicy:主線程直接執行該任務,執行完之後嘗試添加下一個任務到線程池中,可以有效降低向線程池內添加任務的速度 -->
        //DiscardOldestPolicy:拋棄舊的任務、暫不支持;會導致被丟棄的任務無法再次被執行 -->
        //DiscardPolicy:拋棄當前任務、暫不支持;會導致被丟棄的任務無法再次被執行 -->
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        executor.initialize();    
        return executor;    
    }  

    @Override  
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {// 異步任務中異常處理  
        return new AsyncUncaughtExceptionHandler() {  
            @Override  
            public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {  
                logger.error("=========================="+arg0.getMessage()+"=======================", arg0);  
                logger.error("exception method:" + arg1.getName());  
            }  
        };  
    }    
}  

配置完之後我們的異步任務執行的線程池就是我們自定義的了,我們可以通過在屬性文件裏面配置線程池的大小等等信息,也可以使用默認的配置:

spring.task.pool.maxPoolSize=100  

最後講下線程池配置的拒絕策略,當我們的線程數量高於線程池的處理速度時,任務會被緩存到本地的隊列中,隊列也是有大小的,如果超過了這個大小,我們需要有拒絕的策略,不然就會內存溢出了,目前支持2中拒絕策略:
- AbortPolicy: 直接拋出java.util.concurrent.RejectedExecutionException異常
- CallerRunsPolicy: 主線程直接執行該任務,執行完之後嘗試添加下一個任務到線程池中,可以有效降低向線程池內添加任務的速度

建議大家用CallerRunsPolicy策略,因為當隊列中的任務滿了之後,如果直接拋異常,那麽這個任務就會被丟棄,如果是CallerRunsPolicy策略會用主線程去執行,就是同步執行,最起碼這樣任務不會丟棄。

Spring Boot Async異步執行