当前位置:网站首页>Custom thread pool rejection policy
Custom thread pool rejection policy
2022-07-07 13:29:00 【Bug Commander】
One . Default deny policy
ThreadPoolExceutor.AbortPolicy : Discard the task and throw it out RejectedExecutionException abnormal .
ThreadPoolExecutor.DiscardPolicy: Discarding the task , But no exception is thrown . ThreadPoolExecutor.DiscardOldestPolicy: Discard the top task in the queue , Then resubmit the rejected task
ThreadPoolExecutor.CallerRunsPolicy: By the calling thread ( The thread that submitted the task ) Deal with the task
Generally speaking , Default this 4 Type of rejection policy , It is not applicable in some scenarios , For example, we don't want to discard any tasks , So before 3 The two rejection strategies are not applicable
The first 4 This rejection strategy is special , When this policy is implemented , Will give the current thread to The thread that submitted the task To perform this task , For example, you are through The main thread Open thread task , So this rejection strategy is to put Tasks rejected by the thread pool , Backhand to Main thread execution , Scenarios with high concurrency , Will block the main thread , So it is not very applicable , The following uses pseudo code to customize the rejection policy
Two . Custom reject policy
If you don't want to discard the task and leave it to the main thread to execute the task , Can we collect these rejected tasks ? So consider using a cache container , And it must be a concurrency safe cache container , So you can use juc Under bag java.util.concurrent.LinkedBlockingQueue, Introduce this class first
LinkedBlockingQueue Is a one-way linked list implementation of the blocking queue , The queue by FIFO( fifo ) Sorting elements , The new element is inserted at the end of the queue , And the queue get operation will get the element at the head of the queue , The throughput of linked queues is usually higher than that of array based queues , But in most concurrent applications , Its predictable performance is lower
Besides , LinkedBlockingQueue Or optional capacity ( Prevent excessive expansion ), That is, you can specify the capacity of the queue , If you don't specify , The default capacity size is equal to Integer.MAX_VALUE, I will not specify the capacity of the queue here
LinkedBlockingQueue In the realization of " Mutex access to competing resources by multiple threads " when , For put and take Method operations use different locks , about put operation , adopt " Insert the lock putLock" To synchronize , For the extraction operation , adopt " Take out the lock takeLock" To synchronize
If a thread ( The simulation in the code is SchedulerTask) To retrieve data , The queue is just empty , The thread will execute notEmpty.await() Wait for , When some other thread ( The simulation in the code is the reject task of the thread pool ) After inserting data into the queue , Would call notEmpty.signal() Wake up the "notEmpty On the waiting thread ”, here , ( The simulation in the code is SchedulerTask) Will be awakened to continue to run , Besides , ( The simulation in the code is SchedulerTask) Before the fetch operation , Will get takeLock, Release after the fetch operation is completed takeLock
If a thread ( The simulation in the code is the reject task of the thread pool ) To insert data , The queue is full , Then the thread will execute notFull.await() Wait for , When some other thread ( The simulation in the code is SchedulerTask) After taking out the data , Would call notFull.signal() Wake up the "notFull On the waiting thread ", here , ( The simulation in the code is the reject task of the thread pool ) Will be awakened to continue to run , Besides , ( The simulation in the code is the reject task of the thread pool ) Before performing the insert operation , Will get putLock, It is not released until the insertion operation is completed putLock
2.1 First, customize the rejection policy implementation
Customize a class implementation java.util.concurrent.RejectedExecutionHandler Interface
public class CustomizeRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r != null) {
// Tasks that the thread pool does not have time to execute are put into the queue first
ThreadReader.put(r);
}
}
}
2.2 Implement an intermediate class , For declaration LinkedBlockingQueue
@Component
public class ThreadReader {
private static final Logger logger = LoggerFactory.getLogger(ThreadReader.class);
private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new LinkedBlockingQueue<>();
@Qualifier("thread-pool")
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
public static void put(Runnable runnable) {
BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
try {
blockingQueue.put(runnable);
} catch (InterruptedException e) {
// Ignore
}
}
public void take() {
BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
if (CollectionUtils.isEmpty(blockingQueue)) {
return;
}
try {
Runnable runnable = blockingQueue.take();
logger.info(LogUtils.format(" Fetch the tasks that the current thread pool has not been able to execute , runnable=<{0}>", runnable));
threadPoolExecutor.execute(runnable);
} catch (InterruptedException e) {
//
}
}
public static BlockingQueue<Runnable> getBlockingQueue() {
return BLOCKING_QUEUE;
}
}
2.3 Define the fetch class of the queue , Single thread is used here , No longer the main thread
@Component
public class CustomizeScheduler implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private ThreadReader threadReader;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setThreadFactory(new NamedThreadFactory("SchedulerTask")).build();
ScheduledExecutorService scheduledExecutor =
new ScheduledThreadPoolExecutor(1, threadFactory, new CustomizeRejectionPolicy());
// After every 1 Second, there is no time to execute tasks in the execution queue
scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1, TimeUnit.SECONDS);
}
}
2.4 Simulation test
First Thread pool The configuration is as follows , And then use jmeter simulation 2000 A request
@Configuration
public class ThreadPoolConfig {
@Bean("thread-pool")
public static ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16,1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100),
new NamedThreadFactory("common-service"), new CustomizeRejectionPolicy());
threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
}
}
Then check the console , You can see the log printing , You can find Single thread every other 1 Seconds from the queue Tasks rejected by the thread pool , And add the task back to the thread pool , Re execution
3、 ... and . Related design patterns
The classic design pattern of multithreading is used here , Producer-Consumer Pattern , The thread pool submits a task that is denied execution , He is equivalent to a producer , Single thread is equivalent to consumer , Deal with these tasks , Intermediate class ThreadReader amount to Channel passageway , This design pattern can be explained in detail in the blog : Multithreading based design patterns Producer-Consumer Pattern
Why not directly introduce a single thread into the rejection policy to execute the rejected task ?
In this way, the performance of the program will be reduced , The thread pool only needs to add the rejected tasks to the queue , There is no need to care about how a single thread is scheduled , There is no need to wait for a single thread to process these tasks , You can immediately put the next task in the queue
边栏推荐
猜你喜欢
随机推荐
error LNK2019: 无法解析的外部符号
靠卖概念上市,认养一头牛能走多远?
MongoDB命令汇总
简单好用的代码规范
数字ic设计——SPI
[learning notes] segment tree selection
Mongodb slice summary
[learning notes] zkw segment tree
ESP32构解工程添加组件
MATLAB中polarscatter函数使用
Realbasicvsr test pictures and videos
Getting started with cinnamon applet
学习突围2 - 关于高效学习的方法
Write it down once Net a new energy system thread surge analysis
Mongodb replication (replica set) summary
Japanese government and enterprise employees got drunk and lost 460000 information USB flash drives. They publicly apologized and disclosed password rules
QQ medicine, Tencent ticket
[Presto profile series] timeline use
Use of polarscatter function in MATLAB
服务器到服务器 (S2S) 事件 (Adjust)