
Thread pool reject policy best practices

2022-07-07 13:29:00 Bug Commander

Thread pool exhaustion has been occurring occasionally in one of our production projects. I recently found time to dig into it properly. The problem itself turned out not to be complicated, and thanks to Dubbo's thread pool rejection policy, the general cause could be located quickly.

The issue also made me curious about the rejection policies used by various mainstream components, so let's dig in and take a look.

Problem background

Thread pool exhaustion occurred occasionally in production. The symptom looked like this:

 

When calling a downstream Dubbo interface, the error message indicated that the thread pool on the Server side was exhausted.

At first I suspected a traffic spike, but monitoring showed the traffic was stable, and the problem persisted after scaling out. It gradually became clear that the problem was not that simple.

Problem analysis

Since we have the exception log and stack trace, let's first look at the scenario in which this exception is thrown. In the Dubbo source code, the message can be found in AbortPolicyWithReport.

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy

AbortPolicyWithReport extends java.util.concurrent.ThreadPoolExecutor.AbortPolicy. It is a thread pool rejection policy: when the thread pool's buffer queue is full and the number of threads has reached the maximum, the rejection policy is triggered and its rejectedExecution() method is invoked to handle the task.

So, what rejection policies are there?

JDK Thread pool rejection policy

In java.util.concurrent.ThreadPoolExecutor we can find the four rejection policies preset by the JDK (a small usage sketch follows the list):

  • CallerRunsPolicy - Run the task on the caller thread

Under this policy, if the thread pool has not been shut down, the task is executed by the calling thread itself; otherwise the task is simply discarded.

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}
  • AbortPolicy - Throw an exception

If no rejection policy is configured, the thread pool uses this policy by default: rejectedExecution() directly throws a RejectedExecutionException and leaves the handling to the upper-layer business code.

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("...");
}
  • DiscardPolicy - Discard the current task

The simplest handling: the task is silently discarded.

// The method body is intentionally empty: the task is not handled at all, i.e. it is simply discarded
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
  • DiscardOldestPolicy - Discard the next task to be executed

This policy discards the oldest task in the queue (which is in fact the next task to be executed) and then tries to execute the current task.

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}
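To make the triggering condition concrete, here is a minimal, self-contained sketch (pool size, queue size and task count are illustrative, not taken from the article): a small ThreadPoolExecutor with a bounded queue and an explicitly chosen rejection policy. Swapping the last constructor argument for AbortPolicy, DiscardPolicy or DiscardOldestPolicy changes only how the overflowing tasks are handled.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1 core thread, 2 max threads, a queue of 2: submitting 6 quick tasks triggers rejection.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.CallerRunsPolicy()); // the rejection policy under test

        for (int i = 0; i < 6; i++) {
            final int id = i;
            pool.execute(() -> {
                System.out.println("task " + id + " on " + Thread.currentThread().getName());
                try {
                    Thread.sleep(200);
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}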

Dubbo Thread pool rejection policy

So what rejection policy does Dubbo use?

As the name suggests: AbortPolicyWithReport.

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    ...
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" + ...);
        logger.warn(msg);
        dumpJStack();
        dispatchThreadPoolExhaustedEvent(msg);
        throw new RejectedExecutionException(msg);
    }
    ...
}

Dubbo's rejection policy throws a RejectedExecutionException, but it also does one extra thing - dumpJStack(), which records the JVM thread stack at that moment.

dumpJStack

Let's look at the source code first.

private void dumpJStack() {

   // dump interval and concurrency-control checks omitted
   ...

    // create a single-thread pool dedicated to dumping the stack
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> {
        ...
	try (FileOutputStream jStackStream = new FileOutputStream(
            new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
            JVMUtil.jstack(jStackStream);
        } catch (Throwable t) {
            logger.error("dump jStack error", t);
        }
        ...
    });
    ...
}

It is quite simple: the final call to JVMUtil.jstack dumps the current JVM thread stack. The big advantage is that we can then see what the other threads were doing at that moment, which helps analyze why the thread pool overflowed.
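Dubbo's JVMUtil.jstack is an internal helper, but the same effect can be sketched with the standard ThreadMXBean API (a minimal sketch, not Dubbo's implementation; note that ThreadInfo.toString() truncates very deep stacks):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class StackDumpSketch {
    // Collect the stack of every live thread, similar in spirit to a jstack dump.
    public static String dumpAllStacks() {
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : threadMXBean.dumpAllThreads(true, true)) {
            sb.append(info.toString());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllStacks());
    }
}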

Cause analysis

With the thread stack in hand, it is easy to see what the threads were doing at the time.

Dubbo uses Netty for network communication underneath. The thread pools involved include the IO thread pools (boss, worker) and the business thread pool (which handles business events). From the earlier log we can see that the business thread pool on the Server side, i.e. the DubboServerHandler pool, was exhausted, so let's tally what those threads were doing.

Clearly, a large number of threads were blocked waiting to obtain a DB connection. From there it is straightforward: check whether slow queries were holding connections for a long time during the same period, or whether the connection pool is simply too small and mismatched with the thread pool. The analysis stops here (it is not the point of this article).

Different rejection strategies

As you can see, by overriding the rejection policy, Dubbo helps locate problems in these abnormal scenarios, which was a great help here.

So how do other mainstream components handle this?

RocketMQ

Take the Broker as an example: it contains many thread pools for different message-processing scenarios, including send, put, pull, query and so on.

For its thread pools, RocketMQ wraps ThreadPoolExecutor with a subclass, BrokerFixedThreadPoolExecutor, whose constructor lets the upper layer pass in its own parameters, including a configurable RejectedExecutionHandler.

In fact, when the Broker creates the thread pools for message processing, no special rejection policy is specified, so the default AbortPolicy is used and an exception is thrown.

this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getSendMessageThreadPoolNums(),
    this.brokerConfig.getSendMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.sendThreadPoolQueue,
    new ThreadFactoryImpl("SendMessageThread_")
    // No rejection policy is set 
);

Meanwhile, to avoid tasks overflowing the queue, a fairly large task queue capacity is set for each thread pool by default.

private int sendThreadPoolQueueCapacity = 10000;
private int putThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;

To sum up, RocketMQ's rejection strategy is the default AbortPolicy (throw an exception), combined with large task queues to reduce the chance of queue overflow.
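The same pattern is easy to reproduce with plain JDK classes. The sketch below is not RocketMQ's BrokerFixedThreadPoolExecutor, just an approximation of the idea: keep the default AbortPolicy and rely on a large bounded queue to absorb bursts (the sizes are illustrative).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LargeQueuePoolSketch {
    // No RejectedExecutionHandler argument: the default AbortPolicy only kicks in
    // once the (deliberately large) bounded queue is completely full.
    static ThreadPoolExecutor newSendPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                1000 * 60, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newSendPool(16, 10_000);
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}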

Netty

Take EventLoopGroup as an example: by default, the thread pool's rejection policy comes from RejectedExecutionHandlers, which exposes the handler as a singleton.

public final class RejectedExecutionHandlers {
    private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
        @Override
        public void rejected(Runnable task, SingleThreadEventExecutor executor) {
            throw new RejectedExecutionException();
        }
    };

    private RejectedExecutionHandlers() { }

    public static RejectedExecutionHandler reject() {
        return REJECT;
    } 

    ...
}

As you can see, Netty's default rejection policy also throws an exception. The difference from RocketMQ is that the task queue size is max(16, maxPendingTasks), where io.netty.eventLoop.maxPendingTasks can be configured via a system property.
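A small hedged example of raising that limit, assuming the property name above (check it against the Netty version in use). The property has to be set before any event loop classes are initialized, so in practice it is usually passed as a JVM flag rather than set in code.

public class NettyQueueSizeConfig {
    public static void main(String[] args) {
        // Equivalent JVM flag: -Dio.netty.eventLoop.maxPendingTasks=65536
        // Must run before any Netty event loop class is loaded, or the default is used.
        System.setProperty("io.netty.eventLoop.maxPendingTasks", "65536");
    }
}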

Doris

Our team has been using Doris, an MPP analytical storage component with compute-storage separation. Let's look at the rejection policies in the FE; officially there are two:

LogDiscardPolicy

static class LogDiscardPolicy implements RejectedExecutionHandler {

    private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);

    private String threadPoolName;

    public LogDiscardPolicy(String threadPoolName) {
        this.threadPoolName = threadPoolName;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
       LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
    }
}

It can be understood as a DiscardPolicy that also logs: the task is discarded and a warn log is recorded.

BlockedPolicy

static class BlockedPolicy implements RejectedExecutionHandler {
    private String threadPoolName;
    private int timeoutSeconds;

    public BlockedPolicy(String threadPoolName, int timeoutSeconds) {
        this.threadPoolName = threadPoolName;
        this.timeoutSeconds = timeoutSeconds;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().offer(r, timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Task " + r.toString() + " wait to enqueue in " + threadPoolName + " " + executor.toString() + " failed");
        }
    }
}

This policy is more special: it blocks the current thread and tries its best to put the task into the queue. If the task still cannot be enqueued after the specified blocking time timeoutSeconds (60s by default), a warn log is recorded and the task is discarded.

Both policies are actually used inside Doris, and the task queue size of those thread pools is set to 10.
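A hypothetical usage sketch (the pool sizes and pool name are made up, and it assumes the BlockedPolicy class shown above is visible): since BlockedPolicy is just a RejectedExecutionHandler, it plugs straight into a standard ThreadPoolExecutor.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockedPolicyUsageSketch {
    static ThreadPoolExecutor newBlockingPool() {
        return new ThreadPoolExecutor(
                4, 4,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10),           // small queue, as mentioned above
                new BlockedPolicy("example-pool", 60)); // block up to 60s before giving up
    }
}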

ElasticSearch

ES's rejection handling is a bit more complex: it implements two custom rejection policies.

  • EsAbortPolicy
public class EsAbortPolicy extends EsRejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if ((queue instanceof SizeBlockingQueue) == false) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue<Runnable>) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        incrementRejections();
        throw newRejectedException(r, executor, executor.isShutdown());
    }
}

It is essentially an AbortPolicy, but with some special handling: a check for forced execution (forceExecution) and a counter of rejected tasks, before finally throwing the exception.

So what is forceExecution in ES thread pools?

When the conditions are met, that is, the task is wrapped in ES's own AbstractRunnable and the task queue is a SizeBlockingQueue, the task's own configuration determines whether it is forced into the queue. For important tasks that must not be discarded, forceExecution can be set to true.

What forced enqueueing actually does depends on the queue type wrapped inside the SizeBlockingQueue. If it wraps an ArrayBlockingQueue, the call blocks until the queue has free space; if it wraps a LinkedTransferQueue, the queue is unbounded and put() enqueues asynchronously, so the task is added and the call returns immediately.
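The contrast between the two queue types can be seen with the JDK classes alone (a minimal sketch; ES's SizeBlockingQueue wrapper is not used here): put() on a full ArrayBlockingQueue blocks, while LinkedTransferQueue is unbounded, so put() always returns immediately.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;

public class QueuePutContrast {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> unbounded = new LinkedTransferQueue<>();
        unbounded.put("task-1");                      // unbounded: enqueues and returns at once
        System.out.println("LinkedTransferQueue size: " + unbounded.size());

        ArrayBlockingQueue<String> bounded = new ArrayBlockingQueue<>(1);
        bounded.put("task-1");                        // fills the single slot
        boolean accepted = bounded.offer("task-2");   // put() here would block; offer() fails fast
        System.out.println("ArrayBlockingQueue accepted second task: " + accepted);
    }
}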

  • ForceQueuePolicy
static class ForceQueuePolicy extends EsRejectedExecutionHandler {

        ...

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (rejectAfterShutdown) {
                if (executor.isShutdown()) {
                    reject(executor, task);
                } else {
                    put(executor, task);
                    if (executor.isShutdown() && executor.remove(task)) {
                        reject(executor, task);
                    }
                }
            } else {
                put(executor, task);
            }
        }

        private static void put(ThreadPoolExecutor executor, Runnable task) {
            final BlockingQueue<Runnable> queue = executor.getQueue();
            // force queue policy should only be used with a scaling queue
            assert queue instanceof ExecutorScalingQueue;
            try {
                queue.put(task);
            } catch (final InterruptedException e) {
                assert false : "a scaling queue never blocks so a put to it can never be interrupted";
                throw new AssertionError(e);
            }
        }

        private void reject(ThreadPoolExecutor executor, Runnable task) {
            incrementRejections();
            throw newRejectedException(task, executor, true);
        }
    }


As long as the thread pool has not been shut down and the pool uses ES's own ExecutorScalingQueue as its task queue, this policy forces the task into the queue. ExecutorScalingQueue itself also extends LinkedTransferQueue, so the final put() call enqueues asynchronously.

This looks a lot like forceExecution: both end up calling LinkedTransferQueue's put(), which enqueues asynchronously without blocking. So what is the difference between EsAbortPolicy and ForceQueuePolicy?

The two have much in common: both contain logic around forced execution, and both throw a RejectedExecutionException when they do reject.

The difference is that ForceQueuePolicy forces enqueueing by default, and it may still put tasks into the queue even after the thread pool has been shut down.

Others

Browsing around on GitHub, I also came across implementations that use a policy chain. The implementation is simple, and different policies can be combined and configured freely.

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedExecutionChainPolicy implements RejectedExecutionHandler {

    private final RejectedExecutionHandler[] handlerChain;

    // Handlers are applied in the order they are passed in.
    public RejectedExecutionChainPolicy(RejectedExecutionHandler... handlerChain) {
        this.handlerChain = handlerChain;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        for (RejectedExecutionHandler handler : handlerChain) {
            handler.rejectedExecution(r, executor);
        }
    }
}
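A hypothetical composition (the handlers chosen are illustrative, and it assumes the varargs constructor added above): a logging-only handler runs first and a terminal AbortPolicy runs last, so every rejection is recorded before the exception reaches the caller.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ChainUsageSketch {
    public static void main(String[] args) {
        RejectedExecutionHandler logOnly = (r, executor) ->
                System.err.println("rejected: " + r + " by " + executor);

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new RejectedExecutionChainPolicy(
                        logOnly,
                        new ThreadPoolExecutor.AbortPolicy())); // terminal policy: throw
        pool.shutdown();
    }
}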

Summary

Rejection policies mainly come into play when a thread pool runs out of resources. Besides the four common policies provided by the JDK, different components also adopt rejection strategies of their own.

Rejection policies provided by the JDK

Type | Explanation
CallerRunsPolicy | Provided by the JDK; the task is run on the caller thread
AbortPolicy | Used by JDK thread pools by default; throws a RejectedExecutionException
DiscardPolicy | Provided by the JDK; discards the current task
DiscardOldestPolicy | Provided by the JDK; discards the next task to be executed

Custom rejection policies

Component | Policy | Explanation
RocketMQ | AbortPolicy | Thread pools keep the default rejection policy, i.e. AbortPolicy
Dubbo | AbortPolicyWithReport | Throws a RejectedExecutionException, reports the exhaustion and records the JVM thread stack
Netty | RejectedExecutionHandlers | Same logic as AbortPolicy: throws an exception; wrapped as a singleton handler
Doris | LogDiscardPolicy | Same logic as DiscardPolicy: discards the task and logs a warn message
Doris | BlockedPolicy | Tries its best to enqueue the task, waiting up to 60s; after the timeout it logs a warn message and discards the task
Elasticsearch | EsAbortPolicy | Normally behaves like AbortPolicy; if the task is marked for forced execution it is forced into the task queue, and the actual behavior depends on the queue type (it may block or return immediately)
Elasticsearch | ForceQueuePolicy | Forces the task into the task queue by default, asynchronously and without blocking
Others | PolicyChain | A policy chain containing multiple rejection policies; the final behavior depends on the conditions and on each handler's result

Copyright notice
This article was written by [Bug Commander]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/188/202207071104519452.html