Spring | Async注解

1. Async注解的用法

1.1初级用法

1.在启动类或者配置类加上@EnableSync;

2.在需要异步执行方法或者类上加@Async注解;

3.另外一个方法调用该方法即可;

简单Demo代码如下所示:

//启动类
@SpringBootApplication
@EnableAspectJAutoProxy
@EnableAsync
public class SpringBootWithDockerStarter {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootWithDockerStarter.class, args);
    }
}
//需要异步执行的方法
@Slf4j
@Service
public class BeanE {
    @Async
    public void asyncMethod(){
        log.info("BeanE + asyncMethod");
    }
}

@RestController
@Slf4j
public class BeanController {

    @Autowired
    private BeanE beanE;

    @RequestMapping("/test")
    public void test() throws InterruptedException, ExecutionException {
        beanE.asyncMethod();
    }
}

服务启动后调用/test接口即可实现异步调用asyncMethod。

1.2中级用法

a). 该Async注解不建议使用默认的线程池(后文分析原因),开发者要使用该注解时需要自定义线程池,自定义的方式有两种:

  • 直接加在注解上;
  • 实现AsyncConfigurer#getAsyncExecutor方法,配置Async注解全局默认的线程池;
//第一种
@Slf4j
@Service
public class BeanE {
    @Async("myExecutor") //前提是加一个myExecutor的线程池到Spring容器中
    public void asyncMethod(){
        log.info("BeanE + asyncMethod");
    }
}

//第二种
@Configuration
@Slf4j
public class AsyncConfig implements AsyncConfigurer {

    @Bean
    public Executor asyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setKeepAliveSeconds(30);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("my-async-");
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }
}

Async注解选择线程池是优先选择第一种,毕竟第二种是默认的线程池,如果该默认线程池被开发者自定义了那么Spring的默认线程池是不会初始化的。

b). 配置Async注解全局默认的异常处理:实现AsyncConfigurer#getAsyncUncaughtExceptionHandler方法

该异常处理主要是用于在使用@Async执行对应的方法时报出异常,如果配置了该异常处理器会优先执行该处理器的逻辑,配置方式如下:

@Configuration
public class AsyncConfig implements AsyncConfigurer {
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        log.info("getAsyncUncaughtExceptionHandler");
        return  (Throwable ex, Method method, Object... params) -> {
            log.info("customer handle when handle async");
            log.info("Method: {}", method);
            throw new NullPointerException();
        } ;
    }
}

建议大家在异常处理器中不要捕获异常,因为有时候需要根据异常处理一些逻辑,当然也提倡大家捕获A异常然后抛出我们预知的B异常。

1.3高级用法

1.3.1 同类中的方法调用

同一个Bean中存在A, B两个方法,B方法加了@Async需要异步执行,在A方法中调用的B方法。

    public void testInternalClassCall(){
        log.info("BeanE + testInternalClassCall");
        asyncMethod();
    }

    @Async
    public void asyncMethod(){
        log.info("BeanE + asyncMethod");
    }

如果只是上述的调用方式的话,asyncMethod方法不能实现异步执行,想要异步执行的话需要注入当前BeanE,如下:

@Service
public class BeanE {
    @Autowired
    @Lazy
    private BeanE beanE;

    public void testInternalClassCall(){
        log.info("BeanE + testInternalClassCall");
        beanE.asyncMethod();
    }

    @Async
    public void asyncMethod(){
        log.info("BeanE + asyncMethod");
    }
}

上述的方式是可以实现异步执行asyncMethod方法,但是你会发现我注入了当前Bean,也就是自己注入自己,实际上这里就构成了循环依赖,并且在注入BeanE的时候加了@Lazy注解,如果不加该注解启动会报循环依赖的错误(原因见后文), 另外我是用beanE去调用该方法的。

1.3.2异步执行带返回值

调用带返回值的异步方法,@Async支持四种返回类型:(源码见AsyncExecutionAspectSupport#doSubmit)

  • CompletableFuture
  • ListenableFuture
  • Future
  • Void

前三者使用的是带返回值的,实际上基本类似,只是执行task的方式换了。

//将返回值的类型写入CompletableFuture
@Async
public CompletableFuture<String> asyncMethodByCompletableFuture(){
    log.info("BeanF + asyncMethodByCompletableFuture");
    try {
        Thread.sleep(3000L);
    }catch (Exception e){

    }
    //将返回的类型进行包装
    return CompletableFuture.completedFuture("Hello CompletableFuture");
}

@Async
public Future<String> asyncMethod() throws InterruptedException {
    log.info("BeanF + asyncMethod");
    return new AsyncResult<>("Hello asyncMethod");
}

//在调用的时候需要调用get方法,该方法会阻塞主线程,等异步任务执行完才会继续后续的执行
public void test() throws InterruptedException, ExecutionException {
    Future<String> result = beanF.asyncMethod();
    log.info("test .....");
    String value = result.get();

    CompletableFuture<String> stringFuture = beanF.asyncMethodByCompletableFuture();
    String stringValue = stringFuture.get();
}

1.4提出问题

对于@Async的用法,根据我自己的使用,我提出了下列这么多问题,带着问题去分析:

1.为什么加了@EnableAsync注解才能生效?

2.Spring是可以解决循环依赖的,为什么使用@Async注解发生循环依赖会启动失败?

3.如果异步执行的方法存在切面时,会使用异步的方式执行切面逻辑吗?

4.为啥在1.3中要使用beanE去调用asyncMethod方法才能生效。

2.Async注解的原理

2.1Async注解加载流程

@EnableAsync作用是在Spring加载过程中将AsyncConfigurationSelector添加到容器中,其主要是通过@Import(AsyncConfigurationSelector.class)方式来实现,这个配置类可以选择使用某种代理方式,默认是JDK代理,因此返回结果是ProxyAsyncConfiguration这个配置Bean的全路径名,这一过程实现了将ProxyAsyncConfiguration配置类添加到容器中。在该配置类中有且仅有一个个Bean(org.springframework.context.annotation.internalAsyncAnnotationProcessor), 这个Bean是为Async注解生成代理的后处理器,代码如下:

public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		//此处可用于配置Async注解的线程池和异常处理
        bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}

Note:

@Configuration注解没有提供自动注入Bean的功能。要想在配置类中自动注入Bean,需要使用@ComponentScan注解或者@Import注解导入其配置类,在其配置类中定义Bean,就可以实现自动注入。

换句话说,就是在启动时候由于路径问题没有扫到该路径下的配置类,这时候就需要手动使用@Import注解导入或者使用@ComponentScan注解扫描整个路径下的配置类

在其他的Bean加载过程中,会执行internalAsyncAnnotationProcessor后处理器,执行处理器的时候会检查该Bean中的方法或者该类是否有@Async注解,如果存在即为该Bean生成代理对象(Async注解只会在该后处理器阶段生成代理对象,即使发生循环依赖)。当然如果在为Async注解创建代理前已经生成了代理对象,这时该后处理器会判断如果该Bean是代理对象,只会把Async注解的切面加在所有的切面最前面,这也就意味着该切面是最先执行的。

public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
   @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
      Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
      //实例化这个对象时执行setBeforeExistingAdvisors(true) 这让该切面会加在所有的切面前面
      AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
      //配置默认线程池和异常处理器
      bpp.configure(this.executor, this.exceptionHandler);
      Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
      if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
         bpp.setAsyncAnnotationType(customAsyncAnnotation);
      }
      bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
      //值很大 优先级很低
      bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
      return bpp;
   }

}

//在实例化该后处理器的时候执行了set方法
public AsyncAnnotationBeanPostProcessor() {
		setBeforeExistingAdvisors(true);
}


//该后处理器的初始化后的逻辑
public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (this.advisor == null || bean instanceof AopInfrastructureBean) {
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		}
		if (bean instanceof Advised) {
			Advised advised = (Advised) bean;
			if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
				// Add our local Advisor to the existing proxy's Advisor chain...
				//此处就用到了上述构造函数中的set方法的变量,将Async注解切面加在已经存在的切面前面
                if (this.beforeExistingAdvisors) {
					advised.addAdvisor(0, this.advisor);
				}
				else {
					advised.addAdvisor(this.advisor);
				}
				return bean;
			}
		}
    //此处省略后续逻辑
}

整个加载流程不是很复杂,主要就是一个后处理器全部搞定,接下来解释为何不能解决@Async注解的循环依赖,对于想熟悉SpringBean的循环依赖可以点击此处链接

BeanE和BeanF发生循环依赖,且两个Bean中都存在@Async注解,假设BeanE先加载:

  1. BeanE在属性填充时发现需要BeanF,此时BeanE在之前已经将BeanE添加到第三级缓存;

  2. 加载BeanF,BeanF属性填充时发现需要BeanE,这时第三级缓存中存在用于获取BeanE对象的lambda;

  3. 执行该lambda表达式两种情况:

    • 存在切面(此时不会为代理对象添加Async注解的切面) 返回代理对象(记为ProxyBeanE)注入到BeanF中,并将该代理对象放入第二级缓存;

    • 不存在切面 返回BeanE注入到BeanF中,并将BeanE放入第二级缓存;

  4. BeanF填充好BeanE后,执行Async注解的后处理逻辑创建代理对象,创建完成后放入第一级缓存(单例池);

  5. 这时BeanF已经加载完毕,注入到BeanE中,这时不管BeanE有没有生成代理,执行后续的初始化逻辑始终是BeanE而不是代理对象;

  6. 在执行BeanE的后处理器逻辑时,两种情况:

    • 前期生成代理对象:由于在执行Async注解的后处理器中拿到的是BeanE对象(第五步结尾有说明),所以又会为BeanE生成代理,此时的代理对象仅包含Async注解的切面,这时的代理对象和第三步中虽然都是代理对象但是是不同对象;
    • 前期未生成理对象:后处理器为Async注解生成代理对象,这时候返回的对象是代理对象,可是第二级缓存中已经有了一个BeanE对象;
  7. 无论那种情况都会导致同一个BeanE存在两个Bean,所以会报错;

a). 在第三步的第一种情况中,当发生循环依赖是不会为Async注解添加切面到代理对象中,debug图示如下: image-20230310134838596

由于在发生循环依赖时只会执行实现了SmartInstantiationAwareBeanPostProcessor接口的处理器,而Async注解的后处理器AsyncAnnotationBeanPostProcessor没有实现该接口,所以不会提前执行该后处理器的逻辑创建代理。

b).第六步中的第一种情况,当代理对象生成了为啥还会生成代理对象?那为啥使用Before,After等注解不会重新生成?

    //在发生循环依赖时,为Before After等注解提前创建代理对象时会往earlyProxyReferences塞入非代理Bean
	public Object getEarlyBeanReference(Object bean, String beanName) {
		Object cacheKey = getCacheKey(bean.getClass(), beanName);
		this.earlyProxyReferences.put(cacheKey, bean);
		return wrapIfNecessary(bean, beanName, cacheKey);
	}

    //初始化后后处理逻辑为Before After等注解生成代理
	public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
		if (bean != null) {
			Object cacheKey = getCacheKey(bean.getClass(), beanName);
            //会对之前的生成过的Bean进行判断,如果之前生成了就直接返回bean(非代理)
			if (this.earlyProxyReferences.remove(cacheKey) != bean) {
				return wrapIfNecessary(bean, beanName, cacheKey);
			}
		}
		return bean;
	}

上述earlyProxyReferences这个map可以防止重复创建代理对象,但是对于AsyncAnnotationBeanPostProcessor没有该逻辑判断,只有对Bean对象的类型判断,自然会重新创建一个新的代理对象返回了。

c).报错的地方,代码如下:

//允许循环依赖
if (earlySingletonExposure) {
    //先从第一级缓存获取,没有再从第二级缓存获取,不涉及第三级缓存
	Object earlySingletonReference = getSingleton(beanName, false);
		if (earlySingletonReference != null) {
            //从第一级或第二级缓存中拿到Bean对象说明之前已经创建了,这是需要替换(主要是为了替换成代理对象)
			if (exposedObject == bean) {
					exposedObject = earlySingletonReference;
				}
				else if (!this.allowRawInjectionDespiteWrapping && hasDependentBean(beanName)) {
					String[] dependentBeans = getDependentBeans(beanName);
					Set<String> actualDependentBeans = new LinkedHashSet<>(dependentBeans.length);
					for (String dependentBean : dependentBeans) {
						if (!removeSingletonIfCreatedForTypeCheckOnly(dependentBean)) {
							actualDependentBeans.add(dependentBean);
						}
					}
					if (!actualDependentBeans.isEmpty()) {
						throw new BeanCurrentlyInCreationException(beanName,
								"Bean with name '" + beanName + "' has been injected into other beans [" +
								StringUtils.collectionToCommaDelimitedString(actualDependentBeans) + ...");
					}
				}
			}
		}

对于Async报错就是在exposedObject == bean条件时,不管之前有没有创建代理对象,执行Async注解的后处理器后exposedObject一定是代理对象,而Bean是非代理对象,因此走到最后异常。Before After等注解没有问题是因为他们的后处理器会检查已经创建过了就会返回非代理Bean,那exposedObject == bean肯定为true,这时就把缓存中的代理对象赋值给exposedObject 执行后续逻辑。

d).正常流程中,Async后处理如果处理已经存在的代理对象?

正常流程中,Before After等后处理器是返回代理对象的,但是由于前期没有发生循环依赖,在执行getSingleton(beanName, false);时是没有值返回的,因此会跳过该异常逻辑。

2.2Async注解执行流程

使用代理对象在调用方法时,最先进入的切面是Async注解的切面,该切面通知做了三件事:1.获取需要执行的方法;2获取线程池;3提交执行该方法后续切面逻辑这个任务到线程池;

Async注解有一套自己选择线程池的方法,源码见:AsyncExecutionAspectSupport#determineAsyncExecutor

由于@Async注解本身就可以配置自己的线程池,例如:@Async(“myExecutor”),还可以配置全局默认的线程池,如果上述都没有的话那就是使用Spring自己配置的线程池(不知道为啥Spring把这参数设置的这么离谱,这么高估我们的服务器性能)。如下图所示,maxPoolSize和queueCapacity。另外这是Spring2.1以后的默认线程池配置,在Spring2.1之前使用的是SimpleAsyncTaskExecutor,这尼玛也很离谱,每次都会创建一个新的线程去执行,创建了个寂寞线程池。

image-20230312162009251

在加载流程提到Async注解切面会加在Before After等注解的切面前面,那么在执行时,也会用异步的方式执行切面的逻辑,因此切面里面存在ThreadLocal或者Request对象这些会被异步执行所影响的对象时,要注意规避风险。

3.Async注解总结

3.1流程总结:

1.@EnableAsync加载启动类上,最终用于将ProxyAsyncConfiguration加到Spring容器中;

2.初始化ProxyAsyncConfiguration中的AsyncAnnotationBeanPostProcessor后处理器,该后处理器为@Async生成代理对象;

3.该后处理器在自己的初始化时调用setBeanFactory方法完成切面填充;

4.在Spring的Bean加载过程检查该Bean或者Bean中方法是否加@Async:(该代理只会在后置处理器生成,即使发生循环也不会提前创建)

  • 已经有切面为该对象生成代理对象,Async注解的切面逻辑加在所有的切面逻辑最前面;
  • 该对象还是一个普通Bean,根据对应的条件生成JDK代理对象或是CGLIB代理对象;

5.@Async的切面:AsyncAnnotationAdvisor, 该切面也是一个低级Advisor切面,@Async的通知AnnotationAsyncExecutionInterceptor,该通知也是一个环绕通知。当发生异步方法调用时,最先执行的是Async注解的切面,该切面内会获取相应的线程池,然后使用在该线程池中调用Callable任务完成异步调用。

3.2用法安全事项:

  1. 不要使用默认的线程池,要自己配置线程池;
  2. 在异步方法中会丢弃主线程中ThreadLocal、request等数据;
  3. 由于@Async切面注解最先执行,所以如果该异步方法中存在其他的通知逻辑,请注意ThreadLocal、request数据丢失是否影响执行过程;
  4. 注意和事务注解结合使用;

最后回答一下1.4节提出的四个问题,前三个问题在上述已经回答了,这里着重回答一下第四个问题。直接调用是相当于使用this方式调用,但是在代理对象执行的过程中,调用目标方法都是使用原生的Bean(非代理),所以this就是原生的bean对象。该对象在调用方法时是直接调用的,不会执行切面逻辑,自然不会实现异步调用。那如果想实现异步调用的话就得使用代理对象,这是注入的对象就是代理对象,所以可以实现异步调用。