Thread pool reject policy best practices
2022-07-07 13:29:00 【Bug Commander】
Thread pool exhaustion occurs occasionally in our production projects. I recently found time to study it properly. The problem is actually not complicated, and thanks to Dubbo's thread pool rejection policy I could quickly find the general cause.
This problem also made me curious about the thread pool rejection policies that different components use, so let's dig in.
Problem background
Thread pool exhaustion occurs occasionally in production. The symptom is as follows:
When calling a downstream Dubbo interface, the error message says that the thread pool on the server side is exhausted.
At first I suspected a traffic burst, but monitoring showed that traffic was stable, and the problem persisted after scaling out. I gradually realized that the problem was not that simple.
Problem analysis
Since we have exception logs and stacks, let's first see in which scenario this exception appears. In the Dubbo source code, this message can be found in AbortPolicyWithReport.
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy
AbortPolicyWithReport inherits from java.util.concurrent.ThreadPoolExecutor.AbortPolicy and is a thread pool rejection policy. When the task queue of the thread pool is full and the number of threads has reached the maximum, the pool triggers the rejection policy and calls its rejectedExecution() method to handle the task.
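To make the trigger condition concrete, here is a minimal sketch using only the JDK. The class name RejectionTriggerDemo and the pool sizes are my own choices for illustration: one worker thread plus one queue slot, so the third submission has nowhere to go.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionTriggerDemo {

    /** Returns true if the third submission is rejected by the default policy. */
    static boolean thirdTaskIsRejected() {
        // At most 1 thread, and room for exactly 1 waiting task.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocking); // occupies the only worker thread
            pool.execute(blocking); // waits in the queue, which is now full
            try {
                pool.execute(blocking); // no free thread, no queue slot: rejected
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskIsRejected());
    }
}
```

Since no handler is passed to the constructor, the pool falls back to its default policy, which is why the caller sees a RejectedExecutionException.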
So, what rejection policies are there?
JDK thread pool rejection policies
In java.util.concurrent.ThreadPoolExecutor, we can find the four rejection policies preset by the JDK:
- CallerRunsPolicy - run on the caller thread
Under this policy, if the thread pool has not been shut down, the task is run by the calling thread itself; otherwise the task is discarded.
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
- AbortPolicy - throw an exception
If no rejection policy is configured, the thread pool uses this policy by default. It throws a RejectedExecutionException directly and leaves the handling to the upper-level business code.
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("...");
    }
- DiscardPolicy - discard the current task
The simplest way to handle it: discard the task directly.
    // The actual method body is empty: nothing is done in this scenario, so the task is simply discarded
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
- DiscardOldestPolicy - discard the next task to be executed
This policy discards the oldest task in the queue (which is in fact the next task to be executed) and then tries to execute the current task.
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
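The four policies above can be compared side by side with a small experiment. This is only a sketch: JdkPolicyDemo and its outcome() helper are hypothetical names of mine, and the pool is deliberately tiny (one thread, one queue slot) so that one extra submission always hits the handler.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class JdkPolicyDemo {

    /** Saturates a 1-thread, 1-slot pool and reports what the handler does with one more task. */
    static String outcome(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        List<String> ran = new CopyOnWriteArrayList<>();
        String caller = Thread.currentThread().getName();
        boolean threw = false;
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker
            pool.execute(() -> ran.add("queued"));     // fills the queue
            try {
                // This submission triggers the rejection policy.
                pool.execute(() -> ran.add("new@" + Thread.currentThread().getName()));
            } catch (RejectedExecutionException e) {
                threw = true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // let remaining tasks run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (threw) return "threw";
        if (ran.contains("new@" + caller)) return "ran on caller";
        if (!ran.contains("queued")) return "dropped oldest";
        return "dropped new";
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("CallerRunsPolicy:    " + outcome(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("AbortPolicy:         " + outcome(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("DiscardPolicy:       " + outcome(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("DiscardOldestPolicy: " + outcome(new ThreadPoolExecutor.DiscardOldestPolicy()));
    }
}
```

Only AbortPolicy makes the overflow visible to the caller as an exception. The two discard policies lose work silently, and CallerRunsPolicy slows the submitter down instead.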
Dubbo thread pool rejection policy
So what is Dubbo's rejection policy?
The name already gives it away: AbortPolicyWithReport.
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    ...
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" + ...);
        logger.warn(msg);
        dumpJStack();
        dispatchThreadPoolExhaustedEvent(msg);
        throw new RejectedExecutionException(msg);
    }
    ...
}
Dubbo's rejection policy throws a RejectedExecutionException and, at the same time, does one more thing: dumpJStack(), which records the JVM thread stacks at that moment.
dumpJStack
Let's look at the source code first.
private void dumpJStack() {
    // some dump interval and concurrency control
    ...
    // create a new single-thread pool to dump the stacks
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> {
        ...
        try (FileOutputStream jStackStream = new FileOutputStream(
                new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
            JVMUtil.jstack(jStackStream);
        } catch (Throwable t) {
            logger.error("dump jStack error", t);
        }
        ...
    });
    ...
}
It is very simple: it ends up calling JVMUtil.jstack to dump the current JVM thread stacks. This has a big advantage: we can see what the other threads were doing at that moment, which helps analyze why the thread pool overflowed.
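The same idea can be applied to a plain JDK thread pool. The sketch below is not Dubbo's code: AbortWithThreadDumpPolicy, the 10-minute throttle and the message format are assumptions of mine, and the dump is taken synchronously with Thread.getAllStackTraces() and printed to stderr rather than written to a file from a separate thread.

```java
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical policy for illustration only; this is not Dubbo's code. */
final class AbortWithThreadDumpPolicy extends ThreadPoolExecutor.AbortPolicy {

    // Assumed throttle: at most one dump every 10 minutes.
    private static final long MIN_INTERVAL_MILLIS = 10 * 60 * 1000L;

    private final AtomicLong lastDumpMillis = new AtomicLong(0L);
    private volatile String lastDump = "";

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format(
                "Thread pool is EXHAUSTED! pool size: %d (active: %d, max: %d), queued: %d",
                e.getPoolSize(), e.getActiveCount(), e.getMaximumPoolSize(), e.getQueue().size());
        long now = System.currentTimeMillis();
        long last = lastDumpMillis.get();
        // Only one rejecting thread per interval wins the CAS and pays for the dump.
        if (now - last >= MIN_INTERVAL_MILLIS && lastDumpMillis.compareAndSet(last, now)) {
            lastDump = dumpAllThreads();
            System.err.println(lastDump);
        }
        throw new RejectedExecutionException(msg);
    }

    /** The most recent dump, or an empty string if none has been taken yet. */
    String lastDump() {
        return lastDump;
    }

    /** Renders the name, state and stack of every live thread. */
    static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread t = entry.getKey();
            sb.append('"').append(t.getName()).append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }
}
```

The throttle matters because a saturated pool rejects many tasks in a short burst, and dumping every thread on each rejection would add load exactly when the process is already struggling.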
Cause analysis
With the thread stacks in hand, things are easy: just look at what the threads were doing at that time.
Dubbo uses Netty for network communication under the hood. The thread pools involved are the IO thread pools (boss, worker) and the business thread pool (which handles business events). From the earlier log we can see that it is the business thread pool on the server side, i.e. DubboServerHandler, that is exhausted. So we count the threads to see what they are doing.
Clearly, a large number of threads are blocked waiting to get a DB connection. The rest is easy: check whether slow queries were holding connections for a long time in the same period, or whether the connection pool really is too small, so that the thread pool and the connection pool are mismatched. I will not continue the analysis here (it is not the focus of this article).
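The counting step can also be done in-process. The following is a rough sketch, not the procedure I actually used on the dump file: StackSummary and countByTopFrame are hypothetical names, and grouping by the top stack frame is just one simple heuristic for "what is this thread doing".

```java
import java.util.Map;
import java.util.TreeMap;

final class StackSummary {

    /** Counts live threads whose name starts with the prefix, grouped by their top stack frame. */
    static Map<String, Integer> countByTopFrame(String threadNamePrefix) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            if (!entry.getKey().getName().startsWith(threadNamePrefix)) {
                continue;
            }
            StackTraceElement[] stack = entry.getValue();
            String top = stack.length == 0
                    ? "<no frames>"
                    : stack[0].getClassName() + "." + stack[0].getMethodName();
            counts.merge(top, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // In a Dubbo provider the business threads are named DubboServerHandler-...
        countByTopFrame("DubboServerHandler")
                .forEach((frame, n) -> System.out.println(n + "\t" + frame));
    }
}
```

If most threads share one frame deep inside a connection pool's borrow method, that is a strong hint that the business pool is waiting on connections rather than doing work.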
Different rejection policies
As you can see, by overriding the rejection policy Dubbo helps locate problems in abnormal scenarios, which was a great help here.
So how do other mainstream components do it?
RocketMQ
Take the Broker as an example. It contains many thread pools for different message-processing scenarios, including send, put, pull, query and so on.
For thread pool usage, RocketMQ wraps ThreadPoolExecutor in a subclass, BrokerFixedThreadPoolExecutor. The upper layer can pass in its own parameters, including a configurable RejectedExecutionHandler.
In practice, when the Broker creates the thread pools for message processing, it does not specify any special rejection policy, so the default AbortPolicy is used and an exception is thrown.
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getSendMessageThreadPoolNums(),
    this.brokerConfig.getSendMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.sendThreadPoolQueue,
    new ThreadFactoryImpl("SendMessageThread_")
    // No rejection policy is set
);
At the same time, to avoid task overflow, each thread pool is given a fairly large task queue by default.
private int sendThreadPoolQueueCapacity = 10000;
private int putThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;
To sum up, RocketMQ's rejection policy is AbortPolicy, which throws an exception. At the same time, to avoid task queue overflow, it sets a large task queue.
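The same combination is easy to reproduce with the plain JDK. This sketch does not use RocketMQ classes: LargeQueueFixedPool and its create() method are hypothetical, and the numbers in main are only examples in the spirit of the defaults listed above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class LargeQueueFixedPool {

    /** Fixed-size pool with a large bounded queue and no explicit rejection handler. */
    static ThreadPoolExecutor create(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                60_000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        // No handler argument: the JDK default, AbortPolicy, applies.
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create(4, 10_000);
        System.out.println(pool.getRejectedExecutionHandler().getClass().getSimpleName());
        System.out.println(pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

A large bounded queue postpones rejection rather than preventing it, and queued tasks may wait a long time before they run, so the capacity is a trade-off between absorbing bursts and failing fast.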
Netty
Take EventLoopGroup as an example. By default, the rejection policy of the thread pool comes from RejectedExecutionHandlers, which provides the handler as a singleton.
public final class RejectedExecutionHandlers {
    private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
        @Override
        public void rejected(Runnable task, SingleThreadEventExecutor executor) {
            throw new RejectedExecutionException();
        }
    };
    private RejectedExecutionHandlers() { }
    public static RejectedExecutionHandler reject() {
        return REJECT;
    }
    ...
}
As you can see, Netty's default rejection policy also throws an exception. The difference from RocketMQ is that the task queue size is max(16, maxPendingTasks), where io.netty.eventLoop.maxPendingTasks can be configured through a JVM system property.
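The max(16, maxPendingTasks) rule can be mirrored with the plain JDK. This is a sketch, not Netty code: MaxPendingTasks and resolve() are hypothetical names, and I am assuming the property falls back to Integer.MAX_VALUE when it is not set.

```java
final class MaxPendingTasks {

    static final String PROPERTY = "io.netty.eventLoop.maxPendingTasks";

    /** Resolves the effective queue size: the configured value, but never below 16. */
    static int resolve() {
        // Integer.getInteger reads a JVM system property, e.g. set with -Dname=value.
        return Math.max(16, Integer.getInteger(PROPERTY, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Try: java -Dio.netty.eventLoop.maxPendingTasks=1024 MaxPendingTasks.java
        System.out.println("effective max pending tasks: " + resolve());
    }
}
```

With the assumed fallback, an unconfigured event loop queue is effectively unbounded, so rejection only comes into play once the property is set to a finite value.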
Doris
Our team has been using Doris, an analytical storage component with compute-storage separation and an MPP architecture. Let's look at the rejection policies in its FE. There are two official ones:
LogDiscardPolicy
static class LogDiscardPolicy implements RejectedExecutionHandler {
    private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);
    private String threadPoolName;
    public LogDiscardPolicy(String threadPoolName) {
        this.threadPoolName = threadPoolName;
    }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
    }
}
It can be understood as DiscardPolicy: it discards the task and also writes a warn log.
BlockedPolicy
static class BlockedPolicy implements RejectedExecutionHandler {
    private String threadPoolName;
    private int timeoutSeconds;
    public BlockedPolicy(String threadPoolName, int timeoutSeconds) {
        this.threadPoolName = threadPoolName;
        this.timeoutSeconds = timeoutSeconds;
    }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().offer(r, timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Task " + r.toString() + " wait to enqueue in " + threadPoolName + " " + executor.toString() + " failed");
        }
    }
}
This policy is rather special: it blocks the current thread and tries its best to put the task into the queue. If the task still cannot be put into the queue after the specified blocking time timeoutSeconds (60s by default), the task is discarded. Note that in the code shown above the return value of offer() is ignored, so a timeout drops the task silently; the warn log is written only when the waiting thread is interrupted.
Both policies are actually used in Doris, and the task queue size of the thread pool is set to 10.
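A blocking policy is easier to reason about when the timeout is made explicit. The sketch below is my own variant, not Doris code: BlockThenRejectPolicy is a hypothetical name, and unlike the snippet above it checks the result of offer() and throws when the wait times out.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical policy: wait for a queue slot, and reject loudly if none appears in time. */
final class BlockThenRejectPolicy implements RejectedExecutionHandler {

    private final long timeout;
    private final TimeUnit unit;

    BlockThenRejectPolicy(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = unit;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        boolean queued;
        try {
            // Blocks the submitting thread until a slot frees up or the timeout expires.
            queued = executor.getQueue().offer(r, timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new RejectedExecutionException("interrupted while waiting to enqueue", e);
        }
        if (!queued) {
            // offer() returned false: the queue stayed full for the whole timeout.
            throw new RejectedExecutionException(
                    "queue still full after " + timeout + " " + unit);
        }
    }
}
```

One caveat of any policy that writes to the queue directly: it bypasses the executor's own state checks, so a task enqueued while the pool is shutting down may never run.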
ElasticSearch
ES's rejection policies are relatively complex. It implements two custom rejection policies.
- EsAbortPolicy
public class EsAbortPolicy extends EsRejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if ((queue instanceof SizeBlockingQueue) == false) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue<Runnable>) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        incrementRejections();
        throw newRejectedException(r, executor, executor.isShutdown());
    }
}
In essence it is AbortPolicy, but with some special handling: the forceExecution check, counting the number of rejected tasks, and finally throwing an exception.
What is forceExecution in the ES thread pool?
When the conditions are met, that is, the task is wrapped in ES's custom AbstractRunnable and SizeBlockingQueue is used as the task queue, the task's own configuration decides whether it is forced into the task queue. For important tasks that must not be discarded, forceExecution can be set to true.
The effect of forcing a task into the queue depends on the queue type wrapped by SizeBlockingQueue. If it wraps an ArrayBlockingQueue, the call blocks until the queue has free space. If it wraps a LinkedTransferQueue, the queue is unbounded and put uses ASYNC mode, so the task is enqueued immediately and the call returns.
- ForceQueuePolicy
static class ForceQueuePolicy extends EsRejectedExecutionHandler {
    ...
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (rejectAfterShutdown) {
            if (executor.isShutdown()) {
                reject(executor, task);
            } else {
                put(executor, task);
                if (executor.isShutdown() && executor.remove(task)) {
                    reject(executor, task);
                }
            }
        } else {
            put(executor, task);
        }
    }
    private static void put(ThreadPoolExecutor executor, Runnable task) {
        final BlockingQueue<Runnable> queue = executor.getQueue();
        // force queue policy should only be used with a scaling queue
        assert queue instanceof ExecutorScalingQueue;
        try {
            queue.put(task);
        } catch (final InterruptedException e) {
            assert false : "a scaling queue never blocks so a put to it can never be interrupted";
            throw new AssertionError(e);
        }
    }
    private void reject(ThreadPoolExecutor executor, Runnable task) {
        incrementRejections();
        throw newRejectedException(task, executor, true);
    }
}
When the thread pool has not been shut down and uses ES's custom ExecutorScalingQueue as its task queue, this policy forces the task into the thread pool queue. ExecutorScalingQueue itself inherits from LinkedTransferQueue, so the final put call enqueues the task in ASYNC mode.
This looks like forceExecution, and both end up using LinkedTransferQueue's put method to enqueue in ASYNC mode without blocking. So what is the difference between EsAbortPolicy and ForceQueuePolicy?
The two have a lot in common: both have a forced-execution check, and both throw RejectedExecutionException when they reject a task.
The difference is that ForceQueuePolicy forces enqueueing by default, and it may still put tasks into the queue after the thread pool has been shut down.
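The forced-enqueue idea does not depend on ES internals. Here is a minimal sketch with the plain JDK: ForceExecutable and ForceAwareAbortPolicy are hypothetical names of mine, loosely modelled on the isForceExecution() check, and a plain BlockingQueue.put() stands in for ES's forcePut().

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical marker for tasks that must not be dropped. */
interface ForceExecutable extends Runnable {
    boolean isForceExecution();
}

/** Hypothetical policy: abort by default, but force important tasks into the queue. */
final class ForceAwareAbortPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof ForceExecutable && ((ForceExecutable) r).isForceExecution()) {
            try {
                // On a bounded queue put() blocks until a slot is free;
                // on an unbounded queue it returns immediately.
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("forced execution, but got interrupted", e);
            }
            return;
        }
        throw new RejectedExecutionException("Task " + r + " rejected from " + executor);
    }
}
```

This shows the trade-off in both ES policies: ordinary tasks fail fast, while tasks marked as important are allowed to apply back-pressure on the submitter or grow the queue instead of being lost.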
Others
Browsing GitHub casually, I also found implementations that use a policy chain. The implementation is simple as well, and different policies can be combined and configured freely.
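import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionChainPolicy implements RejectedExecutionHandler {
    private final RejectedExecutionHandler[] handlerChain;
    // Constructor added here so that the final field is initialized
    public RejectedExecutionChainPolicy(RejectedExecutionHandler... handlerChain) {
        this.handlerChain = handlerChain;
    }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Run every handler in order; a handler that throws ends the chain
        for (RejectedExecutionHandler handler : handlerChain) {
            handler.rejectedExecution(r, executor);
        }
    }
}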
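As a usage sketch of the chain idea, the example below combines a counting step with the JDK's AbortPolicy. Everything here is hypothetical and self-contained: ChainDemo, ChainPolicy and CountingPolicy are names of mine, not classes from the project I found.

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

final class ChainDemo {

    /** Hypothetical chain: runs each handler in order until one throws. */
    static final class ChainPolicy implements RejectedExecutionHandler {
        private final RejectedExecutionHandler[] chain;

        ChainPolicy(RejectedExecutionHandler... chain) {
            this.chain = chain.clone();
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            for (RejectedExecutionHandler handler : chain) {
                handler.rejectedExecution(r, executor);
            }
        }
    }

    /** Hypothetical first link: only counts rejections, e.g. to feed a metric. */
    static final class CountingPolicy implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejected.incrementAndGet();
        }
    }

    /** Builds a chain that counts first and then aborts with an exception. */
    static ChainPolicy countThenAbort(CountingPolicy counter) {
        return new ChainPolicy(counter, new ThreadPoolExecutor.AbortPolicy());
    }
}
```

Order matters in such a chain: a throwing policy like AbortPolicy must come last, because nothing after it will run.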
Summary
Rejection policies are mainly used when a thread pool's resources overflow. Besides the four common rejection policies provided by the JDK, different components also try different rejection policies to suit their own needs.
Rejection policies provided by the JDK
Type | Description |
---|---|
CallerRunsPolicy | Provided by the JDK; the task is run by the caller thread |
AbortPolicy | Used by the JDK thread pool by default; throws RejectedExecutionException |
DiscardPolicy | Provided by the JDK; discards the current task |
DiscardOldestPolicy | Provided by the JDK; discards the next task to be executed |
Custom rejection policies
Component | Type | Description |
---|---|---|
RocketMQ | AbortPolicy | Uses the thread pool's default rejection policy, i.e. AbortPolicy |
Dubbo | AbortPolicyWithReport | Throws RejectedExecutionException, reports the overflow and records the JVM thread stacks |
Netty | RejectedExecutionHandlers | Same logic as AbortPolicy: throws an exception; the handler is provided as a singleton |
Doris | LogDiscardPolicy | Same logic as DiscardPolicy: discards the task and writes a warn log |
Doris | BlockedPolicy | Tries its best to put the task into the queue for execution, waiting up to 60s by default; if that fails, the task is discarded (the warn log is written when the wait is interrupted) |
Elastic | EsAbortPolicy | Normally the same as AbortPolicy; if the task is marked for forced execution, it is forced into the task queue, and the actual behavior depends on the queue type: it may block or return immediately |
Elastic | ForceQueuePolicy | Forces the task into the task queue by default; asynchronous and non-blocking |
Others | PolicyChain | Policy chain containing multiple rejection policies; the final behavior is determined by the conditions and each node's result |