Which thread pool does @Async use?
2022-08-02 23:32:00 【blue shirt dyed red dust】
Preface
In Spring we often run operations asynchronously: adding the @EnableAsync and @Async annotations is all it takes. Recently, though, I noticed that the thread names printed by our asynchronous methods came from the custom ThreadPoolTaskExecutor thread pool defined in our project rather than from the familiar SimpleAsyncTaskExecutor. So let's walk through the execution flow to see why.
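Main text
- To enable asynchronous execution, we first add @EnableAsync to the startup class. Opening that annotation shows that it uses @Import to bring in AsyncConfigurationSelector. Its parent class, AdviceModeImportSelector, reads the advice mode from the annotation and passes it to selectImports(); in PROXY mode, which is the default, the configuration class ProxyAsyncConfiguration is selected.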
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    public AsyncConfigurationSelector() {
    }

    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch(adviceMode) {
            case PROXY:
                return new String[]{ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
            default:
                return null;
        }
    }
}
- ProxyAsyncConfiguration registers an AsyncAnnotationBeanPostProcessor. It implements the BeanPostProcessor interface, so it is a post-processor; its job is to apply a Spring AOP Advisor to a given bean, so that methods on that bean carrying the asynchronous annotation are actually invoked asynchronously.
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    public ProxyAsyncConfiguration() {
    }

    @Bean(
        name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
    )
    @Role(2)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
        return bpp;
    }
}
A parent class of AsyncAnnotationBeanPostProcessor implements BeanFactoryAware, so after AsyncAnnotationBeanPostProcessor is instantiated, setBeanFactory() is called back and creates the aspect (advisor) AsyncAnnotationAdvisor.
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    // Define the advisor
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
The AsyncAnnotationAdvisor constructor builds both the pointcut (where to intercept) and the advice (the enhancement code to run).
public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
                ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    }
    catch (ClassNotFoundException ex) {
        // If EJB 3.1 API not present, simply ignore.
    }
    // Advice
    this.advice = buildAdvice(executor, exceptionHandler);
    // Pointcut
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}
- The advice is what ultimately gets executed. buildAdvice constructs it: it creates an interceptor of type AnnotationAsyncExecutionInterceptor and configures it with the executor and the exception handler. The code that actually submits work asynchronously lives in AsyncExecutionAspectSupport.
protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    // Configure the interceptor
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}
- Configuring the interceptor: the executor and exception handler passed in as parameters are used if present; otherwise the default executor and the default exception handler are used.
public void configure(@Nullable Supplier<Executor> defaultExecutor,
        @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    // Default executor
    this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
    this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
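The "configured value first, default otherwise" behaviour of the two suppliers above can be sketched in plain Java. FallbackSupplier is a hypothetical, simplified stand-in written for this article, not Spring's SingletonSupplier itself; it only illustrates the idea that the configured supplier is tried first, the default is consulted when that yields nothing, and the result is cached.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for the "primary or default" supplier idea. */
final class FallbackSupplier<T> implements Supplier<T> {
    private final Supplier<? extends T> primary;   // may be null when nothing was configured
    private final Supplier<? extends T> fallback;  // used when primary is absent or yields null
    private T cached;                              // resolved at most once, then reused

    FallbackSupplier(Supplier<? extends T> primary, Supplier<? extends T> fallback) {
        this.primary = primary;
        this.fallback = fallback;
    }

    @Override
    public synchronized T get() {
        if (cached == null) {
            // Try the explicitly configured supplier first.
            T value = (primary != null) ? primary.get() : null;
            if (value == null) {
                // Nothing configured: fall back to the default.
                value = fallback.get();
            }
            cached = value;
        }
        return cached;
    }
}
```

Because the value is resolved lazily, the default lookup only runs on first use, and only when no executor was configured explicitly.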
getDefaultExecutor() looks up the default executor. The parent class, AsyncExecutionAspectSupport, first looks for a unique bean of type TaskExecutor and returns it. If there is more than one (or none at all), it looks for a bean named taskExecutor; if that cannot be found either, it returns null. The subclass AsyncExecutionInterceptor overrides getDefaultExecutor: it calls the parent logic first and, if that returns null, creates a SimpleAsyncTaskExecutor.
/**
 * Parent class (AsyncExecutionAspectSupport).
 * Retrieve or build a default executor for this advice instance.
 * An executor returned from here will be cached for further use.
 * The default implementation searches for a unique TaskExecutor bean
 * in the context, or for an Executor bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable, this implementation will return null.
 */
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // Look for the unique bean of type TaskExecutor and return it
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            // More than one TaskExecutor bean: fall back to the Executor bean named "taskExecutor"
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            // No TaskExecutor bean at all: fall back to the Executor bean named "taskExecutor"
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}

/**
 * Subclass (AsyncExecutionInterceptor).
 * If the parent class returns null, instantiate a SimpleAsyncTaskExecutor.
 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
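The lookup order in the two methods above can be modelled in plain Java, without Spring. This is only a sketch under simplifying assumptions: the class DefaultExecutorLookup and its methods are hypothetical, a Map of bean names stands in for the BeanFactory, and details such as @Primary and the TaskExecutor versus Executor type distinction are ignored.

```java
import java.util.Map;
import java.util.concurrent.Executor;

/** Hypothetical model of the default-executor lookup order; not Spring code. */
final class DefaultExecutorLookup {
    static final String DEFAULT_NAME = "taskExecutor";

    /** Mirrors the parent-class logic: unique bean, else the bean named "taskExecutor", else null. */
    static Executor resolve(Map<String, Executor> executorBeans) {
        if (executorBeans.size() == 1) {
            // Exactly one candidate: use it, whatever its name is.
            return executorBeans.values().iterator().next();
        }
        // Zero or several candidates: only a bean with the conventional name qualifies.
        return executorBeans.get(DEFAULT_NAME);
    }

    /** Mirrors the subclass logic: fall back to a thread-per-task executor when nothing was found. */
    static Executor resolveOrFallback(Map<String, Executor> executorBeans) {
        Executor found = resolve(executorBeans);
        // Assumption: a new thread per task approximates what SimpleAsyncTaskExecutor does.
        return (found != null) ? found : task -> new Thread(task).start();
    }
}
```

With a single custom pool in the map, resolve returns that pool, which matches the observation in the preface: a project that defines exactly one ThreadPoolTaskExecutor gets it used for @Async without asking for it.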
So at this step we can see why the asynchronous thread names default to SimpleAsyncTaskExecutor-xx, and why a thread pool configured in our own project gets picked up for asynchronous execution.
Once the pointcut is in place, every call to an asynchronous method goes through AsyncExecutionInterceptor#invoke(), and determineAsyncExecutor decides which executor to use.
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    // Look up the executor cached for this method
    AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
    if (executor == null) {
        // Read the value of the @Async annotation (the explicitly specified executor)
        String qualifier = this.getExecutorQualifier(method);
        Executor targetExecutor;
        if (StringUtils.hasLength(qualifier)) {
            // Fetch the bean of the specified executor
            targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
        } else {
            // Use the default executor
            targetExecutor = (Executor)this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
        // Cache the executor
        this.executors.put(method, executor);
    }
    return (AsyncTaskExecutor)executor;
}
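The selection and caching in determineAsyncExecutor can be sketched in plain Java as follows. ExecutorSelector is a hypothetical class written for illustration: a String key stands in for the Method object, a Map stands in for the BeanFactory, and the lookups counter exists only to make the caching visible.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical model of per-method executor selection with caching; not Spring code. */
final class ExecutorSelector {
    private final Map<String, Executor> beansByName;   // stands in for the BeanFactory
    private final Executor defaultExecutor;            // result of the default lookup
    private final Map<String, Executor> cache = new ConcurrentHashMap<>();
    int lookups = 0;                                   // counts uncached resolutions (demo only)

    ExecutorSelector(Map<String, Executor> beansByName, Executor defaultExecutor) {
        this.beansByName = beansByName;
        this.defaultExecutor = defaultExecutor;
    }

    /** Returns the executor for a method: qualified bean if a qualifier is given, default otherwise. */
    Executor determine(String methodKey, String qualifier) {
        return cache.computeIfAbsent(methodKey, key -> {
            lookups++;
            if (qualifier != null && !qualifier.isEmpty()) {
                Executor named = beansByName.get(qualifier);
                if (named == null) {
                    throw new IllegalStateException("No executor bean named '" + qualifier + "'");
                }
                return named;
            }
            return defaultExecutor;
        });
    }
}
```

The cache means the qualifier is resolved once per method, so a method annotated with a specific executor name keeps using that executor on every later call.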
Once an executor has been determined, doSubmit is called to hand the task to it.
By default, asynchronous tasks use SimpleAsyncTaskExecutor as the executor. It has the following characteristics:
It does not reuse threads; it starts a new thread for every task. The number of concurrent threads can be capped through the concurrencyLimit property, but by default there is no limit (concurrencyLimit is -1). So when using asynchronous tasks, do not rely on the default executor configuration, to guard against OOM errors. The best approach is to specify the executor explicitly.
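The difference between starting a thread per task and reusing pooled threads can be observed with the JDK alone. The sketch below is an illustration, not Spring code: ThreadReuseDemo is a hypothetical helper, and the thread-per-task executor is assumed to approximate how SimpleAsyncTaskExecutor behaves with no concurrency limit.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

/** Hypothetical helper that counts how many distinct threads an executor uses. */
final class ThreadReuseDemo {
    /** Runs the given number of tasks and returns the number of distinct threads that ran them. */
    static int distinctThreads(Executor executor, int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                // Record the thread that actually runs this task.
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            // Wait until every task has finished before counting.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        }
        return seen.size();
    }
}
```

Passing task -> new Thread(task).start() as the executor yields one thread per task, while Executors.newFixedThreadPool(2) never uses more than two threads for the same workload. With unbounded task submission, the first pattern is what makes running out of memory possible.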
Summary
This article reads through the source code to understand how the @Async annotation chooses the threads it runs on. In a project, try to give asynchronous tasks their own dedicated thread pool, so they are not affected by sharing a thread pool with other business logic.
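A minimal configuration sketch of that advice, assuming a Spring application with component scanning: the bean name mailExecutor, the classes AsyncConfig and MailService, and all pool sizes are illustrative choices rather than values from this article.

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Configuration
@EnableAsync
class AsyncConfig {

    // Dedicated, bounded pool for one kind of asynchronous work (hypothetical name and sizes).
    @Bean("mailExecutor")
    public ThreadPoolTaskExecutor mailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("mail-async-");
        // When the pool and queue are full, run the task on the calling thread instead of dropping it.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

@Service
class MailService {

    // The annotation value names the executor bean to use for this method.
    @Async("mailExecutor")
    public void send(String recipient) {
        System.out.println(Thread.currentThread().getName() + " sending to " + recipient);
    }
}
```

Naming the executor in @Async keeps the choice explicit even if more executor beans are added later. If this were the only TaskExecutor bean in the context, the default lookup described earlier would also pick it up for methods that use a bare @Async.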