Write a thread pool by hand and learn how ThreadPoolExecutor is implemented
2022-07-04 07:23:00 【InfoQ】
1. Thread pool explanation
Let's start with an example of creating and using a thread pool:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threadPoolExecutor.execute(() -> {
System.out.println("Hi Thread pool !");
});
threadPoolExecutor.shutdown();
// Executors.newFixedThreadPool(10);
// Executors.newCachedThreadPool();
// Executors.newScheduledThreadPool(10);
// Executors.newSingleThreadExecutor();
2. Write a thread pool by hand
2.1 Implementation process
- There are n running threads, which corresponds to the pool size we allow when creating the thread pool.
- Tasks are submitted to the thread pool to run.
- If all threads in the pool are busy, the task is put into the queue.
- When a thread becomes free, it takes the next task from the queue and runs it.
2.2 Implementation code
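import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// A deliberately simplified pool: ctl only counts worker threads, and workers never time out.
public class ThreadPoolTrader implements Executor {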
private final AtomicInteger ctl = new AtomicInteger(0);
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;
public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
}
@Override
public void execute(Runnable command) {
int c = ctl.get();
if (c < corePoolSize) {
if (!addWorker(command)) {
reject();
}
return;
}
if (!workQueue.offer(command)) {
if (!addWorker(command)) {
reject();
}
}
}
private boolean addWorker(Runnable firstTask) {
if (ctl.get() >= maximumPoolSize) return false;
Worker worker = new Worker(firstTask);
worker.thread.start();
ctl.incrementAndGet();
return true;
}
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
public Worker(Runnable firstTask) {
this.thread = new Thread(this);
this.firstTask = firstTask;
}
@Override
public void run() {
Runnable task = firstTask;
try {
while (task != null || (task = getTask()) != null) {
task.run();
if (ctl.get() > maximumPoolSize) {
break;
}
task = null;
}
} finally {
ctl.decrementAndGet();
}
}
private Runnable getTask() {
for (; ; ) {
try {
System.out.println("workQueue.size:" + workQueue.size());
return workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private void reject() {
throw new RuntimeException("Error!ctl.count:" + ctl.get() + " workQueue.size:" + workQueue.size());
}
public static void main(String[] args) {
ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2, new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 10; i++) {
int finalI = i;
threadPoolTrader.execute(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" Task number :" + finalI);
});
}
}
}
// test result
Task number :1
Task number :0
workQueue.size:8
workQueue.size:8
Task number :3
workQueue.size:6
Task number :2
workQueue.size:5
Task number :5
workQueue.size:4
Task number :4
workQueue.size:3
Task number :7
workQueue.size:2
Task number :6
workQueue.size:1
Task number :8
Task number :9
workQueue.size:0
workQueue.size:0
- ctl: records the number of threads in the thread pool.
- corePoolSize, maximumPoolSize: limit the capacity of the thread pool.
- workQueue: the pool's task queue. Tasks that cannot be run immediately are placed in this queue.
- execute: submits a task. This is the general Executor interface method. Its main job is to decide whether the submitted task is handed to a new worker, put into the queue, or rejected.
- addWorker: creates a Worker and starts its thread. The Worker class also contains getTask(), which keeps taking tasks that have not yet been executed from the queue.
3. Thread pool source code analysis
3.1 Thread pool class diagram
- The interfaces Executor and ExecutorService define the basic methods of a thread pool, in particular execute(Runnable command), which submits a task to the pool.
- The abstract class AbstractExecutorService implements the basic, general interface methods.
- ThreadPoolExecutor is the core class of the whole thread pool. All the other classes and interfaces provide functionality around it.
- Worker is the task-running class, that is, the code that the pool's threads finally execute.
- RejectedExecutionHandler is the rejection policy interface. It has four implementations: AbortPolicy (rejects by throwing an exception), DiscardPolicy (silently discards the task), DiscardOldestPolicy (discards the task that has waited longest in the queue), and CallerRunsPolicy (the thread that submits the task runs it).
- Executors is the factory class we use to create thread pools with different strategies: newFixedThreadPool, newCachedThreadPool, newScheduledThreadPool, newSingleThreadExecutor.
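To make the rejection policies concrete, here is a minimal sketch of CallerRunsPolicy using the real ThreadPoolExecutor. The class name CallerRunsDemo and the helper threadThatRanOverflowTask are made up for this illustration; the pool has one worker and a one-slot queue so that the third task has nowhere to go.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    // Hypothetical helper: returns the name of the thread that ran the task the pool could not accept.
    static String threadThatRanOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        // A task that stays busy until we release it.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker
            pool.execute(blocker); // fills the one-slot queue
            // Worker busy and queue full: the policy runs this task on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadThatRanOverflowTask());
    }
}
```

Swapping in AbortPolicy (the default) would make the third execute call throw RejectedExecutionException instead.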
3.2 High 3 bits and low 29 bits
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
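The constants above pack two values into one int: the run state in the high 3 bits and the worker count in the low 29 bits. The sketch below copies the constants into a standalone class (CtlBitsDemo is a made-up name) and adds the three helpers the listings in this article call, runStateOf, workerCountOf and ctlOf. Their bodies are written as I understand the JDK 8 source; treat them as an assumption rather than a quotation.

```java
class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // low 29 bits all set
    static final int RUNNING = -1 << COUNT_BITS;        // high bits 111 (negative)
    static final int SHUTDOWN = 0 << COUNT_BITS;        // high bits 000
    static final int STOP = 1 << COUNT_BITS;            // high bits 001

    // Keep only the high 3 bits: the run state.
    static int runStateOf(int c) { return c & ~CAPACITY; }

    // Keep only the low 29 bits: the number of workers.
    static int workerCountOf(int c) { return c & CAPACITY; }

    // Combine a run state and a worker count into one ctl value.
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("ctl bits    = " + Integer.toBinaryString(c));
        System.out.println("workerCount = " + workerCountOf(c));
        System.out.println("is RUNNING  = " + (runStateOf(c) == RUNNING));
    }
}
```

Because RUNNING is negative and the other states are non-negative and increasing, state checks can be simple numeric comparisons such as rs >= SHUTDOWN.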
3.3 Thread pool state
- RUNNING: the running state. The pool accepts new tasks and processes tasks in the queue.
- SHUTDOWN: the shutdown state (entered by calling shutdown()). The pool accepts no new tasks but still processes the tasks already in the queue.
- STOP: the stop state (entered by calling shutdownNow()). The pool accepts no new tasks, does not process queued tasks, and interrupts tasks that are running.
- TIDYING: all tasks have terminated and workerCount is 0. On entering this state the pool calls terminated() and then moves to TERMINATED.
- TERMINATED: the terminal state, reached once the terminated() call has completed.
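The transitions can be observed from outside through the public API. The following is a small sketch (PoolStateDemo and observeShutdown are made-up names) that walks a pool from RUNNING through SHUTDOWN to TERMINATED and checks that a task submitted after shutdown() is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStateDemo {

    // Hypothetical helper: returns {isShutdown after shutdown(), late task rejected, pool terminated}.
    static boolean[] observeShutdown() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(4));
        pool.execute(() -> { });   // accepted while RUNNING
        pool.shutdown();           // RUNNING -> SHUTDOWN
        boolean shutdown = pool.isShutdown();

        boolean rejected = false;
        try {
            pool.execute(() -> { });   // no new tasks once shut down
        } catch (RejectedExecutionException e) {
            rejected = true;           // default AbortPolicy throws
        }

        boolean terminated = false;
        try {
            // Waits until the pool has passed through TIDYING into TERMINATED.
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS) && pool.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { shutdown, rejected, terminated };
    }

    public static void main(String[] args) {
        boolean[] r = observeShutdown();
        System.out.println("isShutdown=" + r[0] + " rejected=" + r[1] + " terminated=" + r[2]);
    }
}
```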
3.4 Submit a task (execute)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- ctl.get() reads the value that records both the pool state and the number of threads. The method workerCountOf() then extracts the current number of threads; it performs the operation c & CAPACITY.
- The current number of threads is compared with the core size corePoolSize. If it is smaller, addWorker(command, true) creates a new worker thread to run the task.
- If the core threads are all in use, the pool checks whether it is still running with isRunning(c). If it is, the task that cannot be executed right away is put into the work queue.
- After the task is queued, the state is checked again. If the pool is no longer running and the task is removed successfully, the rejection policy is applied. Otherwise, if the number of threads is 0, a new worker is added with no first task so that the queued task will be picked up.
- Finally, if the task could not be queued, the pool tries to add a worker again. This time the second argument of addWorker is false, so the limit that applies is maximumPoolSize rather than corePoolSize. If that also fails, the rejection policy is applied.
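The order of these branches (core thread, then queue, then non-core thread, then reject) can be observed with a small pool. This is a sketch with made-up names (ExecuteRoutingDemo, route); it uses corePoolSize 1, maximumPoolSize 2 and a one-slot queue, and keeps every task blocked so that nothing drains while we look.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteRoutingDemo {

    // Hypothetical helper: returns {pool size after task 1, queue size after task 2,
    // pool size after task 3, 1 if task 4 was rejected else 0}.
    static int[] route() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);             // below corePoolSize: new core worker
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);             // core worker busy: task goes to the queue
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);             // queue full: new non-core worker
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);         // queue full and at maximumPoolSize: reject
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] s = route();
        System.out.println("pool=" + s[0] + " queue=" + s[1] + " pool=" + s[2] + " rejected=" + s[3]);
    }
}
```

Note that the queue is tried before any non-core thread is created, which is why a pool with an unbounded queue never grows beyond corePoolSize.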
3.5 Add execution task (addWorker)
Part 1: increase the thread count
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
Part 2: create and start the thread
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
- if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())): checks whether the pool state is one of SHUTDOWN, STOP, TIDYING or TERMINATED. If so, the method returns false, with one exception: when the state is exactly SHUTDOWN, the task passed in is null, and the queue is not empty, a worker is still allowed so that the remaining queued tasks can be drained.
- compareAndIncrementWorkerCount: a CAS operation that increases the thread count. On success, execution breaks out of the labeled loop.
- runStateOf(c) != rs: after a failed CAS, the pool state is checked again. If it has changed, the outer loop restarts; otherwise only the inner loop is retried.
- Once the thread count has been increased successfully, the method takes the lock, creates the worker, adds it to workers, and records the state. If the thread turns out not to have started, addWorkerFailed is called to remove the worker and undo the other bookkeeping.
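The check-then-CAS loop in the first part is what keeps the worker count from overshooting its limit when several threads call addWorker at once. Below is a minimal sketch of the same pattern on a plain AtomicInteger. CasCounterDemo, tryReserve and race are made-up names, and the sketch leaves out the pool state bits entirely.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasCounterDemo {
    private final AtomicInteger count = new AtomicInteger(0);
    private final int limit;

    CasCounterDemo(int limit) {
        this.limit = limit;
    }

    // Hypothetical helper: reserve one slot, or return false once the limit is reached.
    boolean tryReserve() {
        for (;;) {
            int c = count.get();
            if (c >= limit) {
                return false;                 // same role as the wc >= limit check
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                  // same role as compareAndIncrementWorkerCount
            }
            // Another thread changed the count first; re-read and retry.
        }
    }

    // Starts `attempts` threads that each try to reserve once; returns how many succeeded.
    static int race(int limit, int attempts) {
        CasCounterDemo counter = new CasCounterDemo(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] threads = new Thread[attempts];
        for (int i = 0; i < attempts; i++) {
            threads[i] = new Thread(() -> {
                if (counter.tryReserve()) {
                    successes.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println("successful reservations: " + race(3, 10));
    }
}
```

The hand-written addWorker in section 2.2 performs the limit check and the increment as two separate steps, which is fine for a single submitting thread but could exceed the limit under concurrent submission; the CAS loop closes that gap.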
3.6 Run the worker (runWorker)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // Allow the interrupt
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- beforeExecute, afterExecute: hook methods called before and after each task runs, useful for things like statistics. They do nothing by default and are meant to be overridden in subclasses.
- The lock used here is a non-reentrant exclusive lock that Worker implements itself by extending AQS.
- processWorkerExit: when a worker exits, this method removes it from workers and adds its completed-task count to the pool's total. It is worth reading, together with similar methods, if you are interested.
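As an illustration of the two hooks, here is a sketch of a subclass that counts tasks as they start and finish. CountingPool, its two counters and runTasks are made up for this example; beforeExecute and afterExecute are the protected ThreadPoolExecutor methods that runWorker calls.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();   // runs on the worker thread, just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();  // runs on the worker thread, after task.run() returns or throws
    }

    // Hypothetical helper: runs n empty tasks and returns {started, finished}.
    static int[] runTasks(int n) {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();             // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get() };
    }

    public static void main(String[] args) {
        int[] counts = runTasks(5);
        System.out.println("started=" + counts[0] + " finished=" + counts[1]);
    }
}
```

Atomic counters are used because the hooks run on whichever worker thread executes the task, so several may fire at the same time.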
3.7 Get a task from the queue (getTask)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- The getTask method takes tasks waiting to be executed from the blocking queue, one at a time.
- if (rs >= SHUTDOWN ...: checks whether the pool has been shut down. If the state is SHUTDOWN and the queue is empty, or the state is STOP or later, the worker count is decremented and null is returned so that the worker exits.
- wc = workerCountOf(c), wc > corePoolSize: when the number of worker threads exceeds corePoolSize, workers become subject to a timeout. A worker that waits for keepAliveTime without getting a task returns null and is destroyed, which is how threads beyond corePoolSize are reclaimed.
- timed is true when allowCoreThreadTimeOut is set or when wc > corePoolSize. When timed is true, the timeout is enforced through the blocking queue's poll method.
- If no task is obtained within keepAliveTime, poll returns null and timedOut is set. If timed is false, take() is used instead, which blocks until a task is available.
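The effect of the timed poll can be seen by letting a pool grow past corePoolSize and then go idle. The sketch below (KeepAliveDemo and shrink are made-up names) uses a SynchronousQueue so that every task submitted while the workers are busy forces a new thread, then waits well beyond the 100 ms keepAliveTime. The one-second sleep is an arbitrary margin chosen for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    // Hypothetical helper: returns {pool size while three tasks run, pool size after idling}.
    static int[] shrink() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[2];
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blocker);       // 1 core worker, then 2 non-core workers
            }
            sizes[0] = pool.getPoolSize();
            release.countDown();             // let all three tasks finish
            Thread.sleep(1000);              // idle for much longer than keepAliveTime
            sizes[1] = pool.getPoolSize();   // workers above corePoolSize have timed out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return sizes;
    }

    public static void main(String[] args) {
        int[] s = shrink();
        System.out.println("busy pool size=" + s[0] + ", idle pool size=" + s[1]);
    }
}
```

The last remaining worker stays alive because, with wc equal to corePoolSize and allowCoreThreadTimeOut left at false, timed is false and the worker blocks in take().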
4. Summary
- This chapter does not cover everything about thread pools; otherwise it would become bloated. We started with a hand-written thread pool and then analyzed, step by step, how the same ideas are implemented in Java's thread pool. Most of the knowledge involved has been introduced before, including queues, CAS, AQS, reentrant locks, and exclusive locks. These topics are closely linked, so it helps to have some foundation first; otherwise the source code is hard to follow.
- Beyond what this chapter describes, we have not yet covered thread destruction, how to choose among the four Executors factory methods, or how to configure a pool for CPU-intensive versus IO-intensive tasks. Spring also provides its own thread pool support. These points are all close to day-to-day practice.