Pitfall log: a chain of coincidences triggered by a thread pool rejection policy
2022-06-25 18:10:00 【北漂的菜小白】
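I. Business scenario
1. A scheduled job queries the database for a batch of records to process, roughly 1.4 million of them.
2. The 1.4 million records are processed in a loop.
3. Inside the loop, each record's processing task is submitted to a thread pool (pitfall #1), and the task pushes its result to MQ.
4. The scheduled job finishes.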
II. Symptoms
Partway through, the loop suddenly stopped. Task processing failed and the loop exited early.
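III. Code
Simplified main-thread code. It already includes the try/catch around doExecute, which is the fix chosen at the end of this post.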
// Fetch all the task data that needs processing
List<T> needHandlerList = getNeedHandlerList(shardingContext);
// Get the user-defined task thread pool
AsyncTaskExecutor threadPool = getAsyncTaskExecutor();
if (Objects.isNull(threadPool)) {
    // If the user did not provide a thread pool, fall back to the built-in one
    threadPool = SpringContextHolder.getBean(AsyncTaskExecutor.class, "callableAsyncThreadPool");
}
if (!CollectionUtils.isEmpty(needHandlerList)) {
    for (T obj : needHandlerList) {
        // Get the bean name of the task (thread) class
        final String callableSpringBeanName = getCallableSpringBeanName();
        // Check it is not null
        Objects.requireNonNull(callableSpringBeanName);
        // Get the task object from the Spring context; com.qlm.yfb.component.AbstractAsyncThreadPostProcessor sets the scope of every implementation to prototype
        // @Scope("prototype"): a prototype-scoped bean is created anew each time it is requested from the Spring IoC container
        AbstractCallableThread<T, V> callableThread = SpringContextHolder.getBean(AbstractCallableThread.class, callableSpringBeanName);
        // Check it is not null
        Objects.requireNonNull(callableThread);
        try {
            // Execute
            doExecute(callableThread, obj, threadPool, needHandlerList.size(), shardingContext.getJobName(), redisMemberSetKey, rateLimiter);
        } catch (Exception e) {
            logger.error(shardingContext.getJobName() + " failed while processing an item in the loop", e);
        }
    }
}
private void doExecute(AbstractCallableThread<T, V> callableThread,
                       T param,
                       AsyncTaskExecutor threadPool,
                       int total,
                       String jobName,
                       String redisMemberSetKey,
                       RateLimiter rateLimiter) {
    if (rateLimiter != null) {
        // Apply rate limiting
        rateLimiter.acquire();
    }
    // Set the execution parameter
    callableThread.setAsyncParam(param);
    threadPool.execute(() -> {
        // Wrap the task object in a FutureTask
        FutureTask<V> futureTask = new FutureTask<>(callableThread);
        futureTask.run();
        callableThread.done(futureTask);
    });
    redisTemplate.opsForSet().add(redisMemberSetKey, param.getMemberId());
    Long size = redisTemplate.opsForSet().size(redisMemberSetKey);
    logger.info("Scheduled job: {}, total records: {}, records processed so far: {}", jobName, total, size);
}
Simplified task (thread) code:
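protected TemplateLogDTO asyncExecute(MemberDTO dto) {
    // Business logic (omitted)
}
protected void done(Future<TemplateLogDTO> result) {
    try {
        result.get();
        // Send to RabbitMQ (Spring AMQP throws unchecked AmqpException subclasses here, which the catch below does not cover)
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        // Normal exception handling/logging
    }
}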
Thread pool configuration:
public AsyncTaskExecutor taskThreadPool() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // Number of CPU cores on the server
    int i = Runtime.getRuntime().availableProcessors();
    // Core pool size
    executor.setCorePoolSize(i);
    // Maximum pool size
    executor.setMaxPoolSize(i * 2);
    // Queue capacity
    executor.setQueueCapacity(300000);
    // Idle thread keep-alive time (seconds)
    executor.setKeepAliveSeconds(1000);
    // Thread name prefix
    executor.setThreadNamePrefix("taskThread-pool-");
    // Rejection policy
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
}
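Below is a minimal sketch of when CallerRunsPolicy actually fires. It uses a plain java.util.concurrent.ThreadPoolExecutor with tiny, made-up sizes (core 1, max 2, queue 1) instead of the real values. When the queue capacity is positive, Spring's ThreadPoolTaskExecutor builds the same kind of executor backed by a bounded LinkedBlockingQueue.

The order is: core threads first, then the queue, then extra threads up to the maximum, and only then the rejection handler. With a 300000-slot queue and 1.4 million fast submissions, the queue fills up and rejected tasks start running on the submitting thread. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    // Submits `tasks` tasks to a tiny pool (core=1, max=2, queue=1) and returns
    // the 1-based ids of the tasks that ended up running on the submitting thread.
    static List<Integer> tasksRunOnCaller(int tasks) {
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        Map<Integer, Boolean> ranOnCaller = new ConcurrentHashMap<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),               // bounded queue, like setQueueCapacity(...)
                new ThreadPoolExecutor.CallerRunsPolicy()); // same policy as the article
        for (int i = 1; i <= tasks; i++) {
            final int id = i;
            pool.execute(() -> {
                boolean onCaller = Thread.currentThread() == caller;
                ranOnCaller.put(id, onCaller);
                if (!onCaller) {
                    // Pool threads stay busy until released, so the queue stays full
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Integer> result = new ArrayList<>();
        for (int i = 1; i <= tasks; i++) {
            if (Boolean.TRUE.equals(ranOnCaller.get(i))) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Task 1 -> new core thread, task 2 -> queue, task 3 -> extra thread (max=2),
        // task 4 -> rejected, so CallerRunsPolicy runs it on the main thread
        System.out.println(tasksRunOnCaller(4)); // prints [4]
    }
}
```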
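IV. Troubleshooting
1. When the job broke off, every connection between the Java application and the RabbitMQ server had been closed. At first this looked like a configuration problem, and the logs contained the following exception: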
2021-03-23 17:07:59,574 [dsTsZBxxTUEtoSUN3_Worker-1] ERROR [com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler33] - Job 'dsTsZBxxTUEtoSUN3' exception occur in job processing
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later.
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:60)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1434)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1420)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:723)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:706)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:676)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:567)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1600(CachingConnectionFactory.java:102)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1439)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095)
at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$execute$13(RabbitTemplate.java:2051)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2050)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1009)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1075)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1068)
at com.qlm.yfb.thread.ZbPushThread.handlerCallableResult(ZbPushThread.java:302)
at com.qlm.yfb.thread.AbstractCallableThread.done(AbstractCallableThread.java:49)
at com.qlm.yfb.component.AbstractAsyncTemplate.lambda$doExecute$0(AbstractAsyncTemplate.java:208)
at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:2038)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
at com.qlm.yfb.component.AbstractAsyncTemplate.doExecute(AbstractAsyncTemplate.java:206)
at com.qlm.yfb.component.AbstractAsyncTemplate.execute(AbstractAsyncTemplate.java:114)
at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:171)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:150)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:122)
at com.dangdang.ddframe.job.lite.internal.schedule.LiteJob.execute(LiteJob.java:26)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
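This essentially means that the channel-max limit on the RabbitMQ server (the maximum number of channels per connection) was set too low.
Fix: edit the RabbitMQ server configuration file and raise this value.
The configuration file is /etc/rabbitmq/rabbitmq.conf
The original value was 128; I changed it to 1000.
Note: this value is negotiated with the value requested by the Java client, and the smaller one wins. A 0 on one side means that side imposes no limit, and if both are 0 there is no limit.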
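The negotiation rule from the note can be written as a small function. This is a sketch of the rule as described above, not the RabbitMQ client's source code; the class and method names are hypothetical. On the server side, the corresponding line in rabbitmq.conf is `channel_max = 1000`.

```java
class ChannelMaxNegotiation {

    // Hypothetical helper mirroring the rule described above:
    // the smaller limit wins, and 0 means "no limit" on that side.
    static int negotiate(int clientRequested, int serverMax) {
        if (clientRequested == 0 || serverMax == 0) {
            // If only one side is 0, the other side's limit applies; 0 result = unlimited
            return Math.max(clientRequested, serverMax);
        }
        return Math.min(clientRequested, serverMax);
    }

    public static void main(String[] args) {
        System.out.println(negotiate(0, 128));    // 128: only the server limits
        System.out.println(negotiate(0, 1000));   // 1000: after raising channel_max
        System.out.println(negotiate(500, 1000)); // 500: the client asks for less
        System.out.println(negotiate(0, 0));      // 0: no limit on either side
    }
}
```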
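2. Why did an exception inside a task submitted to the thread pool break the loop in the main thread?
After thinking about it for a long time, I believe the thread pool's rejection policy is responsible.
ThreadPoolExecutor.CallerRunsPolicy: when the pool cannot accept a task (the queue is full and the pool has already grown to maxPoolSize), the rejected task runs directly in the thread that called execute(). Here that is the job's main loop thread, not a pool thread.
Unfortunately, done() only catches the checked exceptions of Future.get(): InterruptedException, ExecutionException and TimeoutException. AmqpResourceNotAvailableException is an unchecked exception thrown while sending to MQ, so it is not caught.
So when a rejected task failed, the exception propagated out of threadPool.execute() straight into the loop and aborted it. The stack trace above shows exactly this path: ThreadPoolExecutor.execute → reject → CallerRunsPolicy.rejectedExecution → the task lambda → AbstractCallableThread.done. All of it runs on the Quartz worker thread dsTsZBxxTUEtoSUN3_Worker-1.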
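The chain of coincidences can be reproduced with plain java.util.concurrent. This sketch uses a hypothetical FakeAmqpException as a stand-in for AmqpResourceNotAvailableException, and a one-thread pool whose worker is blocked and whose one-slot queue is full.

It shows two cases:
- A failure inside the Callable is captured by FutureTask and handled as ExecutionException.
- A failure in the send step after get() escapes. Because CallerRunsPolicy ran the task on the looping thread, it escapes from execute() and ends the loop.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PropagationDemo {

    // Hypothetical stand-in for AmqpResourceNotAvailableException, which is unchecked
    static class FakeAmqpException extends RuntimeException {
        FakeAmqpException(String message) {
            super(message);
        }
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Mimics the article's task: run the Callable inside a FutureTask, then "send" the result.
    // failWhileSending=false: the business logic throws (FutureTask captures it).
    // failWhileSending=true: the business logic succeeds and the send step throws.
    static Runnable task(boolean failWhileSending) {
        Callable<String> work = () -> {
            if (!failWhileSending) {
                throw new IllegalStateException("business logic failed");
            }
            return "record";
        };
        return () -> {
            FutureTask<String> futureTask = new FutureTask<>(work);
            futureTask.run(); // an exception from work() is stored, not thrown
            try {
                String result = futureTask.get(); // a stored failure resurfaces as ExecutionException
                throw new FakeAmqpException("channelMax reached while sending " + result);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // Like done(): only the checked exceptions of get() are handled here
            }
        };
    }

    // Returns true if the loop submitted all `total` items, false if an exception ended it early.
    static boolean loopCompletes(boolean failWhileSending, int total) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        // Occupy the only worker and fill the one-slot queue, so every later execute() is rejected
        pool.execute(() -> awaitQuietly(release));
        pool.execute(() -> awaitQuietly(release));
        try {
            for (int i = 0; i < total; i++) {
                // Rejected, so CallerRunsPolicy runs the task right here, on the looping thread
                pool.execute(task(failWhileSending));
            }
            return true;
        } catch (FakeAmqpException e) {
            return false; // the send failure escaped execute() and aborted the loop
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loopCompletes(false, 3)); // true: FutureTask captured the failure
        System.out.println(loopCompletes(true, 3));  // false: the loop was aborted
    }
}
```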
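Possible fixes:
1. Wrap each doExecute call in the loop in try{}catch(Exception e). Crude but simple; the try/catch itself adds essentially no overhead unless an exception is actually thrown.
2. Increase the pool's queue capacity. This needs some calculation against the data volume and breaks again when the volume changes.
3. If tasks are allowed to be dropped, switch to a different rejection policy (for example ThreadPoolExecutor.DiscardPolicy).
I went with method 1: simple and blunt.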
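Methods 1 and 3 can be compared on the same kind of saturated toy pool: one busy worker and a full one-slot queue. This is a sketch with made-up sizes, and the throwing task is a stand-in for the real MQ-sending task.

- **Method 1** keeps CallerRunsPolicy and catches per iteration, so every record is still attempted and each failure can be logged.
- **Method 3**, with DiscardPolicy, never throws into the loop, but rejected tasks are silently lost.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionFixes {

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Builds a saturated pool: its only worker is blocked and its one-slot queue is full
    static ThreadPoolExecutor saturatedPool(RejectedExecutionHandler handler, CountDownLatch release) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), handler);
        pool.execute(() -> awaitQuietly(release));
        pool.execute(() -> awaitQuietly(release));
        return pool;
    }

    // Method 1: keep CallerRunsPolicy and catch per iteration; returns how many failures were caught
    static int tryCatchPerIteration(int total) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = saturatedPool(new ThreadPoolExecutor.CallerRunsPolicy(), release);
        int failures = 0;
        try {
            for (int i = 0; i < total; i++) {
                try {
                    // Rejected, so it runs on this thread and its exception surfaces here
                    pool.execute(() -> { throw new RuntimeException("send failed"); });
                } catch (RuntimeException e) {
                    failures++; // the real code logs here and moves on to the next record
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return failures;
    }

    // Method 3: DiscardPolicy drops rejected tasks silently; returns how many of them actually ran
    static int discardPolicy(int total) {
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        ThreadPoolExecutor pool = saturatedPool(new ThreadPoolExecutor.DiscardPolicy(), release);
        for (int i = 0; i < total; i++) {
            pool.execute(ran::incrementAndGet); // rejected and dropped, no exception
        }
        release.countDown();
        pool.shutdown();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(tryCatchPerIteration(5)); // 5: every failure caught, loop finished
        System.out.println(discardPolicy(5));        // 0: all rejected tasks were dropped
    }
}
```

If tasks really may be dropped, it is still worth counting or logging the drops in a custom handler, because DiscardPolicy itself gives no signal at all.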