当前位置:网站首页>Which thread pool does Async use?
Which thread pool does Async use?
2022-08-02 23:32:00 【blue shirt dyed red dust】
前言
在SpringWe often use asynchronous operations in,注解中使用 @EnableAsync
和 @Async
就可以使用它了.But recently I found that the thread number used in asynchronous is the custom thread pool in our project ThreadPoolTaskExecutor
rather than the familiar SimpleAsyncTaskExecutor
So let's take a look at his execution process..
正文
- First make the asynchrony effective,We have to add in the startup class
@EnableAsync
Then click on it.它会使用@Import
注入一个AsyncConfigurationSelector
类,Start is through the parent class can decide it uses the configuration classProxyAsyncConfiguration
.
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
public AsyncConfigurationSelector() {
}
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch(adviceMode) {
case PROXY:
return new String[]{ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
default:
return null;
}
}
}
- Click to see the injection of a
AsyncAnnotationBeanPostProcessor
.它实现了BeanPostProcessor
接口,So it is a post-processor,用于将Spring AOP
的Advisor
应用于给定的bean
.从而该bean
The method defined by the asynchronous annotation on the above will be called truly asynchronously when it is called.
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
public ProxyAsyncConfiguration() {
}
@Bean(
name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
)
@Role(2)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
return bpp;
}
}
AsyncAnnotationBeanPostProcessor
的父类实现了BeanFactoryAware
,那么会在AsyncAnnotationBeanPostProcessor
实例化之后回调setBeanFactory()
来实例化切面AsyncAnnotationAdvisor
.
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//定义一个切面
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
AsyncAnnotationAdvisor
Constructing and declaring cut-ins(切点)And the code to enhance(通知).
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
//通知
this.advice = buildAdvice(executor, exceptionHandler);
//切入点
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
- 通知就是最终要执行的.
buildAdvice
Used to construct the notification,主要是创建一个AnnotationAsyncExecutionInterceptor
类型的拦截器,And configured using the actuator and the exception handler.The real asynchronous execution of the code is inAsyncExecutionAspectSupport
中!
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//配置拦截器
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
- 配置拦截器,Custom executor through parameter configuration and exception handler or use the default actuators and the exception handler.
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//默认执行器
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
getDefaultExecutor()
方法,Used to find the default executor,父类AsyncExecutionAspectSupport
Looking for only one type of firstTaskExecutor
executor and return,If there are more than one, find the default executortaskExecutor
,If it cannot be found, return directlynull.子类AsyncExecutionInterceptor
重写getDefaultExecutor
方法,First call the parent class logic,返回nullconfigure a namedSimpleAsyncTaskExecutor
的执行器
/**
* 父类
* Get or build the default executor for this notification instance
* Here to return to the actuator will be cached for later use
* The default implementation searches for uniqueTaskExecutor的bean
* 在上下文中,用于名为“taskExecutor”的Executor bean.
* If neither can be resolved,This implementation will return null
*/
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// Search the only oneTaskExecutor类型的bean并返回
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
//Can't find the only onebean异常后,搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
//Search for one when no exception is foundTaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
/**
* 子类
* If the parent class isnullTo instantiate a namedSimpleAsyncTaskExecutor的执行器
*/
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
所以,In this step you can understand why the asynchronous thread name by default SimpleAsyncTaskExecutor-xx
,Why is it possible to use your own thread pool configuration asynchronously with your own thread pool?.
After we have the breakthrough point,Executed every time the interface is requested to execute an asynchronous method AsyncExecutionInterceptor#invoke()
, determineAsyncExecutor
Used to decide which executor to use
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//Select an executor of the corresponding method in the cached executor
AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
if (executor == null) {
//获取@Async注解中的value(The specified actuator)
String qualifier = this.getExecutorQualifier(method);
Executor targetExecutor;
if (StringUtils.hasLength(qualifier)) {
//获取指定执行器的bean
targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
} else {
//Choose the default executor
targetExecutor = (Executor)this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
//cache the executor
this.executors.put(method, executor);
}
return (AsyncTaskExecutor)executor;
}
When there is an executor call doSubmit
Method to add tasks to the actuator.
异步任务,默认将采用SimpleAsyncTaskExecutor作为执行器!它有如下特点:
不复用线程,That is up a new thread for each task.但是可以通过
concurrencyLimit
property to control the number of concurrent threads,But by default do not limit(concurrencyLimit
取值为-1).因此,If we use async tasks,The configuration of the default executor must not be used,以防OOM异常!The best way is to specify the executor!
总结
This article mainly looks at the source code to understand asynchronous annotations @Async
Is how to choose the thread and using a thread in the project,Try to specify a unique thread pool for asynchronous tasks,This will avoid the impact of not sharing the thread pool with other services.
边栏推荐
- 【软件工程导论】软件工程导论笔记
- 李沐动手学深度学习V2-bert预训练数据集和代码实现
- golang源码分析之geoip2-golang
- Meta 与苹果的元宇宙碰撞
- 4 kmiles join YiSheng group, with more strong ability of digital business, accelerate China's cross-border electricity full domain full growth
- SQL Server安装教程
- postgresql autovaccum自动清理
- setup syntax sugar defineProps defineEmits defineExpose
- 实战:10 种实现延迟任务的方法,附代码!
- Five data structures of Redis and their corresponding usage scenarios
猜你喜欢
接口测试常用工具及测试方法(入门篇)
信息学奥赛一本通(1258:【例9.2】数字金字塔)
成为黑客不得不学的语言,看完觉得你们还可吗?
Thread线程类基本使用(上)
ShardingSphere-proxy +PostgreSQL实现读写分离(静态策略)
Electron User Guide Beginning Experience
磁盘分区的知识
What is a Field Service Management System (FSM)?what is the benefit?
Translate My Wonderful | July Moli Translation Program Winners Announced
The time series database has been developed for 5 years. What problem does it need to solve?
随机推荐
磁盘分区的知识
J9数字论:互联网跨链桥有什么作用呢?
软件测试的流程规范有哪些?具体要怎么做?
.NET如何快速比较两个byte数组是否相等
Bena的生命周期
J9数字货币论:识别Web3新的稀缺性:开源开发者
力扣每日一题-第46天-344. 反转字符串
What is a Field Service Management System (FSM)?what is the benefit?
腾讯云孟凡杰:我所经历的云原生降本增效最佳实践案例
In action: 10 ways to implement delayed tasks, with code!
新增指令 v-memo
太魔人招新啦|快来加入我们吧!
所谓武功再高也怕菜刀-分区、分库、分表和分布式的优劣
什么是 IDE
信息学奥赛一本通(1258:【例9.2】数字金字塔)
Async的线程池使用的哪个?
Triacetin是什么化学材料
PG 之 SQL执行计划
信息学奥赛一本通(1259:【例9.3】求最长不下降序列)
4 kmiles join YiSheng group, with more strong ability of digital business, accelerate China's cross-border electricity full domain full growth