当前位置:网站首页>线程池拒绝策略最佳实践
线程池拒绝策略最佳实践
2022-07-07 11:05:00 【BUG指挥官】
之前线上项目偶发出现线程池耗尽的问题,最近终于有空能好好研究一把,问题实际并不复杂,也得益于Dubbo线程池的拒绝策略才能很快找到大致的原因。
通过这个问题,也有些好奇各家使用的线程池拒绝策略是怎样的,刨刨坑、挖挖土,一起来看看吧~
问题背景
之前线上偶发出现线程池耗尽问题,现象如下:
在调用下游Dubbo接口时,提示Server端的线程池耗尽。
最开始以为是有突发流量,但是监控显示流量稳定,并且扩容后发现问题依然存在,渐渐意识到问题并不简单。

问题分析
既然有异常日志和堆栈,先看看到底什么场景下会出现这个异常。在Dubbo源码中,我们可以找到这一段提示出现在AbortPolicyWithReport中。
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy
AbortPolicyWithReport继承自 java.util.concurrent.ThreadPoolExecutor.AbortPolicy,是一种线程池拒绝策略,当线程池中的缓冲任务队列满,且线程数量达到最大时,就会触发拒绝策略,调用拒绝策略的rejectedExecution()方法进行处理。
那么,有哪些不同的拒绝策略呢?
JDK线程池拒绝策略
在java.util.concurrent.ThreadPoolExecutor,我们可以找到JDK预设置的四种拒绝策略:
- CallerRunsPolicy - 调用者线程处理
该策略下,如果线程池未关闭,则交由当前调用者线程进行处理,否则直接丢弃任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- AbortPolicy - 抛出异常
如果不配置拒绝策略的话,线程池会默认使用该策略,直接抛出rejectedExecution,交由上层业务处理。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("...");
}
- DiscardPolicy - 丢弃当前任务
最简单的处理方法,直接丢弃。
//实际方法体就是空的,即该场景下不处理,直接丢弃
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
- DiscardOldestPolicy - 丢弃下一个要执行的任务
该策略是丢弃队列中最老的任务(其实就是下一个要执行的任务),并尝试执行当前任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
Dubbo线程池拒绝策略
那么Dubbo的拒绝策略是怎样的呢?
其实从名字就能看出来,AbortPolicyWithReport。
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
...
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" + ...);
logger.warn(msg);
dumpJStack();
dispatchThreadPoolExhaustedEvent(msg);
throw new RejectedExecutionException(msg);
}
...
}
Dubbo的拒绝策略是抛出异常RejectedExecutionException,同时还会做一件事情 - dumpJStack(),记录下当时的JVM线程堆栈。
dumpJStack
先来看看源码。
private void dumpJStack() {
//一些dump时间间隔和并发控制
...
//新建单线程池用于dump堆栈
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
...
try (FileOutputStream jStackStream = new FileOutputStream(
new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
JVMUtil.jstack(jStackStream);
} catch (Throwable t) {
logger.error("dump jStack error", t);
}
...
});
...
}
做法其实很简单,最终调用JVMUtil.jstack把当前JVM的线程堆栈dump下来,而这样做有一个很大的好处,就是能知道当时其他线程到底在做什么,帮助分析线程池溢出的原因。
原因分析
有线程堆栈就好办了,看看当时线程都在做什么。
Dubbo底层使用Netty实现网络通信,涉及的线程池包括IO线程池(boss、worker)和业务线程池(处理业务事件)。通过之前的日志可以看到是Server端的业务线程池,即DubboServerHandler耗尽,那么统计一把,看看线程都在做什么。
很明显,大量的线程都阻塞在获取DB连接上。那接下来就好办了,可以看看同时间段是不是有慢查询长时间占住了连接,或者是真的连接池小了,线程池和连接池配比不对,分析至此就不继续展开(并不是讨论重点哈哈)。
不同的拒绝策略
可以看到,Dubbo通过重写了拒绝了策略,来帮助异常场景下进行问题定位,带来了很大的帮助。
那么其他主流组件是怎么做的呢?
RocketMQ
以Broker为例,其中包含了非常多的线程池用于处理不同的消息处理场景,包含send、put、pull、query等等。
在线程池的使用上,RocketMQ通过BrokerFixedThreadPoolExecutor继承封装了一层ThreadPoolExecutor,上层可以自行传入参数,其中也包含了可配置的RejectedExecutionHandler。
实际在Broker创建消息处理的不同线程池时,并没有指定特殊的拒绝策略,所以使用的是默认的AbortPolicy,即抛出异常。
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_")
//并没有设置拒绝策略
);
同时为了避免任务溢出,为每个线程池默认设置了较大的任务队列大小。
private int sendThreadPoolQueueCapacity = 10000;
private int putThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;
综上,RocketMQ的拒绝策略使用了AbortPolicy,即抛出异常,同时为了避免任务队列溢出,设置了较大的任务队列。
Netty
以EventLoopGroup为例,线程池的拒绝策略默认使用RejectedExecutionHandlers,通过单例模式提供Handler进行处理。
public final class RejectedExecutionHandlers {
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
private RejectedExecutionHandlers() { }
public static RejectedExecutionHandler reject() {
return REJECT;
}
...
}
可以看出,Netty的拒绝策略默认也是抛出异常,与RocketMQ对比的不同的点在于,任务队列的大小会取max(16, maxPendingTasks),io.netty.eventLoop.maxPengdingTasks可通过环境变量进行配置。
Doris
团队内一直在用Doris,属于计算存储分离、MPP架构的分析型存储组件,看了一眼FE中的拒绝策略,官方实现了两种:
LogDiscardPolicy
static class LogDiscardPolicy implements RejectedExecutionHandler {
private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);
private String threadPoolName;
public LogDiscardPolicy(String threadPoolName) {
this.threadPoolName = threadPoolName;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
}
}
可以理解就是DiscardPolicy,丢弃任务,同时记录warn日志。
BlockedPolicy
static class BlockedPolicy implements RejectedExecutionHandler {
private String threadPoolName;
private int timeoutSeconds;
public BlockedPolicy(String threadPoolName, int timeoutSeconds) {
this.threadPoolName = threadPoolName;
this.timeoutSeconds = timeoutSeconds;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("Task " + r.toString() + " wait to enqueue in " + threadPoolName + " " + executor.toString() + " failed");
}
}
}
这种策略会特殊一些,它会阻塞住当前线程,尽最大努力尝试将任务放入队列中。如果超过指定的阻塞时间timeoutSeconds(默认60s),仍然无法将任务放入队列中,则记录warn日志,并丢弃任务。
这两种策略在Doris中都有实际使用到,同时线程池的任务队列大小默认设置为10。
ElasticSearch
ES的拒绝策略相对复杂一些,其自定义实现了两种拒绝策略。
- EsAbortPolicy
public class EsAbortPolicy extends EsRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof AbstractRunnable) {
if (((AbstractRunnable) r).isForceExecution()) {
BlockingQueue<Runnable> queue = executor.getQueue();
if ((queue instanceof SizeBlockingQueue) == false) {
throw new IllegalStateException("forced execution, but expected a size queue");
}
try {
((SizeBlockingQueue<Runnable>) queue).forcePut(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("forced execution, but got interrupted", e);
}
return;
}
}
incrementRejections();
throw newRejectedException(r, executor, executor.isShutdown());
}
}
其实本质上就是AbortPolicy,但是会进行一些特殊处理,包括forceExecution强制执行的判断、任务拒绝次数统计,最终抛出异常。
ES中,线程池的
forceExecution是指什么?在满足条件的情况下,即使用了ES自定义的
AbstractRunnable进行任务封装、SizeBlockingQueue作为任务队列时,可以根据任务配置判断是否强制放入任务队列。对于一些比较重要的任务,不能丢弃时,可以将forceExecution设置为true。强制放入任务队列带来的效果取决于
SizeBlockingQueue中封装的队列类型,如果封装的是ArrayBlockingQueue,则会阻塞等待队列有空余;如果封装的是LinkedTransferQueue,由于队列大小无限,且put使用的是ASYNC模式,所以会立刻放入队列并返回。
- ForceQueuePolicy
static class ForceQueuePolicy extends EsRejectedExecutionHandler {
...
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
if (rejectAfterShutdown) {
if (executor.isShutdown()) {
reject(executor, task);
} else {
put(executor, task);
if (executor.isShutdown() && executor.remove(task)) {
reject(executor, task);
}
}
} else {
put(executor, task);
}
}
private static void put(ThreadPoolExecutor executor, Runnable task) {
final BlockingQueue<Runnable> queue = executor.getQueue();
// force queue policy should only be used with a scaling queue
assert queue instanceof ExecutorScalingQueue;
try {
queue.put(task);
} catch (final InterruptedException e) {
assert false : "a scaling queue never blocks so a put to it can never be interrupted";
throw new AssertionError(e);
}
}
private void reject(ThreadPoolExecutor executor, Runnable task) {
incrementRejections();
throw newRejectedException(task, executor, true);
}
}
}
该策略在线程池未关闭,且使用了ES自定义的ExecutorScalingQueue的任务队列时,会强制将任务放入线程池队列中。其中,ExecutorScalingQueue也是继承自LinkedTransferQueue,最终调用put方法以ASYNC模式放入任务队列中。
看上去也是
forceExecution,而且最终都是使用LinkedTransferQueue的put方法以ASYNC模式非阻塞入队列。那么EsAbortPolicy和ForceQueuePolicy有什么不同呢?两者有很多相似点,都有
forceExecution的判断,而且拒绝时都是抛出RejectedExecutionException。不同点在于,
ForceQueuePolicy默认采用强制执行模式,且在线程池关闭时依然可能往队列放入任务。
其他
在GitHub上随意翻了一下,也有看到用策略链的方式,实现也很简单,可以随意组合配置不同的策略。
public class RejectedExecutionChainPolicy implements RejectedExecutionHandler {
private final RejectedExecutionHandler[] handlerChain;
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
for (RejectedExecutionHandler handler : handlerChain) {
handler.rejectedExecution(r, executor);
}
}
}
总结
拒绝策略主要应用在线程池出现资源溢出的情况下,除了常见的由JDK提供的四种拒绝策略外,不同的组件也会尝试使用不同的拒绝策略来应用。
JDK提供的拒绝策略
| 类型 | 说明 |
|---|---|
| CallerRunsPolicy | JDK提供,调用者线程处理 |
| AbortPolicy | JDK线程池默认使用,抛出RejectedExecutionException异常 |
| DiscardPolicy | JDK提供,丢弃当前任务 |
| DiscardOldestPolicy | JDK提供,丢弃下一个要执行的任务 |
自定义拒绝策略
| 组件 | 类型 | 说明 |
|---|---|---|
| RocketMQ | AbortPolicy | 使用的线程池默认拒绝的略,即AbortPolicy |
| Dubbo | AbortPolicyWithReport | 抛出RejectedExecutionException异常,并报告溢出,记录下JVM线程堆栈 |
| Netty | RejectedExecutionHandlers | 逻辑与AbortPolicy一致,抛出异常,封装为单例Handler使用 |
| Doris | LogDiscardPolicy | 逻辑与DiscardPolicy一致,丢弃任务,并记录warn日志 |
| Doris | BlockedPolicy | 尽最大努力尝试将任务放入队列执行,最多等待60s,超时后记录warn日志,并丢弃任务 |
| Elastic | EsAbortPolicy | 正常情况下与AbortPolicy一致,如果线程标记强制执行,则强制执行或放入任务队列,实际入队列的表现取决于队列类型,可能阻塞或立即返回 |
| Elastic | ForceQueuePolicy | 默认强制执行任务或放入任务队列,异步非阻塞 |
| 其他 | PolicyChain | 策略链,包含多种拒绝策略,根据条件与节点处理结果决定最终表现 |
边栏推荐
- 《ASP.NET Core 6框架揭秘》样章[200页/5章]
- . Net ultimate productivity of efcore sub table sub database fully automated migration codefirst
- [untitled]
- @Resource和@Autowired的区别?
- HZOJ #236. Recursive implementation of combinatorial enumeration
- test
- 《ASP.NET Core 6框架揭秘》样章[200页/5章]
- AUTOCAD——大于180度的角度标注、CAD直径符号怎么输入?
- Per capita Swiss number series, Swiss number 4 generation JS reverse analysis
- Blog recommendation | Apache pulsar cross regional replication scheme selection practice
猜你喜欢

Day-18 hash table, generic

Leetcode skimming: binary tree 23 (mode in binary search tree)

Differences between MySQL storage engine MyISAM and InnoDB

人均瑞数系列,瑞数 4 代 JS 逆向分析

ACL 2022 | small sample ner of sequence annotation: dual tower Bert model integrating tag semantics

聊聊Redis缓存4种集群方案、及优缺点对比

Leetcode skimming: binary tree 21 (verifying binary search tree)

飞桨EasyDL实操范例:工业零件划痕自动识别

COSCon'22 社区召集令来啦!Open the World,邀请所有社区一起拥抱开源,打开新世界~

2022a special equipment related management (boiler, pressure vessel and pressure pipeline) simulated examination question bank simulated examination platform operation
随机推荐
mysql怎么创建,删除,查看索引?
About the problem of APP flash back after appium starts the app - (solved)
How to reset Firefox browser
企业级自定义表单引擎解决方案(十二)--体验代码目录结构
ORACLE进阶(五)SCHEMA解惑
Shortcut key of Bash
事务的七种传播行为
《ASP.NET Core 6框架揭秘》样章[200页/5章]
如何将 @Transactional 事务注解运用到炉火纯青?
Practical case: using MYCAT to realize read-write separation of MySQL
Day22 deadlock, thread communication, singleton mode
leecode3. 无重复字符的最长子串
“新红旗杯”桌面应用创意大赛2022
File operation command
HZOJ #235. Recursive implementation of exponential enumeration
What if the xshell evaluation period has expired
Day-24 UDP, regular expression
关于 appium 如何关闭 app (已解决)
Conversion from non partitioned table to partitioned table and precautions
Leetcode brush questions: binary tree 19 (merge binary tree)