当前位置:网站首页>Which thread pool does Async use?
Which thread pool does Async use?
2022-08-03 16:39:00 【Listen to the wind rain in the world】
前言
在Spring中我们经常会用到异步操作,注解中使用 @EnableAsync
和 @Async
就可以使用它了.但是最近发现在异步中线程号使用的是我们项目中自定义的线程池 ThreadPoolTaskExecutor
而不是之前熟悉的 SimpleAsyncTaskExecutor
那么来看一下他的执行过程吧.
正文
- 首先要使异步生效,我们得在启动类中加入
@EnableAsync
那么就点开它看看.它会使用@Import
注入一个AsyncConfigurationSelector
类,启动是通过父类可以决定它使用的是配置类ProxyAsyncConfiguration
.
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
public AsyncConfigurationSelector() {
}
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch(adviceMode) {
case PROXY:
return new String[]{
ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[]{
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
default:
return null;
}
}
}
- 点开能够看到注入一个
AsyncAnnotationBeanPostProcessor
.它实现了BeanPostProcessor
接口,因此它是一个后处理器,用于将Spring AOP
的Advisor
应用于给定的bean
.从而该bean
上通过异步注解所定义的方法在调用时会被真正地异步调用起来.
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
public ProxyAsyncConfiguration() {
}
@Bean(
name = {
"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
)
@Role(2)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
return bpp;
}
}
AsyncAnnotationBeanPostProcessor
的父类实现了BeanFactoryAware
,那么会在AsyncAnnotationBeanPostProcessor
实例化之后回调setBeanFactory()
来实例化切面AsyncAnnotationAdvisor
.
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//定义一个切面
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
AsyncAnnotationAdvisor
构造和声明切入的目标(切点)和代码增强(通知).
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
//通知
this.advice = buildAdvice(executor, exceptionHandler);
//切入点
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
- 通知就是最终要执行的.
buildAdvice
用于构建通知,主要是创建一个AnnotationAsyncExecutionInterceptor
类型的拦截器,并且配置好使用的执行器和异常处理器.真正的异步执行的代码在AsyncExecutionAspectSupport
中!
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//配置拦截器
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
- 配置拦截器,通过参数配置自定义的执行器和异常处理器或者使用默认的执行器和异常处理器.
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//默认执行器
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
getDefaultExecutor()
方法,用来查找默认的执行器,父类AsyncExecutionAspectSupport
首先寻找唯一一个类型为TaskExecutor
的执行器并返回,若存在多个则寻找默认的执行器taskExecutor
,若无法找到则直接返回null.子类AsyncExecutionInterceptor
重写getDefaultExecutor
方法,首先调用父类逻辑,返回null则配置一个名为SimpleAsyncTaskExecutor
的执行器
/** * 父类 * 获取或构建此通知实例的默认执行器 * 这里返回的执行器将被缓存以供后续使用 * 默认实现搜索唯一的TaskExecutor的bean * 在上下文中,用于名为“taskExecutor”的Executor bean. * 如果两者都不是可解析的,这个实现将返回 null */
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 搜索唯一的一个TaskExecutor类型的bean并返回
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
//找不到唯一一个bean异常后,搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
//未找到异常时搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
/** * 子类 * 如父类为null则重新实例化一个名为SimpleAsyncTaskExecutor的执行器 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
所以,到了这一步就可以理解为什么异步线程名默认叫 SimpleAsyncTaskExecutor-xx
,为什么有了自己的线程池有可能异步用到了自己的线程池配置.
我们有这个切入点之后,每次请求接口执行异步方法前都会执行 AsyncExecutionInterceptor#invoke()
, determineAsyncExecutor
用来决策使用哪个执行器
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//在缓存的执行器中选择一个对应方法的执行器
AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
if (executor == null) {
//获取@Async注解中的value(指定的执行器)
String qualifier = this.getExecutorQualifier(method);
Executor targetExecutor;
if (StringUtils.hasLength(qualifier)) {
//获取指定执行器的bean
targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
} else {
//选择默认的执行器
targetExecutor = (Executor)this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
//将执行器进行缓存
this.executors.put(method, executor);
}
return (AsyncTaskExecutor)executor;
}
当有了执行器调用 doSubmit
方法将任务加入到执行器中.
异步任务,默认将采用SimpleAsyncTaskExecutor作为执行器!它有如下特点:
不复用线程,也就是说为每个任务新起一个线程.但是可以通过
concurrencyLimit
属性来控制并发线程数量,但是默认情况下不做限制(concurrencyLimit
取值为-1).
因此,如果我们使用异步任务,一定不能采用默认执行器的配置,以防OOM异常!最好的方式是指定执行器!
总结
本文主要以看源码的方式来了解异步注解 @Async
是如何在项目中选择线程以及使用线程的,尽量给异步任务指定一个独有线程池,这样会的避免不与其他业务共用线程池而造成影响.
边栏推荐
- 中小微企业如何简单便捷、低成本实现数字化?360视觉云有妙招
- 从MatePad Pro进化看鸿蒙OS的生态势能
- protobuf 中数据编码规则
- 甲方不让用开源【监控软件】?大不了我自己写一个
- 13 and OOM simulation
- 高薪程序员&面试题精讲系列132之微服务之间如何进行通信?服务熔断是怎么回事?你熟悉Hystrix吗?
- Kubernetes 笔记 / 目录
- C专家编程 第2章 这不是Bug,而是语言特性 2.1 这关语言特性何事,在Fortran里这就是Bug呀
- 【系统学习编程-编程入门-全民编程 视频教程】
- Tolstoy: There are only two misfortunes in life
猜你喜欢
Why do I strongly recommend using smart async?
面试不再被吊打!这才是Redis分布式锁的七种方案的正确打开方式
vector类
C专家编程 第3章 分析C语言的声明 3.8 理解所有分析过程的代码段
Component communication - parent-child component communication
protobuf 中数据编码规则
可复现、开放科研、跨学科合作:数据驱动下的科研趋势及应用方案
QT QT 】 【 to have developed a good program for packaging into a dynamic library
Small Tools (4) integrated Seata1.5.2 distributed transactions
数据中台“集存通用治”功能场景说明
随机推荐
MySQL窗口函数 PARTITION BY()函数介绍
Understand the recommendation system in one article: Outline 02: The link of the recommendation system, from recalling rough sorting, to fine sorting, to rearranging, and finally showing the recommend
蒋松廷 荣获第六季完美童模全球总决赛 全球总冠军
AI+BI+Visualization, Deep Analysis of Sugar BI Architecture
可复现、开放科研、跨学科合作:数据驱动下的科研趋势及应用方案
怎么在opengauss中进行测试自己添加的新函数的性能(循环n次的运行时间)?
leetcode SVM
window.open does not show favicon.icon
详谈RDMA技术原理和三种实现方式
B站回应HR称核心用户是Loser;微博回应宕机原因;Go 1.19 正式发布|极客头条
Not to be ignored!Features and advantages of outdoor LED display
从零开始搭建MySQL主从复制架构
C专家编程 第3章 分析C语言的声明 3.5 typedef可以成为你的朋友
C语言04、操作符
C专家编程 第1章 C:穿越时空的迷雾 1.10 “安静的改变”究竟有多少安静
Components of communication - the drop-down menu
我写了个”不贪吃蛇“小游戏
建造者模式/生成器模式
【AppCube】零代码小课堂开课啦
罗克韦尔AB PLC RSLogix5000中创建新项目、任务、程序和例程的具体方法和步骤