1. 程式人生 > >Spring Boot系列二 Spring @Async非同步執行緒池用法總結

Spring Boot系列二 Spring @Async非同步執行緒池用法總結

Spring非同步執行緒池的介面類,其實質是java.util.concurrent.Executor

Spring 已經實現的異常執行緒池: 
1. SimpleAsyncTaskExecutor:不是真的執行緒池,這個類不重用執行緒,每次呼叫都會建立一個新的執行緒。 
2. SyncTaskExecutor:這個類沒有實現非同步呼叫,只是一個同步操作。只適用於不需要多執行緒的地方
3. ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時,才用考慮使用這個類 
4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類。執行緒池同時被quartz和非quartz使用,才需要使用此類 
5. ThreadPoolTaskExecutor :最常使用,推薦。 其實質是對java.util.concurrent.ThreadPoolExecutor的包裝

2. @Async

spring對過@Async定義非同步任務

非同步的方法有3種 
1. 最簡單的非同步呼叫,返回值為void 
2. 帶引數的非同步呼叫 非同步方法可以傳入引數 
3. 異常呼叫返回Future

詳細見程式碼:

@Component
public class AsyncDemo {
    private static final Logger log = LoggerFactory.getLogger(AsyncDemo.class);

    /**
     * 最簡單的非同步呼叫,返回值為void
     */
    @Async
    public void asyncInvokeSimplest
() { log.info("asyncSimplest"); } /** * 帶引數的非同步呼叫 非同步方法可以傳入引數 * * @param s */ @Async public void asyncInvokeWithParameter(String s) { log.info("asyncInvokeWithParameter, parementer={}", s); } /** * 異常呼叫返回Future * * @param i * @return
*/
@Async public Future<String> asyncInvokeReturnFuture(int i) { log.info("asyncInvokeReturnFuture, parementer={}", i); Future<String> future; try { Thread.sleep(1000 * 1); future = new AsyncResult<String>("success:" + i); } catch (InterruptedException e) { future = new AsyncResult<String>("error"); } return future; } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

以上的非同步方法和普通的方法呼叫相同

asyncDemo.asyncInvokeSimplest();
asyncDemo.asyncInvokeWithException("test");
Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
System.out.println(future.get());
  • 1
  • 2
  • 3
  • 4

3. Spring 開啟非同步配置

Spring有兩種方法啟動配置 
1. 註解 
2. XML

3.1 通過註解實現

要啟動異常方法還需要以下配置 
1. @EnableAsync 此註解開戶非同步呼叫功能 
2. public AsyncTaskExecutor taskExecutor() 方法自定義自己的執行緒池,執行緒池字首”Anno-Executor”。如果不定義,則使用系統預設的執行緒池。

@SpringBootApplication
@EnableAsync // 啟動非同步呼叫
public class AsyncApplicationWithAnnotation {
    private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithAnnotation.class);

    /**
     * 自定義非同步執行緒池
     * @return
     */
    @Bean
    public AsyncTaskExecutor taskExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
        executor.setThreadNamePrefix("Anno-Executor");
        executor.setMaxPoolSize(10);  

        // 設定拒絕策略
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // .....
            }
        });
        // 使用預定義的異常處理類
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        return executor;  
    } 

    public static void main(String[] args) {
        log.info("Start AsyncApplication.. ");
        SpringApplication.run(AsyncApplicationWithAnnotation.class, args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

以上的異常方法和普通的方法呼叫相同

@RunWith(SpringRunner.class)
@SpringBootTest(classes=AsyncApplicationWithAnnotation.class)
public class AsyncApplicationWithAnnotationTests {
    @Autowired
    private AsyncDemo asyncDemo;

    @Test
    public void contextLoads() throws InterruptedException, ExecutionException {
        asyncDemo.asyncInvokeSimplest();
        asyncDemo.asyncInvokeWithParameter("test");
        Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
        System.out.println(future.get());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

執行測試用例,輸出內容如下: 
可以看出主執行緒的名稱為main; 非同步方法則使用 Anno-Executor1,可見異常執行緒池起作用了

2017-03-28 20:00:07.731  INFO 5144 --- [ Anno-Executor1] c.hry.spring.async.annotation.AsyncDemo  : asyncSimplest
2017-03-28 20:00:07.732  INFO 5144 --- [ Anno-Executor1] c.hry.spring.async.annotation.AsyncDemo  : asyncInvokeWithParameter, parementer=test
2017-03-28 20:00:07.751  INFO 5144 --- [ Anno-Executor1] c.hry.spring.async.annotation.AsyncDemo  : asyncInvokeReturnFuture, parementer=100
success:100
2017-03-28 20:00:08.757  INFO 5144 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@47af7f3d: startup date [Tue Mar 28 20:00:06 CST 2017]; root of context hierarchy
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3.2 通過XML實現

Bean檔案配置: spring_async.xml 
1. 執行緒的字首為xmlExecutor 
2. 啟動非同步執行緒池配置

    <!-- 等價於 @EnableAsync, executor指定執行緒池 -->
    <task:annotation-driven executor="xmlExecutor"/>
    <!-- id指定執行緒池產生執行緒名稱的字首 -->
    <task:executor
        id="xmlExecutor"
        pool-size="5-25"
        queue-capacity="100"
        keep-alive="120"
        rejection-policy="CALLER_RUNS"/>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

執行緒池引數說明 
1. ‘id’ : 執行緒的名稱的字首 
2. ‘pool-size’:執行緒池的大小。支援範圍”min-max”和固定值(此時執行緒池core和max sizes相同) 
3. ‘queue-capacity’ :排隊佇列長度 
○ The main idea is that when a task is submitted, the executor will first try to use a free thread if the number of active threads is currently less than the core size. 
○ If the core size has been reached, then the task will be added to the queue as long as its capacity has not yet been reached. 
○ Only then, if the queue’s capacity has been reached, will the executor create a new thread beyond the core size. 
○ If the max size has also been reached, then the executor will reject the task. 
○ By default, the queue is unbounded, but this is rarely the desired configuration because it can lead to OutOfMemoryErrors if enough tasks are added to that queue while all pool threads are busy. 
4. ‘rejection-policy’: 對拒絕的任務處理策略 
○ In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection. 
○ In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted. 
○ In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped. 
○ In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.) 
5. ‘keep-alive’ : 執行緒保活時間(單位秒) 
setting determines the time limit (in seconds) for which threads may remain idle before being terminated. If there are more than the core number of threads currently in the pool, after waiting this amount of time without processing a task, excess threads will get terminated. A time value of zero will cause excess threads to terminate immediately after executing a task without remaining follow-up work in the task queue()

非同步執行緒池

@SpringBootApplication
@ImportResource("classpath:/async/spring_async.xml")
public class AsyncApplicationWithXML {
    private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithXML.class);

    public static void main(String[] args) {
        log.info("Start AsyncApplication.. ");
        SpringApplication.run(AsyncApplicationWithXML.class, args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

測試用例

@RunWith(SpringRunner.class)
@SpringBootTest(classes=AsyncApplicationWithXML.class)
public class AsyncApplicationWithXMLTest {
    @Autowired
    private AsyncDemo asyncDemo;

    @Test
    public void contextLoads() throws InterruptedException, ExecutionException {
        asyncDemo.asyncInvokeSimplest();
        asyncDemo.asyncInvokeWithParameter("test");
        Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
        System.out.println(future.get());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

執行測試用例,輸出內容如下: 
可以看出主執行緒的名稱為main; 非同步方法則使用 xmlExecutor-x,可見異常執行緒池起作用了

2017-03-28 20:12:10.540  INFO 12948 --- [           main] c.h.s.a.xml.AsyncApplicationWithXMLTest  : Started AsyncApplicationWithXMLTest in 1.441 seconds (JVM running for 2.201)
2017-03-28 20:12:10.718  INFO 12948 --- [  xmlExecutor-2] com.hry.spring.async.xml.AsyncDemo       : asyncInvokeWithParameter, parementer=test
2017-03-28 20:12:10.721  INFO 12948 --- [  xmlExecutor-1] com.hry.spring.async.xml.AsyncDemo       : asyncSimplest
2017-03-28 20:12:10.722  INFO 12948 --- [  xmlExecutor-3] com.hry.spring.async.xml.AsyncDemo       : asyncInvokeReturnFuture, parementer=100
success:100
2017-03-28 20:12:11.729  INFO 12948 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@71809907: startup date [Tue Mar 28 20:12:09 CST 2017]; root of context hierarchy
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4. 對非同步方法的異常處理

在呼叫方法時,可能出現方法中丟擲異常的情況。在非同步中主要有有兩種異常處理方法: 
1. 對於方法返回值是Futrue的非同步方法: a) 一種是在呼叫future的get時捕獲異常; b) 在異常方法中直接捕獲異常 
2. 對於返回值是void的非同步方法:通過AsyncUncaughtExceptionHandler處理異常

AsyncExceptionDemo:

@Component
public class AsyncExceptionDemo {
    private static final Logger log = LoggerFactory.getLogger(AsyncExceptionDemo.class);

    /**
     * 最簡單的非同步呼叫,返回值為void
     */
    @Async
    public void asyncInvokeSimplest() {
        log.info("asyncSimplest");
    }

    /**
     * 帶引數的非同步呼叫 非同步方法可以傳入引數
     *  對於返回值是void,異常會被AsyncUncaughtExceptionHandler處理掉
     * @param s
     */
    @Async
    public void asyncInvokeWithException(String s) {
        log.info("asyncInvokeWithParameter, parementer={}", s);
        throw new IllegalArgumentException(s);
    }

    /**
     * 異常呼叫返回Future
     *  對於返回值是Future,不會被AsyncUncaughtExceptionHandler處理,需要我們在方法中捕獲異常並處理
     *  或者在呼叫方在呼叫Futrue.get時捕獲異常進行處理
     * 
     * @param i
     * @return
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        log.info("asyncInvokeReturnFuture, parementer={}", i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult<String>("success:" + i);
            throw new IllegalArgumentException("a");
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        } catch(IllegalArgumentException e){
            future = new AsyncResult<String>("error-IllegalArgumentException");
        }
        return future;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

實現AsyncConfigurer介面對異常執行緒池更加細粒度的控制 
a) 建立執行緒自己的執行緒池 
b) 對void方法丟擲的異常處理的類AsyncUncaughtExceptionHandler

/**
 * 通過實現AsyncConfigurer自定義異常執行緒池,包含異常處理
 * 
 * @author hry
 *
 */
@Service
public class MyAsyncConfigurer implements AsyncConfigurer{
    private static final Logger log = LoggerFactory.getLogger(MyAsyncConfigurer.class);

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();  
        threadPool.setCorePoolSize(1);  
        threadPool.setMaxPoolSize(1);  
        threadPool.setWaitForTasksToCompleteOnShutdown(true);  
        threadPool.setAwaitTerminationSeconds(60 * 15);  
        threadPool.setThreadNamePrefix("MyAsync-");
        threadPool.initialize();
        return threadPool;  
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return new MyAsyncExceptionHandler();  
    }

    /**
     * 自定義異常處理類
     * @author hry
     *
     */
    class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {  

        @Override  
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {  
            log.info("Exception message - " + throwable.getMessage());  
            log.info("Method name - " + method.getName());  
            for (Object param : obj) {  
                log.info("Parameter value - " + param);  
            }  
        }  

    } 

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
@SpringBootApplication
@EnableAsync // 啟動非同步呼叫
public class AsyncApplicationWithAsyncConfigurer {
    private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithAsyncConfigurer.class);

    public static void main(String[] args) {
        log.info("Start AsyncApplication.. ");
        SpringApplication.run(AsyncApplicationWithAsyncConfigurer.class, args);
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

測試程式碼

@RunWith(SpringRunner.class)
@SpringBootTest(classes=AsyncApplicationWithAsyncConfigurer.class)
public class AsyncApplicationWithAsyncConfigurerTests {
    @Autowired
    private AsyncExceptionDemo asyncDemo;

    @Test
    public void contextLoads() throws InterruptedException, ExecutionException {
        asyncDemo.asyncInvokeSimplest();
        asyncDemo.asyncInvokeWithException("test");
        Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
        System.out.println(future.get());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

執行測試用例 
MyAsyncConfigurer 捕獲AsyncExceptionDemo 物件在呼叫asyncInvokeWithException的異常

2017-04-02 16:01:45.591  INFO 11152 --- [      MyAsync-1] c.h.s.a.exception.AsyncExceptionDemo     : asyncSimplest
2017-04-02 16:01:45.605  INFO 11152 --- [      MyAsync-1] c.h.s.a.exception.AsyncExceptionDemo     : asyncInvokeWithParameter, parementer=test
2017-04-02 16:01:45.608  INFO 11152 --- [      MyAsync-1] c.h.s.async.exception.MyAsyncConfigurer  : Exception message - test
2017-04-02 16:01:45.608  INFO 11152 --- [      MyAsync-1] c.h.s.async.exception.MyAsyncConfigurer  : Method name - asyncInvokeWithException
2017-04-02 16:01:45.608  INFO 11152 --- [      MyAsync-1] c.h.s.async.exception.MyAsyncConfigurer  : Parameter value - test
2017-04-02 16:01:45.608  INFO 11152 --- [      MyAsync-1] c.h.s.a.exception.AsyncExceptionDemo     : asyncInvokeReturnFuture, parementer=100
error-IllegalArgumentException
2017-04-02 16:01:46.656  INFO 11152 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@47af7f3d: startup date [Sun Apr 02 16:01:44 CST 2017]; root of context hierarchy
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

5. 原始碼地址