An article from Technology, 2019-04-17 00:00

Spring Boot Async Annotations: A Source Code Analysis

1. Example

Let's start with the following demo.

Maven dependencies in pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.14.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

The application class, SpringBootAsyncApplication.java:

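import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync // turns on processing of @Async annotations
public class SpringBootAsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootAsyncApplication.class, args);
    }
}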

DemoController.java

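import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/demos")
@Slf4j
public class DemoController {

    @Autowired
    private IDemoService demoService;

    @GetMapping("")
    public String test(@RequestParam String name) {
        long start = System.currentTimeMillis();
        log.info("start send. ThreadName: {}", Thread.currentThread().getName());
        // Goes through the Spring proxy, so the call returns without waiting for send() to finish.
        demoService.send(name);
        long end = System.currentTimeMillis();
        log.info("send end, time: {}", (end - start));
        return "success";
    }
}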

DemoServiceImpl.java, the implementation of the IDemoService interface:

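import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class DemoServiceImpl implements IDemoService {

    @Async
    @Override
    public void send(String name) {
        try {
            // Simulate five seconds of slow work.
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("async name={}, ThreadName: {}", name, Thread.currentThread().getName());
    }
}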

After starting the project, send GET http://127.0.0.1:8002/demos?name=test. The console shows:

2019-04-12 17:17:32.462 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController : start send. ThreadName: http-nio-8002-exec-1
2019-04-12 17:17:32.466 INFO 12 --- [nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2019-04-12 17:17:32.467 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController : send end, time: 5
2019-04-12 17:17:37.468 INFO 12 --- [cTaskExecutor-1] c.l.d.s.service.impl.DemoServiceImpl : async name=test, ThreadName: SimpleAsyncTaskExecutor-1

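The console log shows two threads: the request thread (http-nio-8002-exec-1) and the async thread (SimpleAsyncTaskExecutor-1). The request thread finished after about 5 ms and returned the response without waiting for the async thread, which logged its line five seconds later. That is the effect of asynchronous execution.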

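2. Conclusions

2.1 How to make a method asynchronous

- Turn on async support by adding @EnableAsync to the application class or to a configuration class.
- Add @Async to a method or to a class.

2.2 The @Async annotation

- A method annotated with @Async runs in a separate thread (SimpleAsyncTaskExecutor-1 in the example); the caller does not wait for the method to complete.
- @Async should only be applied to public methods (only public methods can be proxied).
- @Async does not work with self-invocation, that is, calling the async method from within the same class. A call inside the same class bypasses the proxy and invokes the method directly.

3. Questions

Why is the following log line printed?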

[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either

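No task executor was found for async processing: there is neither a bean of type TaskExecutor nor a bean named "taskExecutor". What does that mean? And why does the name of the async thread start with SimpleAsyncTaskExecutor? With these questions in mind, let's dig into the source code and see what actually happens.

4. Source code analysis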

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.Async.java

/**
 * Annotation that marks a method as a candidate for <i>asynchronous</i> execution.
 * Can also be used at the type level, in which case all of the type's methods are
 * considered as asynchronous.
 *
 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@link java.util.concurrent.Future}. In the latter case, you may declare the
 * more specific {@link org.springframework.util.concurrent.ListenableFuture} or
 * {@link java.util.concurrent.CompletableFuture} types which allow for richer
 * interaction with the asynchronous task and for immediate composition with
 * further processing steps.
 *
 * <p>A {@code Future} handle returned from the proxy will be an actual asynchronous
 * {@code Future} that can be used to track the result of the asynchronous method
 * execution. However, since the target method needs to implement the same signature,
 * it will have to return a temporary {@code Future} handle that just passes a value
 * through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult},
 * or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.
 *
 * @author Juergen Hoeller
 * @author Chris Beams
 * @since 3.0
 * @see AnnotationAsyncExecutionInterceptor
 * @see AsyncAnnotationAdvisor
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

    /**
     * A qualifier value for the specified asynchronous operation(s).
     * <p>May be used to determine the target executor to be used when executing this
     * method, matching the qualifier value (or the bean name) of a specific
     * {@link java.util.concurrent.Executor Executor} or
     * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
     * bean definition.
     * <p>When specified on a class level {@code @Async} annotation, indicates that the
     * given executor should be used for all methods within the class. Method level use
     * of {@code Async#value} always overrides any value set at the class level.
     * @since 3.1.2
     */
    String value() default "";

}

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.EnableAsync.java

package org.springframework.scheduling.annotation;

import java.lang.annotation.Annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.Ordered;

/**
 * Enables Spring's asynchronous method execution capability, similar to functionality
 * found in Spring's {@code <task:*>} XML namespace.
 *
 * <p>To be used together with @{@link Configuration Configuration} classes as follows,
 * enabling annotation-driven async processing for an entire Spring application context:
 *
 * <pre class="code">
 * &#064;Configuration
 * &#064;EnableAsync
 * public class AppConfig {
 *
 * }</pre>
 *
 * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with
 * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous}
 * annotation, or any custom annotation specified via the {@link #annotation} attribute.
 * The aspect is added transparently for any registered bean, for instance via this
 * configuration:
 *
 * <pre class="code">
 * &#064;Configuration
 * public class AnotherAppConfig {
 *
 *     &#064;Bean
 *     public MyAsyncBean asyncBean() {
 *         return new MyAsyncBean();
 *     }
 * }</pre>
 *
 * <p>By default, Spring will be searching for an associated thread pool definition:
 * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
 * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
 * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
 * will be used to process async method invocations. Besides, annotated methods having a
 * {@code void} return type cannot transmit any exception back to the caller. By default,
 * such uncaught exceptions are only logged.
 *
 * <p>To customize all this, implement {@link AsyncConfigurer} and provide:
 * <ul>
 * <li>your own {@link java.util.concurrent.Executor Executor} through the
 * {@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li>
 * <li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
 * AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler
 * getAsyncUncaughtExceptionHandler()}
 * method.</li>
 * </ul>
 *
 * <pre class="code">
 * &#064;Configuration
 * &#064;EnableAsync
 * public class AppConfig implements AsyncConfigurer {
 *
 *     &#064;Override
 *     public Executor getAsyncExecutor() {
 *         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 *         executor.setCorePoolSize(7);
 *         executor.setMaxPoolSize(42);
 *         executor.setQueueCapacity(11);
 *         executor.setThreadNamePrefix("MyExecutor-");
 *         executor.initialize();
 *         return executor;
 *     }
 *
 *     &#064;Override
 *     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
 *         return MyAsyncUncaughtExceptionHandler();
 *     }
 * }</pre>
 *
 * <p>If only one item needs to be customized, {@code null} can be returned to
 * keep the default settings. Consider also extending from {@link AsyncConfigurerSupport}
 * when possible.
 *
 * <p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed
 * Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method
 * if you want a fully managed bean. In such circumstances it is no longer necessary to
 * manually call the {@code executor.initialize()} method as this will be invoked
 * automatically when the bean is initialized.
 *
 * <p>For reference, the example above can be compared to the following Spring XML
 * configuration:
 *
 * <pre class="code">
 * {@code
 * <beans>
 *
 *     <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
 *
 *     <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
 *
 *     <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
 *
 *     <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
 *
 * </beans>
 * }</pre>
 *
 * The above XML-based and JavaConfig-based examples are equivalent except for the
 * setting of the <em>thread name prefix</em> of the {@code Executor}; this is because
 * the {@code <task:executor>} element does not expose such an attribute. This
 * demonstrates how the JavaConfig-based approach allows for maximum configurability
 * through direct access to actual componentry.
 *
 * <p>The {@link #mode} attribute controls how advice is applied: If the mode is
 * {@link AdviceMode#PROXY} (the default), then the other attributes control the behavior
 * of the proxying. Please note that proxy mode allows for interception of calls through
 * the proxy only; local calls within the same class cannot get intercepted that way.
 *
 * <p>Note that if the {@linkplain #mode} is set to {@link AdviceMode#ASPECTJ}, then the
 * value of the {@link #proxyTargetClass} attribute will be ignored. Note also that in
 * this case the {@code spring-aspects} module JAR must be present on the classpath, with
 * compile-time weaving or load-time weaving applying the aspect to the affected classes.
 * There is no proxy involved in such a scenario; local calls will be intercepted as well.
 *
 * @author Chris Beams
 * @author Juergen Hoeller
 * @author Stephane Nicoll
 * @author Sam Brannen
 * @since 3.1
 * @see Async
 * @see AsyncConfigurer
 * @see AsyncConfigurationSelector
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    /**
     * Indicate the 'async' annotation type to be detected at either class
     * or method level.
     * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
     * {@code @javax.ejb.Asynchronous} annotation will be detected.
     * <p>This attribute exists so that developers can provide their own
     * custom annotation type to indicate that a method (or all methods of
     * a given class) should be invoked asynchronously.
     */
    Class<? extends Annotation> annotation() default Annotation.class;

    /**
     * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
     * to standard Java interface-based proxies.
     * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
     * <p>The default is {@code false}.
     * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
     * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
     * For example, other beans marked with Spring's {@code @Transactional} annotation
     * will be upgraded to subclass proxying at the same time. This approach has no
     * negative impact in practice unless one is explicitly expecting one type of proxy
     * vs. another &mdash; for example, in tests.
     */
    boolean proxyTargetClass() default false;

    /**
     * Indicate how async advice should be applied.
     * <p><b>The default is {@link AdviceMode#PROXY}.</b>
     * Please note that proxy mode allows for interception of calls through the proxy
     * only. Local calls within the same class cannot get intercepted that way; an
     * {@link Async} annotation on such a method within a local call will be ignored
     * since Spring's interceptor does not even kick in for such a runtime scenario.
     * For a more advanced mode of interception, consider switching this to
     * {@link AdviceMode#ASPECTJ}.
     */
    AdviceMode mode() default AdviceMode.PROXY;

    /**
     * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
     * should be applied.
     * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
     * after all other post-processors, so that it can add an advisor to
     * existing proxies rather than double-proxy.
     */
    int order() default Ordered.LOWEST_PRECEDENCE;

}

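@EnableAsync enables Spring's asynchronous method execution capability, similar to the functionality in Spring's <task:*> XML namespace. It is used together with @Configuration classes and enables annotation-driven async processing for the entire Spring application context.

By default, Spring searches for an associated thread pool definition: either a unique bean of type TaskExecutor in the context, or otherwise a bean of type Executor named "taskExecutor". If neither can be resolved, a SimpleAsyncTaskExecutor is used to process async method invocations. In addition, annotated methods with a void return type cannot pass any exception back to the caller; by default, such uncaught exceptions are only logged.

To customize all of this, implement the AsyncConfigurer interface and override its getAsyncExecutor() and getAsyncUncaughtExceptionHandler() methods.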

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        // Required here because this executor is not a Spring-managed bean.
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // The Javadoc example omits "new"; it is needed for the code to compile.
        return new MyAsyncUncaughtExceptionHandler();
    }
}

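If only one of the two needs to be customized, the other method can return null to keep the default setting. Where possible, also consider extending AsyncConfigurerSupport.

Note: in the AppConfig example above, the ThreadPoolTaskExecutor is not a fully managed Spring bean. If you want a fully managed bean, add the @Bean annotation to the getAsyncExecutor() method. In that case there is no need to call executor.initialize() manually, because it is invoked automatically when the bean is initialized.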

@Bean
@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(7);
    executor.setMaxPoolSize(42);
    executor.setQueueCapacity(11);
    executor.setThreadNamePrefix("MyExecutor-");
    return executor;
}
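The Javadoc example refers to a MyAsyncUncaughtExceptionHandler class without showing it. The fragment below is one possible shape for it, offered as a sketch only: the class body and its output format are assumptions made for illustration, while the AsyncUncaughtExceptionHandler interface and its handleUncaughtException method come from spring-aop.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

// Hypothetical handler: the name matches the Javadoc example, the body is illustrative.
public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

    // Called for exceptions thrown by async methods that do not return a Future,
    // since such exceptions cannot be passed back to the caller.
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        System.err.println("Async method " + method.getName()
                + " failed with arguments " + Arrays.toString(params) + ": " + ex);
    }
}
```

A real project would more likely write to its logging framework than to System.err; the point here is only where the exception of a void async method ends up.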

For reference, the Java-based AppConfig example above can be compared with the following Spring XML configuration:

<beans>

    <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>

    <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>

    <bean id="asyncBean" class="com.foo.MyAsyncBean"/>

    <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>

</beans>

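The XML-based and Java-based examples above are equivalent except for the executor's thread name prefix, because the <task:executor> element does not expose such an attribute. This shows how the Java-based approach allows maximum configurability through direct access to the actual components.

The mode attribute controls how advice is applied. If mode is AdviceMode#PROXY (the default), the other attributes control the behavior of the proxying. Note that proxy mode only intercepts calls that go through the proxy; local calls within the same class cannot be intercepted this way.

Note that if mode is set to AdviceMode#ASPECTJ, the value of the proxyTargetClass attribute is ignored. Also note that in this case the spring-aspects module JAR must be on the classpath, with compile-time or load-time weaving applying the aspect to the affected classes. No proxy is involved in this scenario, so local calls are intercepted as well.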
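The proxy-mode limitation described above can be reproduced without Spring. The sketch below uses a plain JDK dynamic proxy; the Greeter interface, GreeterImpl class and INTERCEPTED list are hypothetical names invented for this illustration. It shows that a call made through the proxy is intercepted, while the nested call that the target makes on itself is not.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class SelfInvocationDemo {

    /** Hypothetical service interface, used only for this illustration. */
    public interface Greeter {
        void outer();
        void inner();
    }

    /** Names of the methods that actually passed through the proxy. */
    public static final List<String> INTERCEPTED = new ArrayList<>();

    /** Plain target object; outer() calls inner() on "this", not on the proxy. */
    public static class GreeterImpl implements Greeter {
        @Override
        public void outer() {
            inner(); // local call: goes straight to the target, no interception
        }

        @Override
        public void inner() {
            // nothing to do
        }
    }

    /** Wraps the target in a JDK dynamic proxy that records every intercepted call. */
    public static Greeter proxyFor(Greeter target) {
        InvocationHandler handler = (proxy, method, args) -> {
            INTERCEPTED.add(method.getName());
            return method.invoke(target, args);
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    public static void main(String[] args) {
        Greeter greeter = proxyFor(new GreeterImpl());
        greeter.outer();
        System.out.println(INTERCEPTED); // prints [outer]
    }
}
```

Only outer is recorded, because inner() was reached through "this". An @Async method called from another method of the same bean behaves the same way in proxy mode: it runs synchronously on the caller's thread.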

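Everything above comes from the Javadoc of EnableAsync. How is it actually implemented? Read on.

Notice that the only annotation on EnableAsync that does real work is @Import(AsyncConfigurationSelector.class). Here is its source: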

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AsyncConfigurationSelector.java

/**
 * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based
 * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class.
 *
 * @author Chris Beams
 * @since 3.1
 * @see EnableAsync
 * @see ProxyAsyncConfiguration
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    /**
     * {@inheritDoc}
     * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
     * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
     */
    @Override
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] { ProxyAsyncConfiguration.class.getName() };
            case ASPECTJ:
                return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
            default:
                return null;
        }
    }

}

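The selector chooses which implementation of AbstractAsyncConfiguration to use, based on the value of EnableAsync#mode on the importing @Configuration class.

With the default mode, PROXY, ProxyAsyncConfiguration is used.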

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.ProxyAsyncConfiguration.java

/**
 * {@code @Configuration} class that registers the Spring infrastructure beans necessary
 * to enable proxy-based asynchronous method execution.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 * @see AsyncConfigurationSelector
 */
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    /**
     * Defines a bean of type AsyncAnnotationBeanPostProcessor.
     */
    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // Create the bean post-processor for the async annotation.
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        // Read the user-defined annotation type from @EnableAsync.
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        // If it is not the default, register it as the async annotation type.
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        // Set the task executor.
        if (this.executor != null) {
            bpp.setExecutor(this.executor);
        }
        // Set the exception handler.
        if (this.exceptionHandler != null) {
            bpp.setExceptionHandler(this.exceptionHandler);
        }
        // Whether to switch to CGLIB subclass proxies; off by default.
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        // Set the order; by default this post-processor runs last.
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }

}

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AbstractAsyncConfiguration.java

/**
 * Abstract base {@code Configuration} class providing common structure for enabling
 * Spring's asynchronous method execution capability.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 */
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

    // Attributes of the @EnableAsync annotation
    protected AnnotationAttributes enableAsync;

    // Task executor
    protected Executor executor;

    // Exception handler
    protected AsyncUncaughtExceptionHandler exceptionHandler;

    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer.getAsyncExecutor();
        this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
    }

}

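setConfigurers collects the AsyncConfigurer bean, if there is one. The only implementation of the org.springframework.scheduling.annotation.AsyncConfigurer interface that Spring itself ships is org.springframework.scheduling.annotation.AsyncConfigurerSupport: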

/**
 * A convenience {@link AsyncConfigurer} that implements all methods
 * so that the defaults are used. Provides a backward compatible alternative
 * of implementing {@link AsyncConfigurer} directly.
 *
 * @author Stephane Nicoll
 * @since 4.1
 */
public class AsyncConfigurerSupport implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        return null;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }

}

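This is what the Javadoc above refers to: by implementing the AsyncConfigurer interface you can customize the default thread pool and the exception handling.

Back to ProxyAsyncConfiguration. It is a @Configuration class that registers an AsyncAnnotationBeanPostProcessor through a @Bean method.

When the AsyncAnnotationBeanPostProcessor bean is initialized, its setBeanFactory method (from the BeanFactoryAware interface) constructs the AsyncAnnotationAdvisor, the advisor for the async annotation.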

@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

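setBeanFactory is called after the normal bean properties have been populated and before any initialization callback such as InitializingBean#afterPropertiesSet() or a custom init method.

The actual post-processing of beans is done by the parent class AbstractAdvisingBeanPostProcessor, which implements the BeanPostProcessor interface and overrides postProcessAfterInitialization.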

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    // The bean is already a proxy: add the advisor to it
    // (ProxyFactory -> AdvisedSupport -> Advised).
    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            }
            else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }

    // Otherwise build a ProxyFactory, add the interfaces to proxy, add the advisor,
    // and return the proxy object (created through an AopProxy).
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No async proxy needed.
    return bean;
}

The JDK dynamic proxy class JdkDynamicAopProxy implements the AopProxy interface. What ultimately runs on each call is the invoke method of the InvocationHandler interface.

/**
 * Implementation of {@code InvocationHandler.invoke}.
 * <p>Callers will see exactly the exception thrown by the target,
 * unless a hook method throws an exception.
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    MethodInvocation invocation;
    Object oldProxy = null;
    boolean setProxyContext = false;

    TargetSource targetSource = this.advised.targetSource;
    Class<?> targetClass = null;
    Object target = null;

    try {
        if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
            // The target does not implement the equals(Object) method itself.
            return equals(args[0]);
        }
        else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
            // The target does not implement the hashCode() method itself.
            return hashCode();
        }
        else if (method.getDeclaringClass() == DecoratingProxy.class) {
            // There is only getDecoratedClass() declared -> dispatch to proxy config.
            return AopProxyUtils.ultimateTargetClass(this.advised);
        }
        else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
                method.getDeclaringClass().isAssignableFrom(Advised.class)) {
            // Service invocations on ProxyConfig with the proxy config...
            return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
        }

        Object retVal;

        if (this.advised.exposeProxy) {
            // Make invocation available if necessary.
            oldProxy = AopContext.setCurrentProxy(proxy);
            setProxyContext = true;
        }

        // May be null. Get as late as possible to minimize the time we "own" the target,
        // in case it comes from a pool.
        target = targetSource.getTarget();
        if (target != null) {
            targetClass = target.getClass();
        }

        // Get the interception chain for this method.
        List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

        // Check whether we have any advice. If we don't, we can fallback on direct
        // reflective invocation of the target, and avoid creating a MethodInvocation.
        if (chain.isEmpty()) {
            // We can skip creating a MethodInvocation: just invoke the target directly
            // Note that the final invoker must be an InvokerInterceptor so we know it does
            // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
            Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
            retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
        }
        else {
            // We need to create a method invocation...
            invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
            // Proceed to the joinpoint through the interceptor chain.
            retVal = invocation.proceed();
        }

        // Massage return value if necessary.
        Class<?> returnType = method.getReturnType();
        if (retVal != null && retVal == target &&
                returnType != Object.class && returnType.isInstance(proxy) &&
                !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
            // Special case: it returned "this" and the return type of the method
            // is type-compatible. Note that we can't help if the target sets
            // a reference to itself in another returned object.
            retVal = proxy;
        }
        else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
            throw new AopInvocationException(
                    "Null return value from advice does not match primitive return type for: " + method);
        }
        return retVal;
    }
    finally {
        if (target != null && !targetSource.isStatic()) {
            // Must have come from TargetSource.
            targetSource.releaseTarget(target);
        }
        if (setProxyContext) {
            // Restore old proxy.
            AopContext.setCurrentProxy(oldProxy);
        }
    }
}

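The interceptor for @Async is AsyncExecutionInterceptor, which implements the MethodInterceptor interface (in the demo the concrete class is its subclass AnnotationAsyncExecutionInterceptor, as the log line shows). MethodInterceptor is an Advice in AOP terms, that is, the handler that runs at the join point. Because the chain is not empty, the second branch is taken: a ReflectiveMethodInvocation is constructed and its proceed method is called.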

@Override
public Object proceed() throws Throwable {
    // We start with an index of -1 and increment early.
    if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
        return invokeJoinpoint();
    }

    Object interceptorOrInterceptionAdvice =
            this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
        // Evaluate dynamic method matcher here: static part will already have
        // been evaluated and found to match.
        InterceptorAndDynamicMethodMatcher dm =
                (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
        if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
            return dm.interceptor.invoke(this);
        }
        else {
            // Dynamic matching failed.
            // Skip this interceptor and invoke the next in the chain.
            return proceed();
        }
    }
    else {
        // It's an interceptor, so we just invoke it: The pointcut will have
        // been evaluated statically before this object was constructed.
        return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
    }
}

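The key step is the call to the interceptor's invoke(this), either dm.interceptor.invoke(this) for an InterceptorAndDynamicMethodMatcher or ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this) for a plain interceptor whose pointcut was already evaluated statically. For an @Async method this ends up executing AsyncExecutionInterceptor.invoke.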

/**
 * Intercept the given method invocation, submit the actual calling of the method to
 * the correct task executor and return immediately to the caller.
 * @param invocation the method to intercept and make asynchronous
 * @return {@link Future} if the original method returns {@code Future}; {@code null}
 * otherwise.
 */
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        }
    };

    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
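The shape of this method can be imitated with the JDK alone. The following is a simplified sketch, not Spring's implementation: invokeAsync, its returnsFuture flag and the onError callback are hypothetical stand-ins for invocation.proceed(), the return-type check in doSubmit, and the exception handler. It wraps the real call in a Callable, submits it, and returns at once; a void-style call yields null and routes failures to the handler, while a Future-style call yields a real asynchronous Future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class AsyncSubmitSketch {

    /**
     * Wraps the target call in a Callable, hands it to the executor and returns immediately.
     *
     * @param call          stand-in for the intercepted method body
     * @param returnsFuture whether the target method is declared to return a Future
     * @param executor      the executor that runs the call
     * @param onError       receives exceptions thrown by void-style calls
     * @return a Future for Future-style methods, otherwise null
     */
    public static Object invokeAsync(Callable<Object> call, boolean returnsFuture,
                                     ExecutorService executor, Consumer<Throwable> onError) {
        Callable<Object> task = () -> {
            try {
                Object result = call.call();
                if (result instanceof Future) {
                    // Unwrap the pass-through Future returned by the target method.
                    return ((Future<?>) result).get();
                }
            } catch (Exception ex) {
                if (returnsFuture) {
                    throw ex; // surfaces later through Future.get()
                }
                onError.accept(ex); // void-style: the caller never sees it
            }
            return null;
        };
        Future<Object> future = executor.submit(task);
        return returnsFuture ? future : null;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Void-style call: returns null right away, the failure goes to the handler.
            Object none = invokeAsync(() -> { throw new IllegalStateException("boom"); },
                    false, executor, ex -> System.out.println("handled: " + ex.getMessage()));
            System.out.println("returned: " + none);

            // Future-style call: the target returns an already completed future,
            // the caller receives a Future that completes on the executor thread.
            Future<?> future = (Future<?>) invokeAsync(
                    () -> CompletableFuture.completedFuture("hello"), true, executor, ex -> { });
            System.out.println(future.get());
        } finally {
            executor.shutdown();
        }
    }
}
```

This also illustrates why the @Async Javadoc asks the target method to return a pass-through handle such as CompletableFuture.completedFuture(value): the wrapper unwraps it inside the task, and the caller only ever sees the Future produced by the executor.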

spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java

/**
 * Determine the specific executor to use when executing the given method.
 * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
 * @return the executor to use (or {@code null}, but just if no default executor is available)
 */
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            targetExecutor = this.defaultExecutor;
            if (targetExecutor == null) {
                synchronized (this.executors) {
                    if (this.defaultExecutor == null) {
                        this.defaultExecutor = getDefaultExecutor(this.beanFactory);
                    }
                    targetExecutor = this.defaultExecutor;
                }
            }
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}

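The @Async annotation has a value attribute that names the executor to use; getExecutorQualifier looks up that qualifier. When there is no qualifier and defaultExecutor is still null, getDefaultExecutor is called to find a default executor.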
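As an illustration of the qualifier path, the fragment below declares an executor bean and selects it by name. It is a sketch under assumptions: the bean name "mailExecutor", the pool sizes, and the QualifiedAsyncExample, ExecutorConfig and MailService classes are all made up for this example and do not appear in the demo project. The nested classes still have to be registered with the application context, for instance through component scanning.

```java
import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

public class QualifiedAsyncExample {

    @Configuration
    @EnableAsync
    public static class ExecutorConfig {

        // Hypothetical bean name; @Async("mailExecutor") below refers to it.
        @Bean(name = "mailExecutor")
        public Executor mailExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(4);
            executor.setThreadNamePrefix("mail-");
            // No initialize() call: the container initializes @Bean-managed executors.
            return executor;
        }
    }

    @Service
    public static class MailService {

        // The qualifier is resolved by findQualifiedExecutor instead of the default lookup.
        @Async("mailExecutor")
        public void sendMail(String to) {
            System.out.println("sending to " + to + " on " + Thread.currentThread().getName());
        }
    }
}
```

Methods annotated with a plain @Async, without a value, keep going through the default executor resolution.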

/**
 * Retrieve or build a default executor for this advice instance.
 * An executor returned from here will be cached for further use.
 * <p>The default implementation searches for a unique {@link TaskExecutor} bean
 * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable, this implementation will return {@code null}.
 * @param beanFactory the BeanFactory to use for a default executor lookup
 * @return the default executor, or {@code null} if none available
 * @since 4.2.6
 * @see #findQualifiedExecutor(BeanFactory, String)
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // Search for TaskExecutor bean... not plain Executor since that would
            // match with ScheduledExecutorService as well, which is unusable for
            // our purposes here. TaskExecutor is more clearly designed for it.
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}

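If the project does not define a default task executor, the bean lookup throws NoSuchBeanDefinitionException, which is caught here and logged. That is why the example above prints the following INFO message (it is a log line, not an error):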

[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either

spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

/**
 * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
 * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
 * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
 * for local use if no default could be found.
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
@Override
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

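AsyncExecutionInterceptor overrides getDefaultExecutor: it first calls the getDefaultExecutor of AsyncExecutionAspectSupport, and if no default is found it creates a new SimpleAsyncTaskExecutor. That is why the thread name in the example above has the SimpleAsyncTaskExecutor prefix.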
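The lookup order can be condensed into a small JDK-only sketch. It is a deliberate simplification: a Map from bean name to Executor (a hypothetical stand-in for the BeanFactory) replaces the separate TaskExecutor and Executor type checks, primary beans and caching are ignored, and ThreadPerTaskExecutor is a toy substitute for SimpleAsyncTaskExecutor, which likewise starts a new thread for each task instead of reusing pooled threads.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;

public class DefaultExecutorLookupSketch {

    /** Toy stand-in for SimpleAsyncTaskExecutor: starts a fresh thread for every task. */
    public static class ThreadPerTaskExecutor implements Executor {
        private int counter = 0;

        @Override
        public synchronized void execute(Runnable task) {
            new Thread(task, "ThreadPerTask-" + (++counter)).start();
        }
    }

    /**
     * Resolves the executor in the order described above.
     * The map (bean name to executor) is a hypothetical stand-in for the BeanFactory.
     */
    public static Executor resolve(Map<String, Executor> beans) {
        if (beans.size() == 1) {
            return beans.values().iterator().next(); // unique bean found by type
        }
        Executor named = beans.get("taskExecutor");
        if (named != null) {
            return named; // no unique candidate: fall back to the well-known name
        }
        return new ThreadPerTaskExecutor(); // nothing usable: local default
    }

    public static void main(String[] args) {
        Map<String, Executor> beans = new LinkedHashMap<>();
        Executor fallback = resolve(beans);
        fallback.execute(() -> System.out.println(Thread.currentThread().getName()));
        // prints ThreadPerTask-1
    }
}
```

Because the fallback creates one thread per call, an application with many async invocations would usually be better served by declaring a pooled executor bean, either unique by type or named "taskExecutor", so that the first two steps of the lookup succeed.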

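5. Summary

The overall flow can be summarized along two lines:

1. Starting from the annotation: @EnableAsync --> the ProxyAsyncConfiguration class registers a bean of type AsyncAnnotationBeanPostProcessor.

2. Following the lifecycle of the AsyncAnnotationBeanPostProcessor bean: AOP advisor initialization (setBeanFactory()) --> AOP proxy creation through AopProxy (postProcessAfterInitialization()) --> AOP advice execution at the join point (InvocationHandler.invoke).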
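The two lines above can be tied together in a miniature, JDK-only pipeline. Everything in it is hypothetical and heavily simplified: @RunAsync plays the role of @Async, postProcess plays the role of postProcessAfterInitialization, and the lambda invocation handler plays the role of the proxy plus the async interceptor. It is meant as a mental model, not as a description of Spring's classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MiniAsyncPipeline {

    /** Hypothetical marker annotation standing in for @Async. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface RunAsync { }

    public interface Sender {
        void send(String name);
    }

    public static class LoggingSender implements Sender {
        public final CountDownLatch done = new CountDownLatch(1);
        public volatile String threadName;

        @RunAsync
        @Override
        public void send(String name) {
            threadName = Thread.currentThread().getName();
            done.countDown();
        }
    }

    /** Decides whether a bean needs a proxy at all. */
    static boolean isEligible(Object bean) {
        for (Method m : bean.getClass().getMethods()) {
            if (m.isAnnotationPresent(RunAsync.class)) {
                return true;
            }
        }
        return false;
    }

    /** Post-processing step: wrap eligible beans in a proxy, leave the others untouched. */
    @SuppressWarnings("unchecked")
    public static <T> T postProcess(T bean, Class<T> iface, ExecutorService executor) {
        if (!isEligible(bean)) {
            return bean; // no proxy needed
        }
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                (proxy, method, args) -> {
                    Method target = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                    if (!target.isAnnotationPresent(RunAsync.class)) {
                        return method.invoke(bean, args); // ordinary synchronous call
                    }
                    // Advice execution: hand the call to the executor and return at once.
                    Callable<Object> task = () -> method.invoke(bean, args);
                    executor.submit(task);
                    return null;
                });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-async-1"));
        LoggingSender target = new LoggingSender();
        Sender sender = postProcess(target, Sender.class, executor);
        sender.send("test"); // returns immediately
        target.done.await(5, TimeUnit.SECONDS);
        System.out.println(target.threadName); // prints mini-async-1
        executor.shutdown();
    }
}
```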
