Thread pool reject policy best practices
2022-07-07 13:29:00 【Bug Commander】
Thread pool exhaustion occasionally occurs in one of our production projects, and I recently finally had time to study it properly. The problem itself is not complicated, and thanks to Dubbo's thread pool rejection policy the general cause could be located quickly.
This problem also made me curious about which rejection policies other projects use, so let's dig in and take a look.
Problem background
Thread pool exhaustion occasionally occurs in production, with the following symptoms:
When calling a downstream Dubbo interface, the caller gets an error saying that the thread pool on the Server side is exhausted.
At first I assumed there was a traffic spike. However, monitoring showed stable traffic and the problem persisted after scaling out, so I gradually realized it was not that simple.

Problem analysis
Since we have the exception log and stack trace, let's first find out in which scenario this exception is thrown. Searching the Dubbo source code, we find this message in AbortPolicyWithReport.
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy
AbortPolicyWithReport extends java.util.concurrent.ThreadPoolExecutor.AbortPolicy and is a thread pool rejection policy. The pool triggers its rejection policy when two conditions both hold: the task queue is full, and the number of threads has reached maximumPoolSize. It does this by calling the policy's rejectedExecution() method. Tasks submitted after the pool has been shut down are also handed to the rejection policy.
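The trigger conditions can be checked with a minimal plain-JDK sketch. The class and method names are illustrative and do not come from Dubbo. The pool's core and maximum sizes are equal, and every task blocks on a latch. Once all threads are busy and the bounded queue is full, each further submission goes to the default AbortPolicy. The sketch also shows that a submission after shutdown() is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionTriggerDemo {

    /** Submits `tasks` blocking tasks and returns how many were rejected. */
    static int countRejections(int maxThreads, int queueCapacity, int tasks) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        // No handler argument, so the JDK default (AbortPolicy) applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            gate.await(); // keep every worker busy until the gate opens
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }

    /** A pool that has been shut down rejects new tasks even though its queue is empty. */
    static boolean rejectedAfterShutdown() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10));
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 2 running + 3 queued are accepted; the remaining 2 of 7 are rejected.
        System.out.println(countRejections(2, 3, 7));  // 2
        System.out.println(rejectedAfterShutdown());   // true
    }
}
```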
So, what rejection policies are there?
JDK thread pool rejection policies
In java.util.concurrent.ThreadPoolExecutor we can find the four rejection policies that ship with the JDK:
- CallerRunsPolicy - run the task on the caller thread
Under this policy, if the thread pool has not been shut down, the calling thread executes the task itself. Otherwise the task is silently discarded.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- AbortPolicy - throw an exception
If no rejection policy is configured, the thread pool uses this one by default. It throws RejectedExecutionException directly and leaves the handling to the business code above it.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("...");
}
- DiscardPolicy - discard the current task
This is the simplest approach: the task is silently discarded.
// The method body is empty: nothing is done here, so the task is simply dropped
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
- DiscardOldestPolicy - discard the oldest queued task
This policy discards the oldest task in the queue, which is the next task that would have been executed. It then retries submitting the current task.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
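The four built-in policies can be compared side by side with the following plain-JDK sketch, whose class name is hypothetical. It runs the same overflow scenario under each policy. Task A occupies the single worker and task B fills the one-slot queue, so task C must go to the handler. The sketch reports which tasks actually ran and whether execute() threw.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class JdkPolicyComparison {

    /** 1 worker busy with A, 1-slot queue holding B, so C always reaches the handler. */
    static String run(RejectedExecutionHandler handler) throws InterruptedException {
        List<String> ran = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        String caller = Thread.currentThread().getName();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
        String outcome = "";
        try {
            pool.execute(() -> {                       // A: holds the only worker
                try {
                    gate.await();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                ran.add("A");
            });
            pool.execute(() -> ran.add("B"));          // B: fills the queue
            pool.execute(() -> ran.add(                // C: goes to the rejection handler
                    Thread.currentThread().getName().equals(caller) ? "C(caller)" : "C"));
        } catch (RejectedExecutionException e) {
            outcome = " +exception";
        } finally {
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        List<String> sorted = new ArrayList<>(ran);
        Collections.sort(sorted);
        return sorted + outcome;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("CallerRuns:    " + run(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("Abort:         " + run(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("Discard:       " + run(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("DiscardOldest: " + run(new ThreadPoolExecutor.DiscardOldestPolicy()));
    }
}
```

Given the JDK behavior described above, each policy should produce the following:

- CallerRunsPolicy prints `[A, B, C(caller)]`.
- AbortPolicy prints `[A, B] +exception`.
- DiscardPolicy prints `[A, B]`.
- DiscardOldestPolicy prints `[A, C]`, because B is evicted from the queue to make room for C.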
Dubbo thread pool rejection policy
So what rejection policy does Dubbo use?
As the name suggests, it is AbortPolicyWithReport.
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
...
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" + ...);
logger.warn(msg);
dumpJStack();
dispatchThreadPoolExhaustedEvent(msg);
throw new RejectedExecutionException(msg);
}
...
}
Dubbo's rejection policy ends by throwing RejectedExecutionException. Before throwing, it logs a warning and calls dumpJStack() to record the JVM thread stacks at that moment. It also dispatches a thread-pool-exhausted event.
dumpJStack
Let's look at the source code first .
private void dumpJStack() {
// Dump interval and concurrency control
...
// Create a new single-thread executor to dump the stacks asynchronously
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
...
try (FileOutputStream jStackStream = new FileOutputStream(
new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
JVMUtil.jstack(jStackStream);
} catch (Throwable t) {
logger.error("dump jStack error", t);
}
...
});
...
}
It's quite simple: in the end it calls JVMUtil.jstack to dump the current JVM's thread stacks to a file. The big advantage is that you can see what all the other threads were doing at that moment, which helps analyze why the thread pool overflowed.
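You may want the same "report before rejecting" behavior in your own pools. A minimal sketch could look like the following, but it is not Dubbo's code. The class names, the Consumer<String> sink and the rate limit are assumptions. The sketch dumps stacks synchronously with Thread.getAllStackTraces(), whereas Dubbo uses JVMUtil.jstack on a separate executor. The rate limit imitates the idea behind Dubbo's dump-interval control: within each interval, only one rejection pays for a full dump.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical AbortPolicy variant: report pool stats, dump thread stacks (rate-limited), then throw. */
class ReportingAbortPolicy extends ThreadPoolExecutor.AbortPolicy {
    private final long minDumpIntervalMillis;
    private final Consumer<String> sink;               // e.g. a logger or file writer (assumed)
    private final AtomicLong lastDumpMillis = new AtomicLong();

    ReportingAbortPolicy(long minDumpIntervalMillis, Consumer<String> sink) {
        this.minDumpIntervalMillis = minDumpIntervalMillis;
        this.sink = sink;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        sink.accept(String.format("Thread pool is EXHAUSTED! pool=%d active=%d max=%d queued=%d",
                e.getPoolSize(), e.getActiveCount(), e.getMaximumPoolSize(), e.getQueue().size()));
        long now = System.currentTimeMillis();
        long last = lastDumpMillis.get();
        // At most one dump per interval; the CAS lets only one concurrent caller win.
        if (now - last >= minDumpIntervalMillis && lastDumpMillis.compareAndSet(last, now)) {
            sink.accept(dumpAllThreads());
        }
        super.rejectedExecution(r, e); // AbortPolicy: throws RejectedExecutionException
    }

    static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            sb.append('"').append(entry.getKey().getName()).append("\" ")
              .append(entry.getKey().getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }
}

class ReportingAbortDemo {
    /** Overflows a 1-thread, 1-slot pool twice; returns how many reports reached the sink. */
    static int reportsAfterTwoRejections() throws InterruptedException {
        List<String> reports = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ReportingAbortPolicy(60_000, reports::add));
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);               // runs
        pool.execute(blocker);               // queued
        for (int i = 0; i < 2; i++) {
            try {
                pool.execute(blocker);       // rejected
            } catch (RejectedExecutionException expected) {
                // the caller decides what to do, as with plain AbortPolicy
            }
        }
        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return reports.size();               // 2 status lines + 1 dump (second dump suppressed)
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(reportsAfterTwoRejections()); // 3
    }
}
```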
Cause analysis
With the thread stacks in hand, it is easy to see what the threads were doing at the time.
Under the hood, Dubbo uses Netty for network communication. Two kinds of thread pools are involved:

- the IO thread pools (boss and worker);
- the business thread pool, which handles business events.

The earlier log shows that the Server-side business thread pool, i.e. the DubboServerHandler threads, was exhausted. So let's count what those threads were doing.
Clearly, a large number of threads were blocked waiting to obtain a DB connection. From here the investigation is straightforward. One possibility is that slow queries held connections for a long time during the same period. The other is that the connection pool is simply too small for the thread pool, meaning the two pool sizes are mismatched. I won't take the analysis further here, since it is not the focus of this article.
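The per-thread counting can also be done in-process. The sketch below uses a hypothetical helper and plain JDK APIs. It selects the live threads whose name starts with a given prefix. It then groups them by state and by their first stack frame outside java./jdk./sun. packages. The prefix DubboServerHandler is an assumption about thread naming. With such a prefix, a dominant bucket such as a connection pool's getConnection frame should stand out immediately.

```java
import java.util.Map;
import java.util.TreeMap;

class ThreadDumpSummary {

    /** Counts live threads whose name starts with namePrefix, grouped by "STATE @ first non-JDK frame". */
    static Map<String, Integer> summarize(String namePrefix) {
        Map<String, Integer> buckets = new TreeMap<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (!thread.getName().startsWith(namePrefix)) {
                continue;
            }
            String key = thread.getState() + " @ " + firstNonJdkFrame(entry.getValue());
            buckets.merge(key, 1, Integer::sum);
        }
        return buckets;
    }

    /** Skips JDK-internal frames (e.g. Unsafe.park) so the bucket shows who is actually waiting. */
    private static String firstNonJdkFrame(StackTraceElement[] frames) {
        for (StackTraceElement frame : frames) {
            String cls = frame.getClassName();
            if (!cls.startsWith("java.") && !cls.startsWith("jdk.") && !cls.startsWith("sun.")) {
                return frame.toString();
            }
        }
        return frames.length > 0 ? frames[0].toString() : "<no frames>";
    }

    public static void main(String[] args) {
        // "DubboServerHandler" is an assumed thread-name prefix; adjust it to your pool's naming.
        summarize("DubboServerHandler").forEach((bucket, count) -> System.out.println(count + "  " + bucket));
    }
}
```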
Different rejection strategies
As we can see, Dubbo overrides the rejection policy to help locate problems in abnormal scenarios, and this has helped a lot.
So how do other mainstream components handle it?
RocketMQ
Take the Broker as an example. It contains many thread pools for different message-processing scenarios, such as send, put, pull and query.
For its thread pools, RocketMQ wraps ThreadPoolExecutor in a subclass, BrokerFixedThreadPoolExecutor. Callers pass in their own parameters, including a configurable RejectedExecutionHandler.
In practice, however, the Broker does not specify a rejection policy when it creates the thread pools for message processing. The default AbortPolicy therefore applies, and an exception is thrown.
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_")
// No rejection policy is set
);
To keep tasks from overflowing, each thread pool is also given a fairly large task queue capacity by default:
private int sendThreadPoolQueueCapacity = 10000;
private int putThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;
In summary, RocketMQ uses AbortPolicy, which throws an exception, and sets large task queues to keep them from overflowing.
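The RocketMQ approach combines a fixed thread count, a large bounded queue and the default AbortPolicy. It can be sketched with a plain ThreadPoolExecutor. This is not BrokerFixedThreadPoolExecutor, and the class, thread-name prefix and numbers are illustrative. The trade-off is that a large queue absorbs bursts, but it can also hide a slow consumer until latency and memory usage grow.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LargeQueuePool {

    /** Fixed-size pool with a bounded but large queue; no handler argument, so AbortPolicy is used. */
    static ThreadPoolExecutor create(int threads, int queueCapacity, String namePrefix) {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, namePrefix + seq.incrementAndGet());
        return new ThreadPoolExecutor(threads, threads, 60_000, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity), factory);
    }

    /** Submits `burst` tasks that block until released and returns how many were rejected. */
    static int rejectedDuringBurst(int threads, int queueCapacity, int burst) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = create(threads, queueCapacity, "Demo_");
        int rejected = 0;
        for (int i = 0; i < burst; i++) {
            try {
                pool.execute(() -> {
                    try {
                        gate.await();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(rejectedDuringBurst(1, 10_000, 5_000)); // burst fits in the queue: 0
        System.out.println(rejectedDuringBurst(1, 10, 20));        // 1 running + 10 queued: 9
    }
}
```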
Netty
Taking EventLoopGroup as an example, the thread pool's default rejection policy comes from RejectedExecutionHandlers, which provides the handler as a singleton.
public final class RejectedExecutionHandlers {
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
private RejectedExecutionHandlers() { }
public static RejectedExecutionHandler reject() {
return REJECT;
}
...
}
As you can see, Netty's default rejection policy also throws an exception. Compared with RocketMQ, the difference lies in the task queue size, which is max(16, maxPendingTasks). The value of maxPendingTasks can be configured through the io.netty.eventLoop.maxPendingTasks JVM system property.
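The sizing rule can be imitated with plain JDK classes. This is a sketch, not Netty code. It reads the same property name with Integer.getInteger and applies the max(16, ...) floor. When the property is unset, it falls back to Integer.MAX_VALUE, which is an assumption made for illustration. It also uses a shared handler that always throws, in the spirit of the REJECT singleton above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PendingTaskLimit {
    static final String PROPERTY = "io.netty.eventLoop.maxPendingTasks";

    /** Stateless, shared handler: always rejects by throwing, like a singleton reject() handler. */
    static final RejectedExecutionHandler REJECT = (task, executor) -> {
        throw new RejectedExecutionException();
    };

    /** max(16, configured); unset falls back to Integer.MAX_VALUE (assumed, effectively unbounded). */
    static int maxPendingTasks() {
        return Math.max(16, Integer.getInteger(PROPERTY, Integer.MAX_VALUE));
    }

    static ThreadPoolExecutor newSingleThreadExecutor() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(maxPendingTasks()), REJECT);
    }

    public static void main(String[] args) {
        System.setProperty(PROPERTY, "4");
        System.out.println(maxPendingTasks());   // 16: values below 16 are raised to 16
        System.setProperty(PROPERTY, "1024");
        System.out.println(maxPendingTasks());   // 1024
    }
}
```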
Doris
Our team has been using Doris, an analytical storage component with a compute-storage separated MPP architecture. Its FE (Frontend) officially provides two rejection policies:
LogDiscardPolicy
static class LogDiscardPolicy implements RejectedExecutionHandler {
private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);
private String threadPoolName;
public LogDiscardPolicy(String threadPoolName) {
this.threadPoolName = threadPoolName;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
}
}
This one is easy to understand: it is essentially DiscardPolicy, discarding the task while logging a warning.
BlockedPolicy
static class BlockedPolicy implements RejectedExecutionHandler {
private String threadPoolName;
private int timeoutSeconds;
public BlockedPolicy(String threadPoolName, int timeoutSeconds) {
this.threadPoolName = threadPoolName;
this.timeoutSeconds = timeoutSeconds;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("Task " + r.toString() + " wait to enqueue in " + threadPoolName + " " + executor.toString() + " failed");
}
}
}
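This policy is a bit special: it blocks the current thread and tries its best to put the task into the queue, waiting at most timeoutSeconds (60s by default). The intent is that a task which still cannot be enqueued after that is logged as a warning and discarded. In the snippet above, however, the warning is only logged when the wait is interrupted. The boolean returned by offer() is ignored, so a task that simply times out is dropped silently.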
Both policies are actually used in Doris, and the task queue size of the thread pool is set to 10.
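BlockedPolicy's behavior can be kept without the silent drop on timeout by checking the result of offer(). The sketch below is not Doris code. The class name, and the choice to throw RejectedExecutionException instead of logging and discarding, are assumptions.

Two caveats apply to any policy that writes into the queue directly:

- It bypasses the executor's own shutdown check, so the sketch checks isShutdown() first.
- It never starts a new worker, so it only makes sense when core threads are already running to drain the queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Blocks the submitter for up to timeoutMillis waiting for queue space; fails loudly otherwise. */
class TimedBlockingPolicy implements RejectedExecutionHandler {
    private final String poolName;
    private final long timeoutMillis;

    TimedBlockingPolicy(String poolName, long timeoutMillis) {
        this.poolName = poolName;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException(poolName + " is shut down");
        }
        try {
            // offer() returns false on timeout; unlike the snippet above, the result is not ignored.
            if (!executor.getQueue().offer(r, timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new RejectedExecutionException("Task " + r + " timed out waiting to enqueue in " + poolName);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new RejectedExecutionException("Interrupted while enqueueing in " + poolName, e);
        }
    }

    /** Fills a 1-thread, 1-slot pool, frees it after releaseAfterMillis, and submits a third task. */
    static boolean thirdTaskRejected(long timeoutMillis, long releaseAfterMillis) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new TimedBlockingPolicy("demo", timeoutMillis));
        pool.execute(() -> {                   // occupies the only worker until the gate opens
            try {
                gate.await();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });               // fills the one-slot queue
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(releaseAfterMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            gate.countDown();
        });
        releaser.start();
        boolean rejected = false;
        try {
            pool.execute(() -> { });           // waits in the handler for queue space
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        releaser.join();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(thirdTaskRejected(50, 1_000));   // true: no space within 50 ms
        System.out.println(thirdTaskRejected(5_000, 50));   // false: space appears after ~50 ms
    }
}
```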
ElasticSearch
ES's rejection policies are relatively complex: it implements two custom rejection policies.
- EsAbortPolicy
public class EsAbortPolicy extends EsRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof AbstractRunnable) {
if (((AbstractRunnable) r).isForceExecution()) {
BlockingQueue<Runnable> queue = executor.getQueue();
if ((queue instanceof SizeBlockingQueue) == false) {
throw new IllegalStateException("forced execution, but expected a size queue");
}
try {
((SizeBlockingQueue<Runnable>) queue).forcePut(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("forced execution, but got interrupted", e);
}
return;
}
}
incrementRejections();
throw newRejectedException(r, executor, executor.isShutdown());
}
}
It is essentially AbortPolicy with some special handling. It checks whether the task requires forced execution (forceExecution) and counts rejected tasks, and finally it throws an exception.
What is forceExecution in ES thread pools? It takes effect when two conditions are met:

- tasks are wrapped in ES's custom AbstractRunnable;
- SizeBlockingQueue is used as the task queue.

In that case, the task's own configuration decides whether it is forced into the queue. For important tasks that must not be discarded, forceExecution can be set to true.
The effect of forcing a task into the queue depends on the queue type that SizeBlockingQueue wraps:

- If it wraps an ArrayBlockingQueue, the caller blocks until the queue has free space.
- If it wraps a LinkedTransferQueue, the queue is unbounded and put uses ASYNC mode, so the task is enqueued and put returns immediately.
- ForceQueuePolicy
static class ForceQueuePolicy extends EsRejectedExecutionHandler {
...
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
if (rejectAfterShutdown) {
if (executor.isShutdown()) {
reject(executor, task);
} else {
put(executor, task);
if (executor.isShutdown() && executor.remove(task)) {
reject(executor, task);
}
}
} else {
put(executor, task);
}
}
private static void put(ThreadPoolExecutor executor, Runnable task) {
final BlockingQueue<Runnable> queue = executor.getQueue();
// force queue policy should only be used with a scaling queue
assert queue instanceof ExecutorScalingQueue;
try {
queue.put(task);
} catch (final InterruptedException e) {
assert false : "a scaling queue never blocks so a put to it can never be interrupted";
throw new AssertionError(e);
}
}
private void reject(ThreadPoolExecutor executor, Runnable task) {
incrementRejections();
throw newRejectedException(task, executor, true);
}
}
While the thread pool has not been shut down, this policy forces the task into the pool's queue, which must be ES's custom ExecutorScalingQueue. ExecutorScalingQueue also extends LinkedTransferQueue, so the final put call enqueues the task in ASYNC mode.
This looks just like forceExecution: both end up calling LinkedTransferQueue's put, which enqueues without blocking. So what is the difference between EsAbortPolicy and ForceQueuePolicy?
The two have a lot in common: both can force tasks into the queue, and both throw RejectedExecutionException when they do reject a task.
The difference is that ForceQueuePolicy forces every task into the queue by default. When rejectAfterShutdown is false, it still puts tasks into the queue even after the thread pool has been shut down.
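ExecutorScalingQueue and ForceQueuePolicy together make the pool scale up to its maximum size first and queue tasks only after that. This behavior can be sketched with plain JDK classes. The sketch is a simplified imitation, not ES code, and the class names are hypothetical.

- offer() hands a task only to an idle worker via tryTransfer(). This makes ThreadPoolExecutor fall through to creating a new thread.
- The rejection handler enqueues unconditionally with the inherited offer(), which never blocks on an unbounded LinkedTransferQueue.
- The handler still rejects after shutdown, similar to rejectAfterShutdown = true.

ES also handles races that this sketch ignores.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Unbounded queue that "refuses" tasks unless an idle worker is already waiting for one. */
class ScalingQueue extends LinkedTransferQueue<Runnable> {
    @Override
    public boolean offer(Runnable task) {
        // false => ThreadPoolExecutor tries to add a worker (up to max) before rejecting
        return tryTransfer(task);
    }

    /** Unconditional, non-blocking enqueue used by the rejection handler. */
    void forceEnqueue(Runnable task) {
        super.offer(task);
    }
}

/** Rejects only after shutdown; otherwise forces the task into the scaling queue. */
class ForceQueueHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Task " + task + " rejected: executor is shut down");
        }
        ((ScalingQueue) executor.getQueue()).forceEnqueue(task);
    }
}

class ScalingPoolDemo {
    static ThreadPoolExecutor create(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads, 30, TimeUnit.SECONDS,
                new ScalingQueue(), new ForceQueueHandler());
    }

    /** Submits tasks that block until released; returns {poolSize, queuedTasks} while all are blocked. */
    static int[] snapshotUnderLoad(int maxThreads, int tasks) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(tasks);
        ThreadPoolExecutor pool = create(maxThreads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                done.countDown();
            });
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size()};
        gate.countDown();
        done.await(10, TimeUnit.SECONDS);   // queued tasks drain once workers free up
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] s = snapshotUnderLoad(2, 5);
        System.out.println("threads=" + s[0] + " queued=" + s[1]); // threads=2 queued=3
    }
}
```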
other
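While browsing GitHub, I also came across a policy-chain approach. The implementation is simple, and different policies can be combined and configured freely:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionChainPolicy implements RejectedExecutionHandler {
private final RejectedExecutionHandler[] handlerChain;
public RejectedExecutionChainPolicy(RejectedExecutionHandler... handlerChain) {
this.handlerChain = handlerChain.clone();
}
// Handlers run in order; if one throws, the remaining handlers are skipped
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
for (RejectedExecutionHandler handler : handlerChain) {
handler.rejectedExecution(r, executor);
}
}
}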
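RejectedExecutionHandler has a single abstract method, so the same chaining idea can also be expressed by composing lambdas. The sketch below uses hypothetical names and plain JDK classes. It shows that order matters. A counting link placed before AbortPolicy records every rejection. The same link placed after AbortPolicy never runs, because AbortPolicy throws and ends the chain.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionChains {

    /** Runs handlers left to right; a handler that throws stops the rest of the chain. */
    static RejectedExecutionHandler chain(RejectedExecutionHandler... handlers) {
        RejectedExecutionHandler[] links = handlers.clone();
        return (task, executor) -> {
            for (RejectedExecutionHandler link : links) {
                link.rejectedExecution(task, executor);
            }
        };
    }

    /** Hypothetical metrics link: just counts rejections. */
    static RejectedExecutionHandler counting(AtomicInteger counter) {
        return (task, executor) -> counter.incrementAndGet();
    }

    /** Overflows a 1-thread, 1-slot pool once and reports whether execute() threw. */
    static boolean overflowThrows(RejectedExecutionHandler handler) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
        boolean threw = false;
        try {
            pool.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });
            pool.execute(() -> { });   // goes to the handler chain
        } catch (RejectedExecutionException e) {
            threw = true;
        } finally {
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return threw;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger before = new AtomicInteger();
        boolean t1 = overflowThrows(chain(counting(before), new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(t1 + " " + before.get()); // true 1
        AtomicInteger after = new AtomicInteger();
        boolean t2 = overflowThrows(chain(new ThreadPoolExecutor.AbortPolicy(), counting(after)));
        System.out.println(t2 + " " + after.get());  // true 0
    }
}
```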
summary
Rejection policies come into play when a thread pool's resources overflow. Besides the four common policies provided by the JDK, different components also adopt their own rejection policies to suit their needs.
Rejection policies provided by the JDK
| Policy | Description |
|---|---|
| CallerRunsPolicy | Provided by the JDK; the caller thread runs the task |
| AbortPolicy | Default policy of JDK thread pools; throws RejectedExecutionException |
| DiscardPolicy | Provided by the JDK; discards the current task |
| DiscardOldestPolicy | Provided by the JDK; discards the oldest queued task (the next one to run) and retries the current task |
Custom rejection policies
| Component | Policy | Description |
|---|---|---|
| RocketMQ | AbortPolicy | Uses the thread pool's default rejection policy, i.e. AbortPolicy, together with large task queues |
| Dubbo | AbortPolicyWithReport | Throws RejectedExecutionException, reports the overflow, and records the JVM thread stacks |
| Netty | RejectedExecutionHandlers | Same logic as AbortPolicy: throws an exception; provided as a singleton handler |
| Doris | LogDiscardPolicy | Same logic as DiscardPolicy: discards the task and logs a warning |
| Doris | BlockedPolicy | Tries hard to put the task into the queue, waiting up to 60s; if that fails, the task is discarded |
| Elastic | EsAbortPolicy | Normally behaves like AbortPolicy; if the task is marked forceExecution, it is forced into the task queue, and whether that blocks or returns immediately depends on the queue type |
| Elastic | ForceQueuePolicy | Forces tasks into the task queue by default; asynchronous and non-blocking |
| Other | PolicyChain | A chain combining several rejection policies; the final behavior depends on the conditions and on the result of each handler in the chain |