当前位置:网站首页>Which thread pool does Async use?
Which thread pool does Async use?
2022-08-03 16:39:00 【Listen to the wind rain in the world】
前言
在Spring中我们经常会用到异步操作,注解中使用 @EnableAsync
和 @Async
就可以使用它了.但是最近发现在异步中线程号使用的是我们项目中自定义的线程池 ThreadPoolTaskExecutor
而不是之前熟悉的 SimpleAsyncTaskExecutor
那么来看一下他的执行过程吧.
正文
- 首先要使异步生效,我们得在启动类中加入
@EnableAsync
那么就点开它看看.它会使用@Import
注入一个AsyncConfigurationSelector
类,启动是通过父类可以决定它使用的是配置类ProxyAsyncConfiguration
.
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
public AsyncConfigurationSelector() {
}
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch(adviceMode) {
case PROXY:
return new String[]{
ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[]{
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
default:
return null;
}
}
}
- 点开能够看到注入一个
AsyncAnnotationBeanPostProcessor
.它实现了BeanPostProcessor
接口,因此它是一个后处理器,用于将Spring AOP
的Advisor
应用于给定的bean
.从而该bean
上通过异步注解所定义的方法在调用时会被真正地异步调用起来.
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
public ProxyAsyncConfiguration() {
}
@Bean(
name = {
"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
)
@Role(2)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
return bpp;
}
}
AsyncAnnotationBeanPostProcessor
的父类实现了BeanFactoryAware
,那么会在AsyncAnnotationBeanPostProcessor
实例化之后回调setBeanFactory()
来实例化切面AsyncAnnotationAdvisor
.
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//定义一个切面
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
AsyncAnnotationAdvisor
构造和声明切入的目标(切点)和代码增强(通知).
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
//通知
this.advice = buildAdvice(executor, exceptionHandler);
//切入点
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
- 通知就是最终要执行的.
buildAdvice
用于构建通知,主要是创建一个AnnotationAsyncExecutionInterceptor
类型的拦截器,并且配置好使用的执行器和异常处理器.真正的异步执行的代码在AsyncExecutionAspectSupport
中!
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//配置拦截器
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
- 配置拦截器,通过参数配置自定义的执行器和异常处理器或者使用默认的执行器和异常处理器.
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//默认执行器
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
getDefaultExecutor()
方法,用来查找默认的执行器,父类AsyncExecutionAspectSupport
首先寻找唯一一个类型为TaskExecutor
的执行器并返回,若存在多个则寻找默认的执行器taskExecutor
,若无法找到则直接返回null.子类AsyncExecutionInterceptor
重写getDefaultExecutor
方法,首先调用父类逻辑,返回null则配置一个名为SimpleAsyncTaskExecutor
的执行器
/** * 父类 * 获取或构建此通知实例的默认执行器 * 这里返回的执行器将被缓存以供后续使用 * 默认实现搜索唯一的TaskExecutor的bean * 在上下文中,用于名为“taskExecutor”的Executor bean. * 如果两者都不是可解析的,这个实现将返回 null */
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 搜索唯一的一个TaskExecutor类型的bean并返回
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
//找不到唯一一个bean异常后,搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
//未找到异常时搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
/** * 子类 * 如父类为null则重新实例化一个名为SimpleAsyncTaskExecutor的执行器 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
所以,到了这一步就可以理解为什么异步线程名默认叫 SimpleAsyncTaskExecutor-xx
,为什么有了自己的线程池有可能异步用到了自己的线程池配置.
我们有这个切入点之后,每次请求接口执行异步方法前都会执行 AsyncExecutionInterceptor#invoke()
, determineAsyncExecutor
用来决策使用哪个执行器
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//在缓存的执行器中选择一个对应方法的执行器
AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
if (executor == null) {
//获取@Async注解中的value(指定的执行器)
String qualifier = this.getExecutorQualifier(method);
Executor targetExecutor;
if (StringUtils.hasLength(qualifier)) {
//获取指定执行器的bean
targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
} else {
//选择默认的执行器
targetExecutor = (Executor)this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
//将执行器进行缓存
this.executors.put(method, executor);
}
return (AsyncTaskExecutor)executor;
}
当有了执行器调用 doSubmit
方法将任务加入到执行器中.
异步任务,默认将采用SimpleAsyncTaskExecutor作为执行器!它有如下特点:
不复用线程,也就是说为每个任务新起一个线程.但是可以通过
concurrencyLimit
属性来控制并发线程数量,但是默认情况下不做限制(concurrencyLimit
取值为-1).
因此,如果我们使用异步任务,一定不能采用默认执行器的配置,以防OOM异常!最好的方式是指定执行器!
总结
本文主要以看源码的方式来了解异步注解 @Async
是如何在项目中选择线程以及使用线程的,尽量给异步任务指定一个独有线程池,这样会的避免不与其他业务共用线程池而造成影响.
边栏推荐
猜你喜欢
数据中台“集存通用治”功能场景说明
MPLS的wpn实验
[Deep Learning] Today's bug (August 2)
TiKV & TiFlash accelerate complex business queries丨TiFlash application practice
Introduction to the advantages of the new generation mesh network protocol T-Mesh wireless communication technology
DAYU200 OpenHarmony标准系统HDMI全屏显示
如何选择合适的导电滑环型号
C专家编程 第3章 分析C语言的声明 3.7 typedef struct foo{... foo;}的含义
TiKV & TiFlash 加速复杂业务查询丨TiFlash 应用实践
设置海思芯片MMZ内存、OS内存详解
随机推荐
Understand the recommendation system in one article: Outline 02: The link of the recommendation system, from recalling rough sorting, to fine sorting, to rearranging, and finally showing the recommend
Interpretation of the 2021 Cost of Data Breach Report
C专家编程 第3章 分析C语言的声明 3.7 typedef struct foo{... foo;}的含义
纯纯粹粹纯纯粹粹
使用 PowerShell 将 Windows 转发事件导入 SQL Server
如何选择合适的导电滑环型号
[Unity Getting Started Plan] Basic Concepts (6) - Sprite Renderer Sprite Renderer
Component communication - parent-child component communication
ORACLE CLOUD 在国内有数据中心吗?
自动化部署+整合SSM项目
C专家编程 第2章 这不是Bug,而是语言特性 2.2 多做之过
How to start an NFT collection
Small Tools (4) integrated Seata1.5.2 distributed transactions
元宇宙系列--Value creation in the metaverse
MPLS的wpn实验
【翻译】关于扩容一个百万级别用户系统的六个课程
QT QT 】 【 to have developed a good program for packaging into a dynamic library
《社会企业开展应聘文职人员培训规范》团体标准在新华书店上架
C语言03、数组
Web3 安全风险令人生畏?应该如何应对?