The principle and source code interpretation of executor thread pool in concurrent programming
2022-06-11 00:43:00 【xujingyiss】
Why use thread pools?
A thread pool is a cache of threads.
In web development, the server needs to accept and process requests, so a thread is allocated to handle each request. Creating a new thread for every request is very simple to implement, but it has a problem:
If the number of concurrent requests is very large but each thread runs only briefly, threads are created and destroyed constantly, which greatly reduces the efficiency of the system. The server may end up spending more time and system resources creating and destroying threads than it spends on the actual user requests, because creating and destroying threads is expensive in both time and system resources.
Thread pools were introduced to solve this problem.
Advantages of thread pools
- Reuse existing threads. This reduces the cost of creating and destroying threads and improves performance.
- Improve response time. When a task arrives, it can be executed without waiting for a thread to be created.
- Improve the manageability of threads. Threads are a scarce resource; creating them without limit consumes system resources and reduces the stability of the system. A thread pool allows threads to be allocated, tuned, and monitored in one place.
When to use a thread pool?
- Each task takes a relatively short time to process
- There are a large number of tasks to process
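The thread reuse described above can be observed directly. The following is a minimal sketch and not part of the original article: the class name ReuseDemo, the method distinctThreads, and the task and pool sizes are assumptions chosen for illustration. It submits many short tasks to a small fixed pool and counts how many distinct threads actually ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the article.
class ReuseDemo {
    // Runs `tasks` short tasks on a pool of `poolSize` threads and returns
    // the number of distinct threads that executed them.
    static int distinctThreads(int tasks, int poolSize) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads: " + distinctThreads(100, 4));
    }
}
```

However many tasks are submitted, the number of distinct threads never exceeds the pool size, which is the point of pooling.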
Executor framework
The Executor interface is the most basic part of the thread pool framework. It defines a single method, execute, for running a Runnable.
Its inheritance and implementation hierarchy:

As the diagram shows, Executor has an important sub-interface: ExecutorService.
It defines the concrete behavior of a thread pool:
execute(Runnable command): executes a task of type Runnable
submit(task): submits a Callable or Runnable task and returns a Future object
shutdown(): shuts down after the already submitted tasks have completed; no new tasks are accepted
shutdownNow(): attempts to stop all tasks in progress and shuts the pool down
isTerminated(): tests whether all tasks have completed after shutdown
isShutdown(): tests whether this ExecutorService has been shut down
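The methods listed above can be exercised together in a few lines. This is an illustrative sketch only: the class name ExecutorServiceDemo and the helper sumViaPool are made up for this example, and the pool size of 2 is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the article.
class ExecutorServiceDemo {
    // Submits one Callable, waits for its result, then shuts the pool down.
    static int sumViaPool(int a, int b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = pool.submit(task);
            return future.get();          // blocks until the task has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();              // no new tasks; submitted ones still complete
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("run by " + Thread.currentThread().getName()));
        pool.shutdown();
        System.out.println("isShutdown = " + pool.isShutdown());
        pool.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("isTerminated = " + pool.isTerminated());
        System.out.println("sum = " + sumViaPool(2, 3));
    }
}
```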
ExecutorService has an important implementation class, ThreadPoolExecutor, which is the class commonly used as a thread pool.
A thread pool can be created directly through the ThreadPoolExecutor constructor or through Executors. In practice, Executors.newFixedThreadPool(), Executors.newCachedThreadPool() and similar factory methods also create a ThreadPoolExecutor internally.

Thread pool key properties
    /**
     * ctl is the field that controls the running state of the thread pool and the number of effective threads in it.
     * It packs two pieces of information:
     * the running state of the thread pool (runState) and the number of effective threads (workerCount).
     * A single int holds both: the high 3 bits store runState and the low 29 bits store workerCount.
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // COUNT_BITS is 32 - 3 = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // CAPACITY is 1 shifted left by 29 bits, minus 1 (29 one-bits).
    // It is the upper limit of workerCount, about 536 million
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
Methods related to ctl
runStateOf: extracts the running state
workerCountOf: extracts the number of active threads
ctlOf: combines a running state and a worker count into a single ctl value
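To make the bit packing concrete, here is a small standalone sketch. The three helper bodies mirror the JDK 8 ThreadPoolExecutor definitions as far as I know them; the class name CtlDemo and the highBits helper are hypothetical and exist only for this illustration.

```java
// Hypothetical standalone replica of the ctl packing scheme.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // 29 one-bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack both into one int

    // Illustration helper: the high 3 bits of a state as a 3-character string.
    static String highBits(int rs) {
        String s = Integer.toBinaryString(rs >>> COUNT_BITS);
        return "000".substring(s.length()) + s;
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING);   // state survives packing
        System.out.println(workerCountOf(c));           // worker count survives packing
        System.out.println(highBits(RUNNING) + " " + highBits(SHUTDOWN) + " " + highBits(STOP));
        System.out.println(CAPACITY);
    }
}
```

Because the state lives in the high bits, a single atomic compare-and-set on ctl can change the state and the worker count consistently.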
Thread pool states
A thread pool has 5 states:
| State | Value | High 3 bits |
|---|---|---|
| RUNNING | -1 << COUNT_BITS | 111 |
| SHUTDOWN | 0 << COUNT_BITS | 000 |
| STOP | 1 << COUNT_BITS | 001 |
| TIDYING | 2 << COUNT_BITS | 010 |
| TERMINATED | 3 << COUNT_BITS | 011 |
1. RUNNING
Description: in the RUNNING state, the thread pool accepts new tasks and processes the tasks already added.
Transition: RUNNING is the initial state of the thread pool. In other words, once a thread pool has been created it is in the RUNNING state, and the number of tasks in it is 0.
2. SHUTDOWN
Description: in the SHUTDOWN state, the thread pool does not accept new tasks but still processes the tasks already added.
Transition: calling the pool's shutdown() method moves it from RUNNING -> SHUTDOWN.
3. STOP
Description: in the STOP state, the thread pool does not accept new tasks, does not process the tasks already added, and interrupts the tasks being processed.
Transition: calling the pool's shutdownNow() method moves it from (RUNNING or SHUTDOWN) -> STOP.
4. TIDYING
Description: when all tasks have terminated and the worker count recorded in ctl is 0, the thread pool enters the TIDYING state. On entering TIDYING, the hook method terminated() is executed. terminated() is empty in ThreadPoolExecutor; to do something when the pool becomes TIDYING, override terminated().
Transition: when the pool is in the SHUTDOWN state and both the blocking queue and the set of executing tasks are empty, it moves from SHUTDOWN -> TIDYING. When the pool is in the STOP state and no tasks are executing, it moves from STOP -> TIDYING.
5. TERMINATED
Description: the thread pool has terminated completely and is in the TERMINATED state.
Transition: when the pool is in the TIDYING state and terminated() has finished executing, it moves from TIDYING -> TERMINATED.
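The internal state constants are private, but the transitions can be observed from outside through shutdown(), the terminated() hook, and isTerminated(). The sketch below is an assumption-laden illustration: the class name LifecycleDemo and the method shutdownAndObserve are invented here, and the 5-second timeout is arbitrary.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not from the article.
class LifecycleDemo {
    // Returns true if the terminated() hook ran and the pool reached TERMINATED.
    static boolean shutdownAndObserve() {
        AtomicBoolean hookRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {   // invoked while the pool is TIDYING
                hookRan.set(true);
            }
        };
        pool.execute(() -> { });            // the pool is RUNNING and accepts the task
        pool.shutdown();                    // RUNNING -> SHUTDOWN
        try {
            // Wait for SHUTDOWN -> TIDYING -> TERMINATED
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return hookRan.get() && pool.isShutdown() && pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println("hook ran and pool terminated: " + shutdownAndObserve());
    }
}
```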

The ThreadPoolExecutor thread pool
Constructors
ThreadPoolExecutor has several constructors; the most complete one takes 7 parameters:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
Parameter description:
corePoolSize: the number of core threads in the pool
-- When a task is submitted, the pool creates a new thread to execute it, until the current number of threads equals corePoolSize;
-- Once the current number of threads has reached corePoolSize, further submitted tasks are saved to the blocking queue to wait for execution;
-- If prestartAllCoreThreads() is called on the pool, it creates and starts all core threads ahead of time;
maximumPoolSize: the maximum number of threads allowed in the pool
-- If the blocking queue is full and tasks keep being submitted, a new thread is created to execute the task, provided the current number of threads is less than maximumPoolSize;
keepAliveTime: how long the pool lets an idle thread stay alive
-- When the number of threads exceeds corePoolSize and no new tasks are submitted, threads beyond the core threads are not destroyed immediately; they wait, and are destroyed only once they have been idle for longer than keepAliveTime;
unit: the time unit of keepAliveTime
workQueue: the blocking queue that holds tasks waiting to be executed; tasks must implement the Runnable interface
-- The JDK provides the following blocking queues:
1. ArrayBlockingQueue: a bounded blocking queue backed by an array; orders tasks FIFO;
2. LinkedBlockingQueue: a blocking queue backed by a linked list; orders tasks FIFO; throughput is usually higher than ArrayBlockingQueue;
3. SynchronousQueue: a blocking queue that stores no elements; each insert must wait until another thread performs a remove, otherwise the insert stays blocked; throughput is usually higher than LinkedBlockingQueue;
4. PriorityBlockingQueue: an unbounded blocking queue with priorities;
threadFactory: a variable of type ThreadFactory, used to create new threads
-- By default, Executors.defaultThreadFactory() is used to create threads.
-- Threads created by the default ThreadFactory all have the same NORM_PRIORITY priority, are non-daemon threads, and are given a name.
handler: the saturation policy of the pool.
-- When the blocking queue is full and the pool is already at its maximum size with no idle worker threads, a policy must decide what to do with any further submitted task. The pool provides 4 policies:
1. AbortPolicy: throws an exception directly; this is the default policy;
2. CallerRunsPolicy: runs the task on the caller's thread;
3. DiscardOldestPolicy: discards the oldest task at the head of the blocking queue, then retries the current task;
4. DiscardPolicy: silently discards the task;
For example, Executors.newFixedThreadPool(int nThreads) also calls this constructor internally to create the thread pool. Only one argument, nThreads, is passed in; the others use fixed values:
1. corePoolSize = nThreads
2. maximumPoolSize = nThreads
3. keepAliveTime = 0
4. unit = TimeUnit.MILLISECONDS
5. workQueue = new LinkedBlockingQueue<Runnable>()
6. threadFactory = Executors.defaultThreadFactory()
7. handler = AbortPolicy
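When the defaults do not fit, all seven parameters can be supplied explicitly. The following is only a sketch under assumptions: the class name CustomPoolDemo, the thread name prefix "demo-worker-", and the sizes (2 core threads, 4 maximum, queue capacity 10, 30 seconds keep-alive) are arbitrary choices for illustration, not recommendations from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not from the article.
class CustomPoolDemo {
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger(1);
        // Custom factory: names each thread so it is easy to spot in thread dumps.
        ThreadFactory factory = r -> new Thread(r, "demo-worker-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime + unit
                new ArrayBlockingQueue<Runnable>(10),   // bounded workQueue
                factory,                                // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturation policy
    }

    // Runs one task on the pool and returns the name of the thread that ran it.
    static String workerName() {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName());
    }
}
```

A bounded queue combined with CallerRunsPolicy slows the submitter down when the pool is saturated instead of letting the queue grow without limit, which is one common reason to avoid the unbounded LinkedBlockingQueue used by newFixedThreadPool.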
Thread pool monitoring
    public long getTaskCount()          // approximate total number of tasks, both executed and not yet executed
    public long getCompletedTaskCount() // number of tasks completed
    public int getPoolSize()            // current number of threads in the pool
    public int getActiveCount()         // number of threads that are currently executing tasks
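A short sketch of reading these counters follows. The class name MonitorDemo and the method runAndSnapshot are hypothetical. The counters are read only after the pool has terminated, because while tasks are still running the values are approximate.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the article.
class MonitorDemo {
    // Runs `tasks` empty tasks, waits for termination, and returns
    // { taskCount, completedTaskCount, activeCount, poolSize }.
    static long[] runAndSnapshot(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {
            pool.getTaskCount(),
            pool.getCompletedTaskCount(),
            pool.getActiveCount(),
            pool.getPoolSize()
        };
    }

    public static void main(String[] args) {
        long[] s = runAndSnapshot(5);
        System.out.println("tasks=" + s[0] + " completed=" + s[1]
                + " active=" + s[2] + " poolSize=" + s[3]);
    }
}
```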
Thread pool principle

Process description:
- When a task is submitted, if the current number of threads is less than the number of core threads (corePoolSize), a worker thread (core thread) is added to execute it;
- If the current number of threads is greater than or equal to the number of core threads, check whether the blocking queue (workQueue) is full (for example, an ArrayBlockingQueue has a fixed capacity and can fill up); if it is not full, the task is added to the blocking queue;
- If the blocking queue is full, check whether the current number of threads is less than the maximum number of threads (maximumPoolSize); if so, a worker thread (non-core thread) is added to execute the task;
- If the blocking queue is full and the current number of threads has reached the maximum, the task is handed to the rejection policy (one of the 4 policies).
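The four steps can be traced with a deliberately tiny pool. This is a sketch under stated assumptions: the class name FlowDemo is invented, and the sizes (1 core thread, 2 maximum, queue capacity 1) are chosen only so that each step is reached after a single submission. Every task blocks on a latch so that no worker becomes free during the trace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the article.
class FlowDemo {
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        Runnable blocker = () -> {
            try {
                release.await();                    // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        try {
            pool.execute(blocker);                  // step 1: a core thread is created
            steps.add(snapshot(pool));
            pool.execute(blocker);                  // step 2: core is busy, task is queued
            steps.add(snapshot(pool));
            pool.execute(blocker);                  // step 3: queue full, non-core thread created
            steps.add(snapshot(pool));
            try {
                pool.execute(blocker);              // step 4: queue full and pool at maximum
                steps.add("accepted");
            } catch (RejectedExecutionException e) {
                steps.add("rejected");              // default AbortPolicy throws
            }
        } finally {
            release.countDown();                    // let the blocked tasks finish
            pool.shutdown();
        }
        return steps;
    }

    private static String snapshot(ThreadPoolExecutor pool) {
        return "pool=" + pool.getPoolSize() + " queue=" + pool.getQueue().size();
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```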
Source code analysis
execute
Once a ThreadPoolExecutor has been created, call its execute() method to submit a task.
The execute() method corresponds to the flow diagram above:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // If the current number of threads is less than the number of core threads
        if (workerCountOf(c) < corePoolSize) {
            // Add a worker thread (core thread, second argument is true) and run the task
            if (addWorker(command, true))
                // Return on success
                return;
            c = ctl.get();
        }
        // Add the task to the blocking queue. offer() returns true if the queue is not full, false if it is full
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Recheck: if the pool stopped running in the meantime, remove the task and reject it
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // If there are no workers at all, add one (without a first task) to drain the queue
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // If the blocking queue is full, add a non-core thread (second argument is false)
        else if (!addWorker(command, false))
            reject(command);
    }
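The addWorker(null, false) branch in the recheck is easiest to see with a pool whose corePoolSize is 0: the first task cannot take the core-thread path, so it is queued, and a worker without a first task is then added to drain the queue. The sketch below illustrates this under assumptions: the class name ZeroCoreDemo and the method runsWithZeroCore are invented for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the article.
class ZeroCoreDemo {
    // Returns true if a task submitted to a pool with corePoolSize = 0 still runs.
    static boolean runsWithZeroCore() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch done = new CountDownLatch(1);
        try {
            // workerCount (0) is not less than corePoolSize (0), so the task is queued;
            // the recheck then sees workerCount == 0 and adds a worker to drain the queue.
            pool.execute(done::countDown);
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran: " + runsWithZeroCore());
    }
}
```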
addWorker
Adding a worker thread and starting it happens in the addWorker method:
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // Do some checks and increment workerCount
        ......
        boolean workerStarted = false; // whether the worker's thread has been started
        boolean workerAdded = false;   // whether the worker has been added successfully
        Worker w = null;
        try {
            // Create the worker. The Worker constructor creates a Thread and binds it to the worker
            w = new Worker(firstTask);
            // Get the thread object from the worker
            final Thread t = w.thread;
            if (t != null) {
                // The workers set may only be modified while holding mainLock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Running state
                    int rs = runStateOf(ctl.get());
                    // Normally the pool is RUNNING; RUNNING is negative, so rs < SHUTDOWN means RUNNING
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // Add the worker
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // Mark the worker as added
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // Start the thread
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
Worker
The thread is created in Worker's constructor.
Worker is an important part of the thread pool: it creates and holds a thread, and it is what actually executes tasks.
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    {
        final Thread thread;          // the thread
        Runnable firstTask;           // the first task
        volatile long completedTasks; // number of tasks completed, for statistics; a Worker is reused across tasks

        Worker(Runnable firstTask) {
            // Set the AQS state to -1 so the worker cannot be interrupted before runWorker starts.
            // It is released after startup (w.unlock())
            setState(-1);
            this.firstTask = firstTask;
            // Create the thread. this is the current Worker, which is itself a Runnable
            this.thread = getThreadFactory().newThread(this);
        }

        // Worker implements Runnable, so it implements run()
        public void run() {
            runWorker(this);
        }

        // Worker is also a lock: an exclusive, non-reentrant lock
        // (unlike ReentrantLock, it cannot be acquired repeatedly by the same thread)
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        ......
    }
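The "exclusive and non-reentrant" property comes from how the AQS hooks are implemented. The following standalone sketch shows the idea with a minimal mutex built on AbstractQueuedSynchronizer. The class name SimpleMutex and the helper secondTryLockFails are hypothetical; the hook bodies are an assumption modeled on the general AQS mutex pattern, not a copy of the elided part of Worker.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal non-reentrant mutex, for illustration only.
class SimpleMutex extends AbstractQueuedSynchronizer {
    // state 0 = unlocked, state 1 = locked
    @Override
    protected boolean tryAcquire(int unused) {
        // A plain CAS from 0 to 1: a thread that already holds the lock fails here too,
        // which is what makes the lock non-reentrant.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    // Returns true if the second tryLock() from the same thread fails.
    static boolean secondTryLockFails() {
        SimpleMutex m = new SimpleMutex();
        boolean first = m.tryLock();
        boolean second = m.tryLock();   // same thread, lock already held
        m.unlock();
        return first && !second && !m.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("non-reentrant: " + secondTryLockFails());
    }
}
```

A non-reentrant lock lets the pool use tryLock() to tell whether a worker is idle or busy running a task, which a reentrant lock could not express.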
runWorker
Worker is a Runnable, and it passes itself in when creating its Thread, so calling t.start() ends up running the run() method of Worker:
    public void run() {
        runWorker(this);
    }

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts: sets the AQS state to 0 (interrupts are only delivered when state >= 0)
        boolean completedAbruptly = true;
        try {
            // When task is null, fetch one from the blocking queue; keep running as long as one is available.
            // This is what "thread reuse" means
            while (task != null || (task = getTask()) != null) {
                w.lock(); // acquire the exclusive, non-reentrant lock
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // Empty hook method by default
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // Finally run the task, i.e. the run() method of the Runnable you defined
                        task.run();
                    } catch (xxx) {
                        xxx
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;        // reset task to null so the next one is fetched from the blocking queue
                    w.completedTasks++; // number of tasks completed by this worker + 1, for statistics
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
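Since runWorker calls beforeExecute and afterExecute around every task.run(), a subclass can hook into each task. The sketch below records the order of the calls; the class name HookDemo and the method events are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the article.
class HookDemo {
    // Runs a single task and returns the recorded call order as a comma-separated string.
    static String events() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                log.add("before");
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                log.add("after");
            }
        };
        pool.execute(() -> log.add("run"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return String.join(",", log);
    }

    public static void main(String[] args) {
        System.out.println(events());
    }
}
```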
getTask
runWorker calls a getTask method. While runWorker is looping, it pulls tasks from the blocking queue and executes them, which is how worker threads (Worker) are reused.
    private Runnable getTask() {
        boolean timedOut = false; // whether the most recent poll() timed out
        for (;;) {
            int c = ctl.get();
            // Verify thread pool status
            ...
            // Number of active threads in the current thread pool
            int wc = workerCountOf(c);
            /**
             * timed: whether timeout control is required
             * allowCoreThreadTimeOut: whether core threads may time out; the default is false (not allowed)
             * corePoolSize: number of core threads
             * So this means: timeout control is required if core threads are allowed to time out,
             * or if the current number of active threads is greater than the number of core threads
             * (that is, there are non-core threads)
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            /**
             * When these conditions are met, return null so that the loop in runWorker exits
             * and the Worker is removed, which controls the number of effective threads in the pool
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                /**
                 * Depending on whether timeout control is required, fetch the task from the blocking queue differently:
                 * poll(long timeout, TimeUnit unit): if no task is available immediately, wait up to the given
                 * timeout and return null if none arrives
                 * take(): if the BlockingQueue is empty, wait until a new object is added to it
                 */
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
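The difference between the two queue calls used by getTask can be shown on a bare queue. This is a small sketch; the class name PollVsTakeDemo, the 50 ms timeout, and the "task" string are assumptions made for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the article.
class PollVsTakeDemo {
    // Returns "<result of timed poll>/<result of take>".
    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        try {
            // Timed poll on an empty queue: gives up after the timeout and returns null.
            String polled = queue.poll(50, TimeUnit.MILLISECONDS);

            // take() on an empty queue: blocks until another thread adds an element.
            Thread producer = new Thread(() -> queue.offer("task"));
            producer.start();
            String taken = queue.take();

            return polled + "/" + taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A worker that uses the timed poll can notice that it has been idle for keepAliveTime and exit, while a worker that uses take() simply waits for the next task.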
processWorkerExit
In the finally block of runWorker, processWorkerExit is called to destroy the worker thread. In other words, when there are no more tasks to execute, the while loop exits and the Worker is destroyed: it is removed from the workers set.
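The effect of workers exiting can be observed through getPoolSize(). In the sketch below, core threads are allowed to time out so that every idle worker eventually leaves the pool. The class name IdleShrinkDemo, the 50 ms keep-alive, and the 5-second polling deadline are assumptions for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the article.
class IdleShrinkDemo {
    // Starts two workers, lets them go idle, and returns the pool size afterwards.
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);  // requires keepAliveTime > 0
        pool.execute(() -> { });
        pool.execute(() -> { });
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            // Poll until all idle workers have timed out and been removed.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("pool size after idle: " + poolSizeAfterIdle());
    }
}
```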