当前位置:网站首页>自定义线程池拒绝策略
自定义线程池拒绝策略
2022-07-07 11:05:00 【BUG指挥官】
一. 默认的拒绝策略
ThreadPoolExceutor.AbortPolicy : 丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
一般来说, 默认的这4种拒绝策略, 在一些场景是不适用的, 比如我们不想丢弃任何任务, 那么前3种拒绝策略就不适用了
第4种拒绝策略比较特殊, 当执行该策略后, 会把当前线程交给 提交任务的线程 去执行这个任务, 比如说你是通过 主线程 开启的线程任务, 那么这个拒绝策略就是把 被线程池拒绝执行的任务, 反手交给 主线程执行, 并发量高的场景下, 会阻塞主线程, 所以也不是很适用, 下面使用伪代码自定义拒绝策略
二. 自定义拒绝策略
如果不想丢弃该任务和不想交给主线程来执行该任务, 我们是不是可以把这些被拒绝执行的任务收集起来? 所以可以考虑使用缓存容器, 并且一定要是并发安全的缓存容器, 所以可以使用 juc 包下的 java.util.concurrent.LinkedBlockingQueue, 先介绍下这个类
LinkedBlockingQueue是一个单向链表实现的阻塞队列, 该队列按 FIFO(先进先出)排序元素, 新元素插入到队列的尾部, 并且队列获取操作会获得位于队列头部的元素, 链接队列的吞吐量通常要高于基于数组的队列, 但是在大多数并发应用程序中, 其可预知的性能要低
此外, LinkedBlockingQueue还是可选容量的(防止过度膨胀), 即可以指定队列的容量, 如果不指定, 默认容量大小等于Integer.MAX_VALUE, 我这里就不指定队列的容量了
LinkedBlockingQueue在实现 "多线程对竞争资源的互斥访问" 时, 对于该类中的 put 和 take 方法操作分别使用了不同的锁, 对于 put 操作, 通过 "插入锁 putLock" 进行同步, 对于取出操作, 通过 "取出锁takeLock" 进行同步
若某线程(代码中模拟是 SchedulerTask)要取出数据时, 队列正好为空, 则该线程会执 notEmpty.await()进行等待, 当其它某个线程(代码中模拟是线程池的拒绝任务)向队列中插入了数据之后, 会调用 notEmpty.signal() 唤醒 "notEmpty上的等待线程”, 此时, (代码中模拟是 SchedulerTask)会被唤醒从而得以继续运行, 此外, (代码中模拟是 SchedulerTask)在执行取操作前, 会获取 takeLock, 在取操作执行完毕再释放 takeLock
若某线程(代码中模拟是线程池的拒绝任务)要插入数据时, 队列已满, 则该线程会它执行 notFull.await() 进行等待, 当其它某个线程(代码中模拟是 SchedulerTask)取出数据之后,会调用notFull.signal() 唤醒 "notFull上的等待线程", 此时, (代码中模拟是线程池的拒绝任务)就会被唤醒从而得以继续运行, 此外, (代码中模拟是线程池的拒绝任务)在执行插入操作前,会获取 putLock, 在插入操作执行完毕才释放 putLock
2.1 首先自定义拒绝策略实现
自定义一个类实现 java.util.concurrent.RejectedExecutionHandler 接口
public class CustomizeRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r != null) {
// 线程池没来得及执行的任务先放入队列
ThreadReader.put(r);
}
}
}
2.2 实现一个中间类, 用于声明 LinkedBlockingQueue
@Component
public class ThreadReader {
private static final Logger logger = LoggerFactory.getLogger(ThreadReader.class);
private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new LinkedBlockingQueue<>();
@Qualifier("thread-pool")
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
public static void put(Runnable runnable) {
BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
try {
blockingQueue.put(runnable);
} catch (InterruptedException e) {
// 忽略
}
}
public void take() {
BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
if (CollectionUtils.isEmpty(blockingQueue)) {
return;
}
try {
Runnable runnable = blockingQueue.take();
logger.info(LogUtils.format("取出当前线程池没来得及执行的任务, runnable=<{0}>", runnable));
threadPoolExecutor.execute(runnable);
} catch (InterruptedException e) {
//
}
}
public static BlockingQueue<Runnable> getBlockingQueue() {
return BLOCKING_QUEUE;
}
}
2.3 定义该队列的取出类, 这里使用的是单线程, 不再是主线程
@Component
public class CustomizeScheduler implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private ThreadReader threadReader;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setThreadFactory(new NamedThreadFactory("SchedulerTask")).build();
ScheduledExecutorService scheduledExecutor =
new ScheduledThreadPoolExecutor(1, threadFactory, new CustomizeRejectionPolicy());
// 之后每隔1秒执行队列中没有来得及执行的任务
scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1, TimeUnit.SECONDS);
}
}
2.4 模拟测试
首先线程池配置如下, 然后使用 jmeter 模拟2000个请求
@Configuration
public class ThreadPoolConfig {
@Bean("thread-pool")
public static ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16,1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100),
new NamedThreadFactory("common-service"), new CustomizeRejectionPolicy());
threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
}
}
之后查看控制台, 可以看到日志打印情况, 可以发现 单线程 每隔1秒从队列中捞取一个被线程池拒绝执行的任务, 并把该任务重新加入到线程池中, 再执行
三. 相关设计模式
这里使用到了多线程的经典设计模式, Producer-Consumer模式, 线程池提交被拒绝执行的任务, 他相当于生产者, 单线程相当于消费者, 处理这些任务, 中间类 ThreadReader 相当于 Channel 通道, 此设计模式详解可以查看博客 : 多线程基础之设计模式Producer-Consumer模式
为什么不能直接在拒绝策略中引入单线程去执行被拒绝的任务?
这样程序的性能会降低, 线程池只需要把被拒绝的任务往队列中添加, 无需关心单线程怎么调度的, 也无需等待单线程对这些任务的处理, 就可以立即把下一个任务放入到队列中
边栏推荐
猜你喜欢
《开源圆桌派》第十一期“冰与火之歌”——如何平衡开源与安全间的天然矛盾?
Leetcode brush questions: binary tree 19 (merge binary tree)
Leetcode skimming: binary tree 22 (minimum absolute difference of binary search tree)
Aosikang biological sprint scientific innovation board of Hillhouse Investment: annual revenue of 450million yuan, lost cooperation with kangxinuo
2022 polymerization process test question simulation test question bank and online simulation test
红杉中国完成新一期90亿美元基金募集
共创软硬件协同生态:Graphcore IPU与百度飞桨的“联合提交”亮相MLPerf
飞桨EasyDL实操范例:工业零件划痕自动识别
[crawler] avoid script detection when using selenium
ACL 2022 | 序列标注的小样本NER:融合标签语义的双塔BERT模型
随机推荐
飞桨EasyDL实操范例:工业零件划痕自动识别
Sample chapter of "uncover the secrets of asp.net core 6 framework" [200 pages /5 chapters]
[untitled]
[learn microservices from 0] [03] explore the microservice architecture
[untitled]
HZOJ #235. 递归实现指数型枚举
2022 polymerization process test question simulation test question bank and online simulation test
How to continue after handling chain interruption / sub chain error removed from scheduling
AUTOCAD——大于180度的角度标注、CAD直径符号怎么输入?
Go语言学习笔记-结构体(Struct)
非分区表转换成分区表以及注意事项
Importance of database security
【学习笔记】线段树选做
File operation command
[untitled]
. Net ultimate productivity of efcore sub table sub database fully automated migration codefirst
Day-19 IO stream
HZOJ #240. 图形打印四
Practical example of propeller easydl: automatic scratch recognition of industrial parts
如何将 @Transactional 事务注解运用到炉火纯青?