Pitfall notes: a chain of coincidences caused by a thread pool rejection policy

2022-06-25 18:16:00 Beipiao's vegetable Xiaobai

I. Business scenario
1. A scheduled task queries the database for a batch of records to process, roughly 1.4 million (140w) of them.
2. The 1.4 million records are processed in a loop.
3. Inside the loop, each record's processing task is submitted to a thread pool (the first pitfall), and the task pushes its result to MQ.
4. The scheduled task ends.
II. Symptom
Partway through the loop, the loop was suddenly interrupted, the task failed, and the loop ended prematurely.
III. Code walkthrough
The simplified main-thread code is as follows

        // Fetch all the records that need processing
        List<T> needHandlerList = getNeedHandlerList(shardingContext);
        // Get the user-defined task thread pool
        AsyncTaskExecutor threadPool = getAsyncTaskExecutor();
        if (Objects.isNull(threadPool)) {
            // No user-defined pool was found, so fall back to the built-in one
            threadPool = SpringContextHolder.getBean(AsyncTaskExecutor.class, "callableAsyncThreadPool");
        }
        if (!CollectionUtils.isEmpty(needHandlerList)) {
            for (T obj : needHandlerList) {
                // Get the Spring bean name of the task class
                final String callableSpringBeanName = getCallableSpringBeanName();
                // Must not be null
                Objects.requireNonNull(callableSpringBeanName);
                // Look up the task object in the Spring context. Every implementation class is given
                // prototype scope by com.qlm.yfb.component.AbstractAsyncThreadPostProcessor.
                // @Scope("prototype"): each lookup from the Spring IoC container returns a new instance.
                AbstractCallableThread<T, V> callableThread = SpringContextHolder.getBean(AbstractCallableThread.class, callableSpringBeanName);
                // Must not be null
                Objects.requireNonNull(callableThread);
                try {
                    // Submit the task
                    doExecute(callableThread, obj, threadPool, needHandlerList.size(), shardingContext.getJobName(), redisMemberSetKey, rateLimiter);
                } catch (Exception e) {
                    logger.error(shardingContext.getJobName() + " loop processing threw an exception", e);
                }
            }
        }




    private void doExecute(AbstractCallableThread<T, V> callableThread,
                           T param,
                           AsyncTaskExecutor threadPool,
                           int total,
                           String jobName,
                           String redisMemberSetKey,
                           RateLimiter rateLimiter) {
        if (rateLimiter != null) {
            // Apply rate limiting: blocks until a permit is available
            rateLimiter.acquire();
        }
        // Set the parameter the task will work on
        callableThread.setAsyncParam(param);
        threadPool.execute(() -> {
            // Wrap the callable in a FutureTask, run it, then hand the result to done()
            FutureTask<V> futureTask = new FutureTask<>(callableThread);
            futureTask.run();
            callableThread.done(futureTask);
        });

        // Record progress in a Redis set and log it
        redisTemplate.opsForSet().add(redisMemberSetKey, param.getMemberId());
        Long size = redisTemplate.opsForSet().size(redisMemberSetKey);
        logger.info("Scheduled task: {}, total records: {}, records processed so far: {}", jobName, total, size);
    }

The simplified task code (the class submitted to the thread pool) is as follows

protected TemplateLogDTO asyncExecute(MemberDTO dto){
	// Business logic
}

protected void done(Future<TemplateLogDTO> result){
	try{
		result.get();
		// Send the result to RabbitMQ
	}catch(InterruptedException | ExecutionException | TimeoutException e) {
		// Log the exception details.
		// Note: unchecked exceptions thrown by the MQ send (such as
		// AmqpResourceNotAvailableException) are not caught here.
	}
}

Thread pool configuration

    public AsyncTaskExecutor taskThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Number of CPU cores available on the server
        int i = Runtime.getRuntime().availableProcessors();
        // Core pool size
        executor.setCorePoolSize(i);
        // Maximum number of threads
        executor.setMaxPoolSize(i * 2);
        // Queue capacity
        executor.setQueueCapacity(300000);
        // Idle time (in seconds) before surplus threads are released
        executor.setKeepAliveSeconds(1000);
        // Thread name prefix
        executor.setThreadNamePrefix("taskThread-pool-");
        // Rejection policy: run the rejected task on the submitting thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
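
To make the configuration above concrete, here is a minimal sketch of when the rejection policy actually kicks in. It uses the plain JDK ThreadPoolExecutor as a stand-in for Spring's ThreadPoolTaskExecutor, with tiny numbers instead of the real ones; the class and method names (RejectionThresholdDemo, countCallerRuns) are made up for this illustration. The pool first fills its core threads, then the queue, then grows to the maximum size, and only after that are tasks rejected and, under CallerRunsPolicy, run by the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionThresholdDemo {

    /** Submits blocking tasks and returns how many were run by the submitting thread. */
    static int countCallerRuns(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread submitter = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ranOnCaller = new AtomicInteger();
        for (int n = 0; n < tasks; n++) {
            pool.execute(() -> {
                if (Thread.currentThread() == submitter) {
                    // Rejected by the pool, so CallerRunsPolicy runs it right here
                    ranOnCaller.incrementAndGet();
                    return;
                }
                try {
                    // Keep pool threads busy until every task has been submitted
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        return ranOnCaller.get();
    }

    public static void main(String[] args) {
        // 4 threads + 3 queue slots hold 7 tasks; the remaining 3 run on the caller
        System.out.println(countCallerRuns(2, 4, 3, 10)); // prints 3
    }
}
```

With the real settings, the pool can hold i * 2 running tasks plus 300000 queued ones, so a loop that submits about 1.4 million tasks faster than they complete will eventually reach that limit.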

IV. Troubleshooting
1. When the task was interrupted, all connections between the Java application and the RabbitMQ server turned out to be closed. My first guess was a configuration problem, so I looked for the related exception:

2021-03-23 17:07:59,574 [dsTsZBxxTUEtoSUN3_Worker-1] ERROR [com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler33] - Job 'dsTsZBxxTUEtoSUN3' exception occur in job processing
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later.
	at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:60)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1434)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1420)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:723)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:706)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:676)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:567)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1600(CachingConnectionFactory.java:102)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1439)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$execute$13(RabbitTemplate.java:2051)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2050)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1009)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1075)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1068)
	at com.qlm.yfb.thread.ZbPushThread.handlerCallableResult(ZbPushThread.java:302)
	at com.qlm.yfb.thread.AbstractCallableThread.done(AbstractCallableThread.java:49)
	at com.qlm.yfb.component.AbstractAsyncTemplate.lambda$doExecute$0(AbstractAsyncTemplate.java:208)
	at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:2038)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
	at com.qlm.yfb.component.AbstractAsyncTemplate.doExecute(AbstractAsyncTemplate.java:206)
	at com.qlm.yfb.component.AbstractAsyncTemplate.execute(AbstractAsyncTemplate.java:114)
	at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:171)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:150)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:122)
	at com.dangdang.ddframe.job.lite.internal.schedule.LiteJob.execute(LiteJob.java:26)
	at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
	at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)

This can be read as RabbitMQ's channel-max setting being too small.
Solution: edit the RabbitMQ server configuration file and raise the value.
The configuration file is located at /etc/rabbitmq/rabbitmq.conf
Original value: 128. New value: 1000.
Note: this value is compared with the one requested by the Java client, and the smaller of the two is used. If both are null or 0, there is no limit.
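The comparison rule in the note can be written out as a small function. This is only a sketch of the rule as described above, not code taken from RabbitMQ or its client; the names ChannelMaxRule and effectiveChannelMax are hypothetical, and the handling of the case where only one side is 0 (the non-zero side wins) is my assumption.

```java
class ChannelMaxRule {

    /**
     * Returns the effective channel limit for a connection.
     * A value of 0 means "no limit", both as input and as result.
     */
    static int effectiveChannelMax(int serverValue, int clientValue) {
        if (serverValue == 0 || clientValue == 0) {
            // Assumption: if one side sets no limit, the other side's value applies;
            // if both are 0 the result is 0, meaning unlimited
            return Math.max(serverValue, clientValue);
        }
        // Both sides set a limit: the smaller one wins
        return Math.min(serverValue, clientValue);
    }

    public static void main(String[] args) {
        // Hypothetical client value of 2000 against the old and new server values
        System.out.println(effectiveChannelMax(128, 2000));  // prints 128
        System.out.println(effectiveChannelMax(1000, 2000)); // prints 1000
    }
}
```

Under this rule, raising only the server value helps as long as the client does not request something smaller.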

2. Why would an exception in a task submitted to the thread pool break the loop in the main thread?
After thinking about it for a long time, I believe the thread pool rejection policy is the cause.
ThreadPoolExecutor.CallerRunsPolicy(): when the pool's queue is full and the pool is already at its maximum size, the rejected task is run directly on the thread that called execute(), which here is the main thread of the scheduled job. The stack trace above confirms this: CallerRunsPolicy.rejectedExecution appears on the job's worker thread.
Unfortunately, the catch clause in done(), the code that handles the Future, does not cover AmqpResourceNotAvailableException.

So when a rejected task fails with that exception while running on the main thread, the exception propagates out of execute() and aborts the loop.
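
The mechanism can be reproduced with the JDK alone. The sketch below is an illustration under assumptions, not the project's code: it uses a plain ThreadPoolExecutor with one thread and one queue slot, the class and method names are made up, and IllegalStateException stands in for AmqpResourceNotAvailableException. To keep the demo deterministic the task only fails when it runs on the submitting thread; in the real job the MQ failure can happen on any thread, but only a failure on the submitting thread escapes execute().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsPropagationDemo {

    /** Returns the message of the exception that escaped execute(), or null if none did. */
    static String submitUntilRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread submitter = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        Runnable task = () -> {
            if (Thread.currentThread() == submitter) {
                // Stand-in for done() failing while sending to MQ
                throw new IllegalStateException("channelMax limit is reached");
            }
            try {
                // Keep the single pool thread busy so the pool stays full
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int n = 0; n < 3; n++) {
                // 1st task occupies the thread, 2nd fills the queue,
                // 3rd is rejected and runs on this thread
                pool.execute(task);
            }
            return null;
        } catch (IllegalStateException e) {
            // The task's exception surfaced in the submitting loop
            return e.getMessage();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitUntilRejected()); // prints channelMax limit is reached
    }
}
```

Without the catch around the loop, the exception would end the submitting method, which is what happened to the scheduled job.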

Solutions:
1. Wrap the doExecute call inside the loop in try{}catch(Exception e). Lower performance, but simple and crude.
2. Increase the thread pool queue length. This takes some calculation and adapts poorly when the data volume changes.
3. If tasks may be discarded, switch to a different rejection policy.
I went with method 1: simple and crude. (This appears to be the try/catch already present in the loop code shown above.)
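
A variation on method 1 is to catch inside the submitted task rather than around the doExecute call, so a failure is handled the same way whether the task runs on a pool thread or on the caller. The following is a minimal sketch under assumptions, not the project's code: GuardedTaskDemo, guarded and runLoop are hypothetical names, the pool is a plain JDK ThreadPoolExecutor, and every task fails on purpose with an IllegalStateException that stands in for the MQ error.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class GuardedTaskDemo {

    /** Wraps a task so that a runtime failure is reported instead of propagating. */
    static Runnable guarded(Runnable task, Consumer<RuntimeException> onError) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                onError.accept(e);
            }
        };
    }

    /** Submits failing tasks and returns {loop iterations completed, failures reported}. */
    static int[] runLoop(int items) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicInteger failures = new AtomicInteger();
        int completed = 0;
        for (int n = 0; n < items; n++) {
            Runnable failing = () -> {
                throw new IllegalStateException("simulated MQ failure");
            };
            // Some of these run on the pool thread, some on this thread via CallerRunsPolicy
            pool.execute(guarded(failing, e -> failures.incrementAndGet()));
            completed++;
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed, failures.get()};
    }

    public static void main(String[] args) {
        int[] result = runLoop(50);
        System.out.println(result[0] + " iterations, " + result[1] + " failures reported");
    }
}
```

The wrapper only catches RuntimeException, so an Error would still propagate. In real code the onError callback would log the failure or record the record for a retry instead of just counting.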

Copyright notice
This article was written by [Beipiao's vegetable Xiaobai]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/176/202206251810071061.html