Custom thread pool rejection policy
2022-07-07 13:29:00 【Bug Commander】
1. Default rejection policies
ThreadPoolExecutor.AbortPolicy: rejects the task and throws a RejectedExecutionException. This is the policy a ThreadPoolExecutor uses when none is specified.
ThreadPoolExecutor.DiscardPolicy: silently discards the rejected task without throwing an exception.
ThreadPoolExecutor.DiscardOldestPolicy: discards the oldest task waiting at the head of the queue, then retries submitting the rejected task.
ThreadPoolExecutor.CallerRunsPolicy: the calling thread (the thread that submitted the task) runs the task itself.
Generally speaking, these four built-in policies do not fit every scenario. For example, if we do not want to lose any task, the first three policies are unsuitable.
The fourth policy is a special case. When it is applied, the rejected task is run by the thread that submitted it. For example, if the main thread submits tasks to the pool, every task the pool rejects is handed straight back to the main thread to run. Under high concurrency this blocks the main thread, so it is not a good fit either. The rest of this post uses pseudo code to build a custom rejection policy.
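To make the difference between the built-in policies concrete, here is a minimal plain-Java sketch. It is not from the original post: the class name BuiltInPolicyDemo, the method submitToSaturatedPool and the tiny pool sizes (one thread, one queue slot) are assumptions chosen only to force a rejection quickly.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BuiltInPolicyDemo {
    // Saturates a 1-thread pool with a 1-slot queue, then submits one more task.
    // Returns the name of the thread that ran the extra task, "rejected" if
    // execute() threw, or "dropped" if the extra task never ran.
    static String submitToSaturatedPool(RejectedExecutionHandler policy) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), policy);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = {"dropped"};
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the single queue slot
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // triggers the policy
        } catch (RejectedExecutionException e) {
            ranOn[0] = "rejected";
        } finally {
            release.countDown(); // let the blocked worker finish
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:      "
                + submitToSaturatedPool(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("DiscardPolicy:    "
                + submitToSaturatedPool(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("CallerRunsPolicy: "
                + submitToSaturatedPool(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

With AbortPolicy the extra task is reported as rejected, with DiscardPolicy it silently disappears, and with CallerRunsPolicy it runs on whichever thread called execute(), which is exactly why a busy submitting thread can stall under load.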
2. Custom rejection policy
If we want neither to discard the task nor to hand it back to the main thread, can we collect the rejected tasks instead? That calls for a buffer, and it must be a thread-safe one, so we can use java.util.concurrent.LinkedBlockingQueue from the JUC package. First, a brief introduction to this class.
LinkedBlockingQueue is a blocking queue backed by a singly linked list. It orders elements FIFO (first in, first out): new elements are inserted at the tail of the queue, and retrieval operations take the element at the head. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.
LinkedBlockingQueue is also optionally bounded (to prevent excessive growth): you can specify the queue's capacity, and if you do not, it defaults to Integer.MAX_VALUE. I do not specify a capacity here, which means the queue can keep growing if tasks are rejected faster than they are drained.
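The FIFO ordering and the optional capacity can be checked with a few lines of plain Java. This is only an illustrative sketch; the class name QueueBasicsDemo and its helper methods are hypothetical and not part of the original post.

```java
import java.util.concurrent.LinkedBlockingQueue;

class QueueBasicsDemo {
    // Capacity of a queue created without an explicit bound.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // A bounded queue refuses further offers once it is full.
    static boolean boundedQueueRefusesWhenFull() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("a");
        queue.offer("b");
        return !queue.offer("c"); // offer() returns false instead of blocking
    }

    // Elements leave the queue in the order they were inserted.
    static String headAfterInserts() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("first");
        queue.offer("second");
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
        System.out.println(boundedQueueRefusesWhenFull());
        System.out.println(headAfterInserts());
    }
}
```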
To give multiple threads mutually exclusive access to the shared queue, LinkedBlockingQueue uses different locks for put and take: insertions are synchronized by the "put lock" putLock, and removals by the "take lock" takeLock.
If a thread (SchedulerTask in the code of this post) tries to take an element while the queue is empty, it waits on notEmpty.await(). When another thread (the thread pool's rejection handler in the code of this post) inserts an element, it calls notEmpty.signal() to wake a thread waiting on notEmpty, and SchedulerTask resumes. SchedulerTask acquires takeLock before the take operation and releases it once the operation completes.
If a thread (the rejection handler) tries to insert an element while the queue is full, it waits on notFull.await(). When another thread (SchedulerTask) takes an element, it calls notFull.signal() to wake a thread waiting on notFull, and the rejection handler resumes. The inserting thread acquires putLock before the insert operation and releases it only after the insertion completes.
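The waiting and waking described here is visible from the outside as a blocking hand-off between two threads. The following sketch, with the hypothetical class name HandOffDemo, starts a consumer that calls take() on an empty queue and is released when the producer calls put(). The locks and conditions stay internal to LinkedBlockingQueue and are not touched directly.

```java
import java.util.concurrent.LinkedBlockingQueue;

class HandOffDemo {
    // Returns the element the consumer thread received from the producer.
    static String handOff() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                // Blocks while the queue is empty, resumes once an element arrives
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        try {
            queue.put("task-1"); // the insert wakes a consumer waiting in take()
            consumer.join();     // wait until the consumer has stored the element
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff());
    }
}
```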
2.1 First, implement the custom rejection policy
Write a class that implements the java.util.concurrent.RejectedExecutionHandler interface:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class CustomizeRejectionPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r != null) {
            // Tasks the thread pool has no capacity to execute are parked in the queue first
            ThreadReader.put(r);
        }
    }
}
2.2 Implement an intermediate class that holds the LinkedBlockingQueue
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
public class ThreadReader {
    private static final Logger logger = LoggerFactory.getLogger(ThreadReader.class);
    // Unbounded queue that buffers the tasks rejected by the thread pool
    private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new LinkedBlockingQueue<>();
    @Qualifier("thread-pool")
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;

    public static void put(Runnable runnable) {
        try {
            getBlockingQueue().put(runnable);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
    }

    public void take() {
        // poll() returns null immediately when the queue is empty, so this never blocks
        Runnable runnable = getBlockingQueue().poll();
        if (runnable == null) {
            return;
        }
        logger.info("Fetched a task the thread pool could not execute earlier, runnable=<{}>", runnable);
        threadPoolExecutor.execute(runnable);
    }

    public static BlockingQueue<Runnable> getBlockingQueue() {
        return BLOCKING_QUEUE;
    }
}
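Each call to take() re-submits at most one buffered task. One possible variation, shown here as a sketch rather than the author's code, is to move a bounded batch out of the queue per call with BlockingQueue.drainTo. The class name BatchDrainDemo, the method drainBatch and the batch size of 3 are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

class BatchDrainDemo {
    // Moves up to maxBatch buffered tasks to the executor; returns how many were moved.
    static int drainBatch(BlockingQueue<Runnable> overflow, Executor target, int maxBatch) {
        List<Runnable> batch = new ArrayList<>(maxBatch);
        overflow.drainTo(batch, maxBatch); // does not block when the queue is empty
        for (Runnable task : batch) {
            target.execute(task);
        }
        return batch.size();
    }

    // Buffers 5 tasks, drains a batch of 3, and returns {moved, remaining}.
    static int[] demo() {
        BlockingQueue<Runnable> overflow = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            int id = i;
            overflow.add(() -> System.out.println("re-running task " + id));
        }
        Executor sameThread = Runnable::run; // stand-in for the real thread pool
        int moved = drainBatch(overflow, sameThread, 3);
        return new int[] {moved, overflow.size()};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("moved=" + result[0] + ", remaining=" + result[1]);
    }
}
```

Capping the batch keeps a single drain pass from flooding a pool that is still saturated. Whether one task per tick or a batch is the better choice depends on how quickly the pool recovers.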
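2.3 Define the class that drains the queue, using a dedicated single thread instead of the main thread
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
// ThreadFactoryBuilder and NamedThreadFactory: their imports depend on the libraries the project uses

@Component
public class CustomizeScheduler implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    private ThreadReader threadReader;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        ThreadFactory threadFactory =
            new ThreadFactoryBuilder().setThreadFactory(new NamedThreadFactory("SchedulerTask")).build();
        ScheduledExecutorService scheduledExecutor =
            new ScheduledThreadPoolExecutor(1, threadFactory, new CustomizeRejectionPolicy());
        // After an initial 1-second delay, re-submit one buffered task every second
        scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1, TimeUnit.SECONDS);
    }
}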
2.4 Simulation test
First configure the thread pool as follows, then use JMeter to simulate 2000 requests:
@Configuration
public class ThreadPoolConfig {
    @Bean("thread-pool")
    public static ThreadPoolExecutor threadPoolExecutor() {
        // 8 core threads, 16 max threads, 1000 ms keep-alive, work queue bounded at 100 tasks
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 1000, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(100), new NamedThreadFactory("common-service"), new CustomizeRejectionPolicy());
        // Let idle core threads time out as well
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }
}
Then check the console. The log output shows the single thread taking one task rejected by the thread pool from the queue every second and submitting it back to the thread pool to be executed again.
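The whole flow of section 2 can also be condensed into a plain-Java sketch that runs without Spring or JMeter. It is an approximation under stated assumptions, not the author's code: the class name OverflowQueueDemo, the pool sizes (one thread, one queue slot) and the 10 ms drain interval are hypothetical and chosen so the demo finishes quickly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class OverflowQueueDemo {
    // Submits taskCount tasks to a deliberately tiny pool and returns how many completed.
    static int runAll(int taskCount) {
        BlockingQueue<Runnable> overflow = new LinkedBlockingQueue<>();
        // Producer side: rejected tasks are parked in the overflow queue
        RejectedExecutionHandler parkInOverflow = (task, executor) -> overflow.offer(task);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), parkInOverflow);
        // Consumer side: a single scheduler thread re-submits one parked task per tick
        ScheduledExecutorService drainer = Executors.newSingleThreadScheduledExecutor();
        drainer.scheduleAtFixedRate(() -> {
            Runnable task = overflow.poll();
            if (task != null) {
                pool.execute(task); // if rejected again, the handler parks it again
            }
        }, 10, 10, TimeUnit.MILLISECONDS);

        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(done::countDown);
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        drainer.shutdownNow();
        pool.shutdown();
        return taskCount - (int) done.getCount();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runAll(20));
    }
}
```

No task is lost even though the pool can hold only two tasks at a time. The cost is latency: a parked task waits at least one drain interval before it gets another chance to run.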
3. Related design pattern
This uses a classic multithreading design pattern, the Producer-Consumer pattern. The thread pool, which hands over the tasks it refuses to execute, acts as the producer. The single thread that processes those tasks acts as the consumer. The intermediate class ThreadReader acts as the channel between them. The pattern is explained in detail in the blog post "Multithreading based design patterns Producer-Consumer Pattern".
Why not run the rejected task directly on a single thread inside the rejection policy?
Doing so would reduce the program's performance. With the queue in between, the thread pool only has to add the rejected task to the queue. It does not need to care how the single thread is scheduled or wait for it to process the task, and it can immediately put the next task in the queue.