当前位置:网站首页>自定义线程池拒绝策略
自定义线程池拒绝策略
2022-07-07 11:05:00 【BUG指挥官】
一. 默认的拒绝策略
ThreadPoolExceutor.AbortPolicy : 丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
一般来说, 默认的这4种拒绝策略, 在一些场景是不适用的, 比如我们不想丢弃任何任务, 那么前3种拒绝策略就不适用了
第4种拒绝策略比较特殊, 当执行该策略后, 会把当前线程交给 提交任务的线程 去执行这个任务, 比如说你是通过 主线程 开启的线程任务, 那么这个拒绝策略就是把 被线程池拒绝执行的任务, 反手交给 主线程执行, 并发量高的场景下, 会阻塞主线程, 所以也不是很适用, 下面使用伪代码自定义拒绝策略
二. 自定义拒绝策略
如果不想丢弃该任务和不想交给主线程来执行该任务, 我们是不是可以把这些被拒绝执行的任务收集起来? 所以可以考虑使用缓存容器, 并且一定要是并发安全的缓存容器, 所以可以使用 juc 包下的 java.util.concurrent.LinkedBlockingQueue, 先介绍下这个类
LinkedBlockingQueue是一个单向链表实现的阻塞队列, 该队列按 FIFO(先进先出)排序元素, 新元素插入到队列的尾部, 并且队列获取操作会获得位于队列头部的元素, 链接队列的吞吐量通常要高于基于数组的队列, 但是在大多数并发应用程序中, 其可预知的性能要低
此外, LinkedBlockingQueue还是可选容量的(防止过度膨胀), 即可以指定队列的容量, 如果不指定, 默认容量大小等于Integer.MAX_VALUE, 我这里就不指定队列的容量了
LinkedBlockingQueue在实现 "多线程对竞争资源的互斥访问" 时, 对于该类中的 put 和 take 方法操作分别使用了不同的锁, 对于 put 操作, 通过 "插入锁 putLock" 进行同步, 对于取出操作, 通过 "取出锁takeLock" 进行同步
若某线程(代码中模拟是 SchedulerTask)要取出数据时, 队列正好为空, 则该线程会执 notEmpty.await()进行等待, 当其它某个线程(代码中模拟是线程池的拒绝任务)向队列中插入了数据之后, 会调用 notEmpty.signal() 唤醒 "notEmpty上的等待线程”, 此时, (代码中模拟是 SchedulerTask)会被唤醒从而得以继续运行, 此外, (代码中模拟是 SchedulerTask)在执行取操作前, 会获取 takeLock, 在取操作执行完毕再释放 takeLock
若某线程(代码中模拟是线程池的拒绝任务)要插入数据时, 队列已满, 则该线程会它执行 notFull.await() 进行等待, 当其它某个线程(代码中模拟是 SchedulerTask)取出数据之后,会调用notFull.signal() 唤醒 "notFull上的等待线程", 此时, (代码中模拟是线程池的拒绝任务)就会被唤醒从而得以继续运行, 此外, (代码中模拟是线程池的拒绝任务)在执行插入操作前,会获取 putLock, 在插入操作执行完毕才释放 putLock
2.1 首先自定义拒绝策略实现
自定义一个类实现 java.util.concurrent.RejectedExecutionHandler 接口
public class CustomizeRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r != null) {
// 线程池没来得及执行的任务先放入队列
ThreadReader.put(r);
}
}
}
2.2 实现一个中间类, 用于声明 LinkedBlockingQueue
@Component
public class ThreadReader {
private static final Logger logger = LoggerFactory.getLogger(ThreadReader.class);
private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new LinkedBlockingQueue<>();
@Qualifier("thread-pool")
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
public static void put(Runnable runnable) {
BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
try {
blockingQueue.put(runnable);
} catch (InterruptedException e) {
// 忽略
}
}
public void take() {
BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
if (CollectionUtils.isEmpty(blockingQueue)) {
return;
}
try {
Runnable runnable = blockingQueue.take();
logger.info(LogUtils.format("取出当前线程池没来得及执行的任务, runnable=<{0}>", runnable));
threadPoolExecutor.execute(runnable);
} catch (InterruptedException e) {
//
}
}
public static BlockingQueue<Runnable> getBlockingQueue() {
return BLOCKING_QUEUE;
}
}
2.3 定义该队列的取出类, 这里使用的是单线程, 不再是主线程
@Component
public class CustomizeScheduler implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private ThreadReader threadReader;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setThreadFactory(new NamedThreadFactory("SchedulerTask")).build();
ScheduledExecutorService scheduledExecutor =
new ScheduledThreadPoolExecutor(1, threadFactory, new CustomizeRejectionPolicy());
// 之后每隔1秒执行队列中没有来得及执行的任务
scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1, TimeUnit.SECONDS);
}
}
2.4 模拟测试
首先线程池配置如下, 然后使用 jmeter 模拟2000个请求
@Configuration
public class ThreadPoolConfig {
@Bean("thread-pool")
public static ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16,1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100),
new NamedThreadFactory("common-service"), new CustomizeRejectionPolicy());
threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
}
}
之后查看控制台, 可以看到日志打印情况, 可以发现 单线程 每隔1秒从队列中捞取一个被线程池拒绝执行的任务, 并把该任务重新加入到线程池中, 再执行
三. 相关设计模式
这里使用到了多线程的经典设计模式, Producer-Consumer模式, 线程池提交被拒绝执行的任务, 他相当于生产者, 单线程相当于消费者, 处理这些任务, 中间类 ThreadReader 相当于 Channel 通道, 此设计模式详解可以查看博客 : 多线程基础之设计模式Producer-Consumer模式
为什么不能直接在拒绝策略中引入单线程去执行被拒绝的任务?
这样程序的性能会降低, 线程池只需要把被拒绝的任务往队列中添加, 无需关心单线程怎么调度的, 也无需等待单线程对这些任务的处理, 就可以立即把下一个任务放入到队列中
边栏推荐
- COSCon'22 社区召集令来啦!Open the World,邀请所有社区一起拥抱开源,打开新世界~
- Aosikang biological sprint scientific innovation board of Hillhouse Investment: annual revenue of 450million yuan, lost cooperation with kangxinuo
- @Resource和@Autowired的区别?
- What if the xshell evaluation period has expired
- 【无标题】
- 通过Keil如何查看MCU的RAM与ROM使用情况
- ACL 2022 | 序列标注的小样本NER:融合标签语义的双塔BERT模型
- MySQL importing SQL files and common commands
- Day-19 IO stream
- 谷歌浏览器如何重置?谷歌浏览器恢复默认设置?
猜你喜欢
DHCP 动态主机设置协议 分析
Awk of three swordsmen in text processing
.Net下極限生產力之efcore分錶分庫全自動化遷移CodeFirst
Sequoia China completed the new phase of $9billion fund raising
Session
【学习笔记】zkw 线段树
Leetcode brush question: binary tree 24 (the nearest common ancestor of binary tree)
博文推荐|Apache Pulsar 跨地域复制方案选型实践
如何将 @Transactional 事务注解运用到炉火纯青?
ICLR 2022 | pre training language model based on anti self attention mechanism
随机推荐
AUTOCAD——大于180度的角度标注、CAD直径符号怎么输入?
Day22 deadlock, thread communication, singleton mode
.Net下极限生产力之efcore分表分库全自动化迁移CodeFirst
MySQL导入SQL文件及常用命令
HZOJ #235. 递归实现指数型枚举
Leetcode skimming: binary tree 25 (the nearest common ancestor of binary search tree)
聊聊Redis缓存4种集群方案、及优缺点对比
Guangzhou held work safety conference
关于 appium 启动 app 后闪退的问题 - (已解决)
Shortcut key of Bash
Go language learning notes - structure
Lingyunguang of Dachen and Xiaomi investment is listed: the market value is 15.3 billion, and the machine is implanted into the eyes and brain
飞桨EasyDL实操范例:工业零件划痕自动识别
Analysis of DHCP dynamic host setting protocol
HZOJ #236. 递归实现组合型枚举
ip2long与long2IP 分析
Smart cloud health listed: with a market value of HK $15billion, SIG Jingwei and Jingxin fund are shareholders
【无标题】
How to reset Firefox browser
Design and implementation of communication protocol