1. 程式人生 > >《SpringBoot從入門到放棄》之第(十三)篇——使用@Async非同步呼叫,ThreadPoolTaskScheduler執行緒池,使用Future以及定義超時

《SpringBoot從入門到放棄》之第(十三)篇——使用@Async非同步呼叫,ThreadPoolTaskScheduler執行緒池,使用Future以及定義超時

建立 TaskPoolConfig 類,配置執行緒池:

package com.test.util;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 執行緒池配置
 */
@Configuration
public class TaskPoolConfig {

    @Bean("myTaskExecutor")
    public Executor myTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);//核心執行緒數量,執行緒池建立時候初始化的執行緒數
        executor.setMaxPoolSize(15);//最大執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒
        executor.setQueueCapacity(200);//緩衝佇列,用來緩衝執行任務的佇列
        executor.setKeepAliveSeconds(60);//當超過了核心執行緒出之外的執行緒在空閒時間到達之後會被銷燬
        executor.setThreadNamePrefix("myTask-");//設定好了之後可以方便我們定位處理任務所在的執行緒池
        executor.setWaitForTasksToCompleteOnShutdown(true);//用來設定執行緒池關閉的時候等待所有任務都完成再繼續銷燬其他的Bean
        executor.setAwaitTerminationSeconds(60);//該方法用來設定執行緒池中任務的等待時間,如果超過這個時候還沒有銷燬就強制銷燬,以確保應用最後能夠被關閉,而不是阻塞住。
        //執行緒池對拒絕任務的處理策略:這裡採用了CallerRunsPolicy策略,當執行緒池沒有處理能力的時候,該策略會直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

executor.setWaitForTasksToCompleteOnShutdown(true);//用來設定執行緒池關閉的時候等待所有任務都完成再繼續銷燬其他的Bean executor.setAwaitTerminationSeconds(60);//該方法用來設定執行緒池中任務的等待時間,如果超過這個時候還沒有銷燬就強制銷燬,以確保應用最後能夠被關閉,而不是阻塞住。

上面的兩個方法能安全的關閉執行緒池。

如何使用執行緒池呢?

很簡單,在 @Async 註解裡標註執行緒池的名稱即可:@Async("myTaskExecutor")

我們在每個方法裡都加上一句話,列印執行緒名:System.out.println("當前執行緒名:"+ Thread.currentThread());

package com.test.web;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.concurrent.Future;

@Component
public class LoadMessageTest {

    /**
     * 載入主頁
     *
     * @throws Exception
     */
    @Async("myTaskExecutor")
    public Future<Boolean> loadMainPage() throws Exception {
        System.out.println("開始載入主頁...");
        Thread.sleep(300);
        System.out.println("載入主頁共計耗時 0.3 秒。");
        System.out.println("當前執行緒名:"+ Thread.currentThread());
        return new AsyncResult<>(true);
    }

    /**
     * 載入個人中心
     *
     * @throws Exception
     */
    @Async("myTaskExecutor")
    public Future<Boolean> loadUserCenter() throws Exception {
        System.out.println("開始載入個人中心...");
        Thread.sleep(500);
        System.out.println("載入個人中心共計耗時 0.5 秒。");
        System.out.println("當前執行緒名:"+ Thread.currentThread());
        return new AsyncResult<>(true);
    }

    /**
     * 載入活動資訊
     *
     * @throws Exception
     */
    @Async("myTaskExecutor")
    public Future<Boolean> loadActivityInfo() throws Exception {
        System.out.println("開始載入活動資訊...");
        Thread.sleep(800);
        System.out.println("載入活動資訊共計耗時 0.8 秒。");
        System.out.println("當前執行緒名:"+ Thread.currentThread());
        return new AsyncResult<>(true);
    }
}

單元測試,可以看到執行緒名的字首都是我們配置的:

開始載入個人中心...
開始載入主頁...
開始載入活動資訊...
載入主頁共計耗時 0.3 秒。
當前執行緒名:Thread[myTask-1,5,main]
載入個人中心共計耗時 0.5 秒。
當前執行緒名:Thread[myTask-2,5,main]
載入活動資訊共計耗時 0.8 秒。
當前執行緒名:Thread[myTask-3,5,main]
載入網站資訊完畢,共計耗時:874 毫秒

瞭解Future:

Future是對於具體的 Runnable 或者 Callable 任務的執行結果進行取消、查詢是否完成、獲取結果的介面。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。

Futrue介面定義如下:

package java.util.concurrent;
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

cancel 方法用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。引數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設定true,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayInterruptIfRunning為true還是false,此方法肯定返回false,即如果取消已經完成的任務會返回false;如果任務正在執行,若mayInterruptIfRunning設定為true,則返回true,若mayInterruptIfRunning設定為false,則返回false;如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,肯定返回true。

isCancelled 方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true。

isDone 方法表示任務是否已經完成,若任務完成,則返回true;

get() 方法用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回;

get(long timeout, TimeUnit unit) 用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null。

也就是說Future提供了三種功能:判斷任務是否完成;能夠中斷任務;能夠獲取任務執行結果。

我們測試一下第⑤個方法:

在類 LoadMessageTest 裡新增獲取使用者年齡的方法:

/**
     * 獲取使用者年齡
     * @return
     * @throws Exception
     */
    @Async("myTaskExecutor")
    public Future<Integer> loadUserAge()throws Exception{
        System.out.println("開始獲取使用者年齡...");
        Integer age =10;
        Thread.sleep(3000);//休眠3秒
        return new AsyncResult<>(age);
    }

測試類裡新增測試方法:

/**
	 * 測試獲取使用者年齡
	 * @throws Exception
	 */
	@Test
	public void getUserAgeTest()throws Exception{
		Future<Integer> task = loadMessageTest.loadUserAge();//獲取使用者年齡
		Integer age = task.get(2, TimeUnit.DAYS.SECONDS);//規定2秒內需要返回資料
		System.out.println("使用者年齡="+age);
	}

測試結果:超時報錯

我們修改測試方法,延長時間:

/**
	 * 測試獲取使用者年齡
	 * @throws Exception
	 */
	@Test
	public void getUserAgeTest()throws Exception{
		Future<Integer> task = loadMessageTest.loadUserAge();//獲取使用者年齡
		Integer age = task.get(5, TimeUnit.DAYS.SECONDS);//規定5秒內需要返回資料
		System.out.println("使用者年齡="+age);
	}

測試結果:

開始獲取使用者年齡...
使用者年齡=10