1. 程式人生 > >Spring boot非同步註解原始碼解析

Spring boot非同步註解原始碼解析

一、例子

我們先來看下面這個Demo。

pom.xml中maven依賴:

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>1.5.14.RELEASE</version>
	<relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.16.18</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
</dependencies>

啟動類SpringBootAsyncApplication.java

@SpringBootApplication
@EnableAsync
public class SpringBootAsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootAsyncApplication.class, args);
    }

}

DemoController.java

@RestController
@RequestMapping(value = "/demos")
@Slf4j
public class DemoController {

    @Autowired
    private IDemoService demoService;

    @GetMapping("")
    public String test(@RequestParam String name) {
        long start = System.currentTimeMillis();
        log.info("start send. ThreadName: {}", Thread.currentThread().getName());
        demoService.send(name);
        long end = System.currentTimeMillis();
        log.info("send end, time: {}", (end - start));
        return "success";
    }

}

IDemoService介面實現類DemoServiceImpl.java

@Service
@Slf4j
public class DemoServiceImpl implements IDemoService {

    @Async
    @Override
    public void send(String name) {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("async name={}, ThreadName: {}", name, Thread.currentThread().getName());
    }

}

啟動專案後,訪問GET http://127.0.0.1:8002/demos?name=test,得到如下結果:

2019-04-12 17:17:32.462 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController        : start send. ThreadName: http-nio-8002-exec-1
2019-04-12 17:17:32.466 INFO 12 --- [nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2019-04-12 17:17:32.467 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController        : send end, time: 5
2019-04-12 17:17:37.468 INFO 12 --- [cTaskExecutor-1] c.l.d.s.service.impl.DemoServiceImpl     : async name=test, ThreadName: SimpleAsyncTaskExecutor-1

通過控制檯日誌列印,我們可以看到有兩個執行緒,一個是主執行緒,一個是非同步的執行緒。沒等非同步的執行緒執行完,主執行緒就直接執行完畢,返回響應結果了,這就是非同步的效果。

二、結論

2.1 實現非同步方式

  • 開啟非同步配置,即在啟動類或者配置類上加@EnableAsync註解;
  • 在方法或類上加@Async註解。

2.2 @Async註解

  • 用@Async註解的方法,將使它在一個單獨的執行緒(例子中SimpleAsyncTaskExecutor-1執行緒)中執行,呼叫者不用等待被呼叫方法完成。
  • 用@Async註解的方法,必須只應用於public方法上(只有public修飾的方法才能被進行代理)。
  • @Async註解不能自呼叫,即在同一個類中呼叫非同步方法,否則不起作用(同一個類中呼叫方法的話會略過代理進行直接呼叫)。

三、疑問

為什麼會丟擲下面這段日誌?

[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either

沒有發現用於非同步處理的任務執行器,既沒有TaskExecutor型別的bean,也沒有名為“taskExecutor”的bean。什麼意思?而且非同步開啟的執行緒為啥字首是SimpleAsyncTaskExecutor?帶著這些疑問,我們深入到原始碼中,看看到底發生了什麼。

四、原始碼解析

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.Async.java

/**
 * Annotation that marks a method as a candidate for <i>asynchronous</i> execution.
 * Can also be used at the type level, in which case all of the type's methods are
 * considered as asynchronous.
 *
 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@link java.util.concurrent.Future}. In the latter case, you may declare the
 * more specific {@link org.springframework.util.concurrent.ListenableFuture} or
 * {@link java.util.concurrent.CompletableFuture} types which allow for richer
 * interaction with the asynchronous task and for immediate composition with
 * further processing steps.
 *
 * <p>A {@code Future} handle returned from the proxy will be an actual asynchronous
 * {@code Future} that can be used to track the result of the asynchronous method
 * execution. However, since the target method needs to implement the same signature,
 * it will have to return a temporary {@code Future} handle that just passes a value
 * through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult},
 * or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.
 *
 * @author Juergen Hoeller
 * @author Chris Beams
 * @since 3.0
 * @see AnnotationAsyncExecutionInterceptor
 * @see AsyncAnnotationAdvisor
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

	/**
     * 指定非同步操作的限定符值
	 * A qualifier value for the specified asynchronous operation(s).
	 * <p>May be used to determine the target executor to be used when executing this
	 * method, matching the qualifier value (or the bean name) of a specific
	 * {@link java.util.concurrent.Executor Executor} or
	 * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
	 * bean definition.
	 * <p>When specified on a class level {@code @Async} annotation, indicates that the
	 * given executor should be used for all methods within the class. Method level use
	 * of {@code Async#value} always overrides any value set at the class level.
	 * @since 3.1.2
	 */
	String value() default "";

}

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.EnableAsync.java

package org.springframework.scheduling.annotation;

import java.lang.annotation.Annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.Ordered;

/**
 * Enables Spring's asynchronous method execution capability, similar to functionality
 * found in Spring's {@code <task:*>} XML namespace.
 *
 * <p>To be used together with @{@link Configuration Configuration} classes as follows,
 * enabling annotation-driven async processing for an entire Spring application context:
 *
 * <pre class="code">
 * &#064;Configuration
 * &#064;EnableAsync
 * public class AppConfig {
 *
 * }</pre>
 *
 * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with
 * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous}
 * annotation, or any custom annotation specified via the {@link #annotation} attribute.
 * The aspect is added transparently for any registered bean, for instance via this
 * configuration:
 *
 * <pre class="code">
 * &#064;Configuration
 * public class AnotherAppConfig {
 *
 *     &#064;Bean
 *     public MyAsyncBean asyncBean() {
 *         return new MyAsyncBean();
 *     }
 * }</pre>
 *
 * <p>By default, Spring will be searching for an associated thread pool definition:
 * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
 * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
 * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
 * will be used to process async method invocations. Besides, annotated methods having a
 * {@code void} return type cannot transmit any exception back to the caller. By default,
 * such uncaught exceptions are only logged.
 *
 * <p>To customize all this, implement {@link AsyncConfigurer} and provide:
 * <ul>
 * <li>your own {@link java.util.concurrent.Executor Executor} through the
 * {@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li>
 * <li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
 * AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler
 * getAsyncUncaughtExceptionHandler()}
 * method.</li>
 * </ul>
 *
 * <pre class="code">
 * &#064;Configuration
 * &#064;EnableAsync
 * public class AppConfig implements AsyncConfigurer {
 *
 *     &#064;Override
 *     public Executor getAsyncExecutor() {
 *         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 *         executor.setCorePoolSize(7);
 *         executor.setMaxPoolSize(42);
 *         executor.setQueueCapacity(11);
 *         executor.setThreadNamePrefix("MyExecutor-");
 *         executor.initialize();
 *         return executor;
 *     }
 *
 *     &#064;Override
 *     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
 *         return MyAsyncUncaughtExceptionHandler();
 *     }
 * }</pre>
 *
 * <p>If only one item needs to be customized, {@code null} can be returned to
 * keep the default settings. Consider also extending from {@link AsyncConfigurerSupport}
 * when possible.
 *
 * <p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed
 * Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method
 * if you want a fully managed bean. In such circumstances it is no longer necessary to
 * manually call the {@code executor.initialize()} method as this will be invoked
 * automatically when the bean is initialized.
 *
 * <p>For reference, the example above can be compared to the following Spring XML
 * configuration:
 *
 * <pre class="code">
 * {@code
 * <beans>
 *
 *     <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
 *
 *     <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
 *
 *     <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
 *
 *     <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
 *
 * </beans>
 * }</pre>
 *
 * The above XML-based and JavaConfig-based examples are equivalent except for the
 * setting of the <em>thread name prefix</em> of the {@code Executor}; this is because
 * the {@code <task:executor>} element does not expose such an attribute. This
 * demonstrates how the JavaConfig-based approach allows for maximum configurability
 * through direct access to actual componentry.
 *
 * <p>The {@link #mode} attribute controls how advice is applied: If the mode is
 * {@link AdviceMode#PROXY} (the default), then the other attributes control the behavior
 * of the proxying. Please note that proxy mode allows for interception of calls through
 * the proxy only; local calls within the same class cannot get intercepted that way.
 *
 * <p>Note that if the {@linkplain #mode} is set to {@link AdviceMode#ASPECTJ}, then the
 * value of the {@link #proxyTargetClass} attribute will be ignored. Note also that in
 * this case the {@code spring-aspects} module JAR must be present on the classpath, with
 * compile-time weaving or load-time weaving applying the aspect to the affected classes.
 * There is no proxy involved in such a scenario; local calls will be intercepted as well.
 *
 * @author Chris Beams
 * @author Juergen Hoeller
 * @author Stephane Nicoll
 * @author Sam Brannen
 * @since 3.1
 * @see Async
 * @see AsyncConfigurer
 * @see AsyncConfigurationSelector
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	/**
	 * Indicate the 'async' annotation type to be detected at either class
	 * or method level.
	 * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
	 * {@code @javax.ejb.Asynchronous} annotation will be detected.
	 * <p>This attribute exists so that developers can provide their own
	 * custom annotation type to indicate that a method (or all methods of
	 * a given class) should be invoked asynchronously.
	 */
	Class<? extends Annotation> annotation() default Annotation.class;

	/**
	 * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
	 * to standard Java interface-based proxies.
	 * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
	 * <p>The default is {@code false}.
	 * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
	 * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
	 * For example, other beans marked with Spring's {@code @Transactional} annotation
	 * will be upgraded to subclass proxying at the same time. This approach has no
	 * negative impact in practice unless one is explicitly expecting one type of proxy
	 * vs. another &mdash; for example, in tests.
	 */
	boolean proxyTargetClass() default false;

	/**
	 * Indicate how async advice should be applied.
	 * <p><b>The default is {@link AdviceMode#PROXY}.</b>
	 * Please note that proxy mode allows for interception of calls through the proxy
	 * only. Local calls within the same class cannot get intercepted that way; an
	 * {@link Async} annotation on such a method within a local call will be ignored
	 * since Spring's interceptor does not even kick in for such a runtime scenario.
	 * For a more advanced mode of interception, consider switching this to
	 * {@link AdviceMode#ASPECTJ}.
	 */
	AdviceMode mode() default AdviceMode.PROXY;

	/**
	 * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
	 * should be applied.
	 * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
	 * after all other post-processors, so that it can add an advisor to
	 * existing proxies rather than double-proxy.
	 */
	int order() default Ordered.LOWEST_PRECEDENCE;

}

@EnableAsync啟用Spring非同步方法執行功能,類似於Spring的<task:*>XML名稱空間,和@Configuration註解類一起使用,為整個Spring應用程式上下文啟用註釋驅動的非同步處理。

預設情況下,Spring將會搜尋一個關聯的執行緒池定義:要麼是一個在上下文中唯一的TaskExecutor型別的bean,要麼是一個名叫"taskExecutor"的Executor型別的bean;如果兩者都無法解決,SimpleAsyncTaskExecutor將用於處理非同步方法呼叫。此外,具有void返回型別的帶註釋的方法不能將任何異常傳回呼叫者。預設情況下,此類未捕獲異常只會被記錄下來。

為了定製所有這些,需要實現AsyncConfigurer類,並重寫AsyncConfigurer類中getAsyncExecutor()方法和getAsyncUncaughtExceptionHandler()方法。

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor= new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return MyAsyncUncaughtExceptionHandler();
    }

}

如果只需要定製一個專案,則可以返回null以保持預設設定,如果可能的話還可以考慮從AsyncConfigurerSupport擴充套件。

注意:在上面的示例中,ThreadPoolTaskExecutor不是一個完全受管理Spring bean。如果你想要一個完全受管理的bean,可以將@Bean註解新增到getAsyncExecutor()方法上;在這種情況下,就沒有必要再手動呼叫executor.initialize()方法進行bean初始化了。

@Bean
@Override
public Executor getAsyncExecutor() {
	ThreadPoolTaskExecutor executor= new ThreadPoolTaskExecutor();
	executor.setCorePoolSize(7);
	executor.setMaxPoolSize(42);
	executor.setQueueCapacity(11);
	executor.setThreadNamePrefix("MyExecutor-");
	return executor;
}

為了便於參考,可以將上面的示例與下面的Spring XML配置進行比較:

<beans>
	<task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>

	<task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
 
	<bean id="asyncBean" class="com.foo.MyAsyncBean"/>
	
	<bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
</beans>

上述基於xml和基於Java配置的示例除了設定執行器的執行緒名字首外,其餘都是等價的,這是因為<task:executor>元素沒有暴露這樣的屬性。這演示了基於Java配置的方法如何通過直接訪問實際元件來實現最大的可配置性。

#mode屬性控制應用如何被通知:如果mode是AdviceMode#PROXY(預設值),那麼其他屬性控制代理的行為。請注意,代理模式只允許攔截通過代理進行的呼叫,同一類內的本地呼叫不能以這種方式被攔截。

注意,如果#mode被設定為AdviceMode#ASPECTJ,那麼#proxyTargetClass屬性值將被忽略。還要注意,在這種情況下,spring-aspects模組JAR必須出現在類路徑上,編譯時或載入時應用切面到受影響的類上。在這種情況下不涉及代理,本地呼叫也會被攔截。

上面這些都是EnableAsync類註釋,具體是如何實現的?且看下面。

有沒有發現,EnableAsync類唯一的核心註解就是@Import(AsyncConfigurationSelector.class),我們來看下它的原始碼:

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AsyncConfigurationSelector.java

/**
 * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based
 * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class.
 *
 * @author Chris Beams
 * @since 3.1
 * @see EnableAsync
 * @see ProxyAsyncConfiguration
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

	/**
	 * {@inheritDoc}
	 * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
	 * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
	 */
	@Override
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] { ProxyAsyncConfiguration.class.getName() };
			case ASPECTJ:
				return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
			default:
				return null;
		}
	}

}

根據匯入@Configuration類上EnableAsync#mode的值選擇AbstractAsyncConfiguration的哪個實現類應該被使用。

  • 預設配置PROXY,使用ProxyAsyncConfiguration。

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.ProxyAsyncConfiguration.java

/**
 * {@code @Configuration} class that registers the Spring infrastructure beans necessary
 * to enable proxy-based asynchronous method execution.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 * @see AsyncConfigurationSelector
 */
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    /**
     * 定義了一個AsyncAnnotationBeanPostProcessor類bean
     */
	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // 新建一個非同步註解bean後處理器
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        // 獲取@EnableAsync中使用者自定義annotation
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        // 如果不是預設註解,則設定非同步註解配置
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
        // 設定執行緒任務執行器
		if (this.executor != null) {
			bpp.setExecutor(this.executor);
		}
        // 設定異常處理器
		if (this.exceptionHandler != null) {
			bpp.setExceptionHandler(this.exceptionHandler);
		}
        // 設定是否升級到CGLIB子類代理,預設不開啟
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        // 設定執行優先順序,預設最後執行
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}

}

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AbstractAsyncConfiguration.java

/**
 * Abstract base {@code Configuration} class providing common structure for enabling
 * Spring's asynchronous method execution capability.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 */
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
    // 註解屬性
	protected AnnotationAttributes enableAsync;
    // 執行緒任務執行器
	protected Executor executor;
    // 異常處理器
	protected AsyncUncaughtExceptionHandler exceptionHandler;


	@Override
	public void setImportMetadata(AnnotationMetadata importMetadata) {
		this.enableAsync = AnnotationAttributes.fromMap(
				importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
		if (this.enableAsync == null) {
			throw new IllegalArgumentException(
					"@EnableAsync is not present on importing class " + importMetadata.getClassName());
		}
	}

	/**
	 * Collect any {@link AsyncConfigurer} beans through autowiring.
	 */
	@Autowired(required = false)
	void setConfigurers(Collection<AsyncConfigurer> configurers) {
		if (CollectionUtils.isEmpty(configurers)) {
			return;
		}
		if (configurers.size() > 1) {
			throw new IllegalStateException("Only one AsyncConfigurer may exist");
		}
		AsyncConfigurer configurer = configurers.iterator().next();
		this.executor = configurer.getAsyncExecutor();
		this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
	}

}

可以看到介面org.springframework.scheduling.annotation.AsyncConfigurer的唯一實現類org.springframework.scheduling.annotation.AsyncConfigurerSupport:

/**
 * A convenience {@link AsyncConfigurer} that implements all methods
 * so that the defaults are used. Provides a backward compatible alternative
 * of implementing {@link AsyncConfigurer} directly.
 *
 * @author Stephane Nicoll
 * @since 4.1
 */
public class AsyncConfigurerSupport implements AsyncConfigurer {

	@Override
	public Executor getAsyncExecutor() {
		return null;
	}

	@Override
	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return null;
	}

}

這就是上面註釋講的,可以通過實現AsyncConfigurer介面實現預設執行緒池和異常處理的定製化。

回到ProxyAsyncConfiguration類,這是一個Configuration類,在其中通過@bean注入了AsyncAnnotationBeanPostProcessor 類。

AsyncAnnotationBeanPostProcessor這個類的Bean初始化時,重寫了BeanFactoryAware介面setBeanFactory方法,對AsyncAnnotationAdvisor非同步註解切面進行了構造。

@Override
public void setBeanFactory(BeanFactory beanFactory) {
	super.setBeanFactory(beanFactory);

	AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
	if (this.asyncAnnotationType != null) {
		advisor.setAsyncAnnotationType(this.asyncAnnotationType);
	}
	advisor.setBeanFactory(beanFactory);
	this.advisor = advisor;
}

它會在普通bean屬性之後、初始化回撥(如InitializingBean#afterPropertiesSet() 或者一個自定義初始化方法)之前被呼叫。

 

AsyncAnnotationBeanPostProcessor的後置bean處理是通過其父類AbstractAdvisingBeanPostProcessor來實現的,該類實現了BeanPostProcessor介面,重寫postProcessAfterInitialization方法。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
	if (bean instanceof AopInfrastructureBean) {
		// Ignore AOP infrastructure such as scoped proxies.
		return bean;
	}

    //把Advisor新增進bean  ProxyFactory-》AdvisedSupport-》Advised
	if (bean instanceof Advised) {
		Advised advised = (Advised) bean;
		if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
			// Add our local Advisor to the existing proxy's Advisor chain...
			if (this.beforeExistingAdvisors) {
				advised.addAdvisor(0, this.advisor);
			}
			else {
				advised.addAdvisor(this.advisor);
			}
			return bean;
		}
	}
   
    //構造ProxyFactory代理工廠,新增代理的介面,設定切面,最後返回代理類:AopProxy
	if (isEligible(bean, beanName)) {
		ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
		if (!proxyFactory.isProxyTargetClass()) {
			evaluateProxyInterfaces(bean.getClass(), proxyFactory);
		}
		proxyFactory.addAdvisor(this.advisor);
		customizeProxyFactory(proxyFactory);
		return proxyFactory.getProxy(getProxyClassLoader());
	}

	// No async proxy needed.
	return bean;
}

JDK動態代理類JdkDynamicAopProxy實現AopProxy介面,最終執行的是InvocationHandler介面的invoke方法。

/**
 * Implementation of {@code InvocationHandler.invoke}.
 * <p>Callers will see exactly the exception thrown by the target,
 * unless a hook method throws an exception.
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
	MethodInvocation invocation;
	Object oldProxy = null;
	boolean setProxyContext = false;

	TargetSource targetSource = this.advised.targetSource;
	Class<?> targetClass = null;
	Object target = null;

	try {
		if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
			// The target does not implement the equals(Object) method itself.
			return equals(args[0]);
		}
		else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
			// The target does not implement the hashCode() method itself.
			return hashCode();
		}
		else if (method.getDeclaringClass() == DecoratingProxy.class) {
			// There is only getDecoratedClass() declared -> dispatch to proxy config.
			return AopProxyUtils.ultimateTargetClass(this.advised);
		}
		else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
				method.getDeclaringClass().isAssignableFrom(Advised.class)) {
			// Service invocations on ProxyConfig with the proxy config...
			return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
		}

		Object retVal;

		if (this.advised.exposeProxy) {
			// Make invocation available if necessary.
			oldProxy = AopContext.setCurrentProxy(proxy);
			setProxyContext = true;
		}

		// May be null. Get as late as possible to minimize the time we "own" the target,
		// in case it comes from a pool.
		target = targetSource.getTarget();
		if (target != null) {
			targetClass = target.getClass();
		}

		// Get the interception chain for this method.
		List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

		// Check whether we have any advice. If we don't, we can fallback on direct
		// reflective invocation of the target, and avoid creating a MethodInvocation.
		if (chain.isEmpty()) {
			// We can skip creating a MethodInvocation: just invoke the target directly
			// Note that the final invoker must be an InvokerInterceptor so we know it does
			// nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
			Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
			retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
		}
		else {
			// We need to create a method invocation...
			invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
			// Proceed to the joinpoint through the interceptor chain.
			retVal = invocation.proceed();
		}

		// Massage return value if necessary.
		Class<?> returnType = method.getReturnType();
		if (retVal != null && retVal == target &&
				returnType != Object.class && returnType.isInstance(proxy) &&
				!RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
			// Special case: it returned "this" and the return type of the method
			// is type-compatible. Note that we can't help if the target sets
			// a reference to itself in another returned object.
			retVal = proxy;
		}
		else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
			throw new AopInvocationException(
					"Null return value from advice does not match primitive return type for: " + method);
		}
		return retVal;
	}
	finally {
		if (target != null && !targetSource.isStatic()) {
			// Must have come from TargetSource.
			targetSource.releaseTarget(target);
		}
		if (setProxyContext) {
			// Restore old proxy.
			AopContext.setCurrentProxy(oldProxy);
		}
	}
}

@Async註解的攔截器是AsyncExecutionInterceptor,它繼承了MethodInterceptor介面。而MethodInterceptor就是AOP規範中的Advice(切點的處理器)。chain不為空,執行第二個分支,構造ReflectiveMethodInvocation,然後執行proceed方法。

@Override
public Object proceed() throws Throwable {
	//	We start with an index of -1 and increment early.
	if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
		return invokeJoinpoint();
	}

	Object interceptorOrInterceptionAdvice =
			this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
	if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
		// Evaluate dynamic method matcher here: static part will already have
		// been evaluated and found to match.
		InterceptorAndDynamicMethodMatcher dm =
				(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
		if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
			return dm.interceptor.invoke(this);
		}
		else {
			// Dynamic matching failed.
			// Skip this interceptor and invoke the next in the chain.
			return proceed();
		}
	}
	else {
		// It's an interceptor, so we just invoke it: The pointcut will have
		// been evaluated statically before this object was constructed.
		return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
	}
}

核心方法是InterceptorAndDynamicMethodMatcher.interceptor.invoke(this),實際就是執行了AsyncExecutionInterceptor.invoke。

/**
 * Intercept the given method invocation, submit the actual calling of the method to
 * the correct task executor and return immediately to the caller.
 * @param invocation the method to intercept and make asynchronous
 * @return {@link Future} if the original method returns {@code Future}; {@code null}
 * otherwise.
 */
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
	Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
	final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

	AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
	if (executor == null) {
		throw new IllegalStateException(
				"No executor specified and no default executor set on AsyncExecutionInterceptor either");
	}

	Callable<Object> task = new Callable<Object>() {
		@Override
		public Object call() throws Exception {
			try {
				Object result = invocation.proceed();
				if (result instanceof Future) {
					return ((Future<?>) result).get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		}
	};

	return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java

/**
 * Determine the specific executor to use when executing the given method.
 * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
 * @return the executor to use (or {@code null}, but just if no default executor is available)
 */
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
	AsyncTaskExecutor executor = this.executors.get(method);
	if (executor == null) {
		Executor targetExecutor;
		String qualifier = getExecutorQualifier(method);
		if (StringUtils.hasLength(qualifier)) {
			targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
		}
		else {
			targetExecutor = this.defaultExecutor;
			if (targetExecutor == null) {
				synchronized (this.executors) {
					if (this.defaultExecutor == null) {
						this.defaultExecutor = getDefaultExecutor(this.beanFactory);
					}
					targetExecutor = this.defaultExecutor;
				}
			}
		}
		if (targetExecutor == null) {
			return null;
		}
		executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
				(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
		this.executors.put(method, executor);
	}
	return executor;
}

@Aync註解有個value可以標註使用哪個executor,這裡的getExecutorQualifier就是尋找這個標識。這裡如果defaultExecutor為null的話,則獲取找預設的executor。

/**
 * Retrieve or build a default executor for this advice instance.
 * An executor returned from here will be cached for further use.
 * <p>The default implementation searches for a unique {@link TaskExecutor} bean
 * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable, this implementation will return {@code null}.
 * @param beanFactory the BeanFactory to use for a default executor lookup
 * @return the default executor, or {@code null} if none available
 * @since 4.2.6
 * @see #findQualifiedExecutor(BeanFactory, String)
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
	if (beanFactory != null) {
		try {
			// Search for TaskExecutor bean... not plain Executor since that would
			// match with ScheduledExecutorService as well, which is unusable for
			// our purposes here. TaskExecutor is more clearly designed for it.
			return beanFactory.getBean(TaskExecutor.class);
		}
		catch (NoUniqueBeanDefinitionException ex) {
			logger.debug("Could not find unique TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				if (logger.isInfoEnabled()) {
					logger.info("More than one TaskExecutor bean found within the context, and none is named " +
							"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
							"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
				}
			}
		}
		catch (NoSuchBeanDefinitionException ex) {
			logger.debug("Could not find default TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				logger.info("No task executor bean found for async processing: " +
						"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
			}
			// Giving up -> either using local default executor or none at all...
		}
	}
	return null;
}

如果工程裡頭沒有定義預設的task executor的話,則獲取bean的時候會丟擲NoSuchBeanDefinitionException。這就是為什麼上面例子中會丟擲如下錯誤的原因。

[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either

spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

/**
 * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
 * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
 * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
 * for local use if no default could be found.
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
@Override
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
	Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
	return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

AsyncExecutionInterceptor重寫了getDefaultExecutor方法,先呼叫AsyncExecutionAspectSupport的getDefaultExecutor,如果預設的找不到,這裡new一個SimpleAsyncTaskExecutor。這也是為什麼上面例子中出現SimpleAsyncTaskExecutor執行緒字首的原因。

五、總結

整體流程大體可梳理為兩條線:

1.從註解開始:@EnableAsync--》ProxyAsyncConfiguration類構造一個bean(型別:AsyncAnnotationBeanPostProcessor)

2.從AsyncAnnotationBeanPostProcessor這個類的bean的生命週期走:AOP-Advisor切面初始化(setBeanFactory())--》AOP-生成代理類AopProxy(postProcessAfterInitialization())--》AOP-切點執行(InvocationHandler.invoke)


 

參考

https://segmentfault.com/a/1190000011339882

https://blog.csdn.net/qq_39470742/article/details/83382338

https://blog.csdn.net/fenglongmiao/article/details/82429460

https://www.bael