当前位置:网站首页>Trample hole record -- a series of coincidences caused by thread pool rejection policy
Trample hole record -- a series of coincidences caused by thread pool rejection policy
2022-06-25 18:16:00 【Beipiao's vegetable Xiaobai】
One 、 Business scenario
1、 The scheduled task searches a batch of data to be processed from the database , In about 140w about
2、140w Data is processed using a loop
3、 In circulation , Submit the data processing task to the thread pool ( A pit ), And in the task , Push the processing results to mq
4、 The timed mission ends
Two 、 Abnormal phenomenon
During execution of loop to , Sudden cycle interruption , Task processing failed , The cycle ends prematurely
3、 ... and 、 Code mapping
The main thread simplified code is as follows
// Get all the task data to be processed
List<T> needHandlerList = getNeedHandlerList(shardingContext);
// Get the user-defined task thread pool
AsyncTaskExecutor threadPool = getAsyncTaskExecutor();
if (Objects.isNull(threadPool)) {
// If the user's thread pool is not obtained , Then use the thread pool provided by itself
threadPool = SpringContextHolder.getBean(AsyncTaskExecutor.class, "callableAsyncThreadPool");
}
if (!CollectionUtils.isEmpty(needHandlerList)) {
for (T obj : needHandlerList) {
// Get thread bean name
final String callableSpringBeanName = getCallableSpringBeanName();
// Verify non empty
Objects.requireNonNull(callableSpringBeanName);
// Get the thread object from the context , Each implementation class , adopt com.qlm.yfb.component.AbstractAsyncThreadPostProcessor Set up Scope by prototype
// @Scope("prototype") explain : When a class has prototype When the tag , from spring ioc When getting objects , The prototype mode will be maintained
AbstractCallableThread<T, V> callableThread = SpringContextHolder.getBean(AbstractCallableThread.class, callableSpringBeanName);
// Verify non empty
Objects.requireNonNull(callableThread);
try{
// Calls to perform
doExecute(callableThread, obj, threadPool, needHandlerList.size(), shardingContext.getJobName(), redisMemberSetKey, rateLimiter);
}catch (Exception e){
logger.error(shardingContext.getJobName() + " Loop processing generated an exception " ,e );
}
}
}
private void doExecute(AbstractCallableThread<T, V> callableThread,
T param,
AsyncTaskExecutor threadPool,
int total,
String jobName,
String redisMemberSetKey,
RateLimiter rateLimiter) {
if (rateLimiter != null) {
// Start current limiting
rateLimiter.acquire();
}
// Set execution parameters
callableThread.setAsyncParam(param);
threadPool.execute(()->{
// Wrap Thread objects
FutureTask<V> futureTask = new FutureTask<>(callableThread);
futureTask.run();
callableThread.done(futureTask);
});
redisTemplate.opsForSet().add(redisMemberSetKey, param.getMemberId());
Long size = redisTemplate.opsForSet().size(redisMemberSetKey);
logger.info(" Timing task :{}, Total data {}: Total number of data currently processed :{}", jobName, total, size);
}
The simplified version of the thread code is as follows
protected TemplateLogDTO asyncExecute(MemberDTO dto){
// Business logic
}
protected void done(Future<TemplateLogDTO> result){
try{
result.get();
// Send to rabbit mq
}catch(InterruptedException | ExecutionException | TimeoutException e) {
// normal catch Exception related information
}
}
Thread pool code
public AsyncTaskExecutor taskThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Get... From the server cpu kernel
int i = Runtime.getRuntime().availableProcessors();
// Core pool size
executor.setCorePoolSize(i);
// Maximum number of threads
executor.setMaxPoolSize(i * 2);
// The queue length
executor.setQueueCapacity(300000);
// Thread idle time
executor.setKeepAliveSeconds(1000);
// Thread prefix name
executor.setThreadNamePrefix("taskThread-pool-");
// Configure rejection policy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
Four 、 Troubleshoot problems
1、 When the task is interrupted, it is found that java and rabbit mq-server All links are closed , The preliminary judgment is caused by configuration problems , Find relevant abnormal information
2021-03-23 17:07:59,574 [dsTsZBxxTUEtoSUN3_Worker-1] ERROR [com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler33] - Job 'dsTsZBxxTUEtoSUN3' exception occur in job processing
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later.
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:60)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1434)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1420)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:723)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:706)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:676)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:567)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1600(CachingConnectionFactory.java:102)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1439)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095)
at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$execute$13(RabbitTemplate.java:2051)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2050)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1009)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1075)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1068)
at com.qlm.yfb.thread.ZbPushThread.handlerCallableResult(ZbPushThread.java:302)
at com.qlm.yfb.thread.AbstractCallableThread.done(AbstractCallableThread.java:49)
at com.qlm.yfb.component.AbstractAsyncTemplate.lambda$doExecute$0(AbstractAsyncTemplate.java:208)
at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:2038)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
at com.qlm.yfb.component.AbstractAsyncTemplate.doExecute(AbstractAsyncTemplate.java:206)
at com.qlm.yfb.component.AbstractAsyncTemplate.execute(AbstractAsyncTemplate.java:114)
at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:171)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:150)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:122)
at com.dangdang.ddframe.job.lite.internal.schedule.LiteJob.execute(LiteJob.java:26)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
It might be understood as rabbit mq Of channel-max The configuration is too small
Solution : modify rabbit mq server Configuration file for , Adjust the configuration value
The profile is located in /etc/rabbitmq/rabbitmq.conf
Original value 128, After modification 1000
Be careful : This value will be the same as java Client side comparison , Minimum value , If it's all for null or 0, There is no limit to
2、 Why does the task submitted to the thread pool generate an exception , Will cause a loop break in the main thread ?
Think for a long time , Personally, I think it is caused by the thread pool rejection policy
ThreadPoolExecutor.CallerRunsPolicy(): When the buffer queue of the thread pool is full , The main thread will be called back to execute the task
Unfortunately ,Future Code inside catch In the exception of , Does not include AmqpResourceNotAvailableException
So when an exception occurs in task processing , The main thread is interrupted
resolvent ,
1、 Call... In a loop doExecute when ,try{}catch(Exception e), Low performance , But simple and rough
2、 Enlarge the thread pool queue length , It takes some calculation to , Poor variability
3、 If the task allows discarding , Replace the reject policy
I use the method 1, Simple 、 Brutal ....
边栏推荐
- 怎么判断自己是否适合转行软件测试
- SQL Server real time backup library requirements
- 踩坑记录---线程池拒绝策略引起的一系列巧合
- lock
- [compréhension approfondie de la technologie tcaplusdb] sauvegarde des données d'affaires tcaplusdb
- IVX sailing
- [deeply understand tcapulusdb technology] tcapulusdb import data
- IET出席2022世界科技社团发展与治理论坛 为构建国际科技共同体献言献策
- Install spark + run Scala related projects with commands + crontab scheduled execution
- Sword finger offer double pointer
猜你喜欢

Garbage collector and memory allocation strategy

Encryption trend: Fashion advances to the meta universe

移动端异构运算技术 - GPU OpenCL 编程(基础篇)

深度学习网路模型

【深入理解TcaplusDB技术】TcaplusDB运维单据
![[deeply understand tcapulusdb technology] tcapulusdb import data](/img/31/4e33fafa090e0bb5b55e11978cdff8.png)
[deeply understand tcapulusdb technology] tcapulusdb import data

Sword finger offer double pointer

Solve nvprof error err_ NVGPUCTRPERM - The user does not have permission to profile on the target device.

lock

Article 7: there is no code prompt in clion,,,
随机推荐
安装spark + 用命令运行scala相关项目 + crontab定时执行
Swagger implements background interface automation document
How to open a stock account is it safe to open an account
Wechat applet reports an error: request:fail URL not in domain list
Using QT to make a beautiful login interface box
QT generate random numbers (random strings) within the specified range
Solve nvprof error err_ NVGPUCTRPERM - The user does not have permission to profile on the target device.
【深入理解TcaplusDB技术】单据受理之表管理
The icon is missing. What does the URL come from with the jesssionid?
Sword finger offer double pointer
【深入理解TcaplusDB技术】单据受理之创建游戏区
New characteristics of cultural consumption in the era of digital economy
【深入理解TcaplusDB技术】如何实现Tmonitor单机安装
【深入理解TcaplusDB技术】创建游戏区
new TypeReference用法 fastjson[通俗易懂]
ASP. Net supermarket convenience store online shopping mall source code, for the surrounding distribution system
158_模型_Power BI 使用 DAX + SVG 打通制作商业图表几乎所有可能
ASP.NET超市便利店在线购物商城源码,针对周边配送系统
【深入理解TcaplusDB技術】TcaplusDB業務數據備份
Is it convenient to open a stock account? Is online account opening safe?