当前位置:网站首页>Write a thread pool by hand, and take you to learn the implementation principle of ThreadPoolExecutor thread pool
Write a thread pool by hand, and take you to learn the implementation principle of ThreadPoolExecutor thread pool
2022-07-04 07:23:00 【InfoQ】
One 、 Thread pool explanation
Let's take an example
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threadPoolExecutor.execute(() -> {
System.out.println("Hi Thread pool !");
});
threadPoolExecutor.shutdown();
// Executors.newFixedThreadPool(10);
// Executors.newCachedThreadPool();
// Executors.newScheduledThreadPool(10);
// Executors.newSingleThreadExecutor();
Write a thread pool
2.1 Implementation process
- Yes n Running threads , This is equivalent to the thread pool size allowed when we create the thread pool .
- Submit the thread to the thread pool to run .
- If the running thread pool is full , Put the thread in the queue .
- Finally, when there is free time , Get the threads in the queue to run .
2.2 Implementation code
public class ThreadPoolTrader implements Executor {
private final AtomicInteger ctl = new AtomicInteger(0);
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;
public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
}
@Override
public void execute(Runnable command) {
int c = ctl.get();
if (c < corePoolSize) {
if (!addWorker(command)) {
reject();
}
return;
}
if (!workQueue.offer(command)) {
if (!addWorker(command)) {
reject();
}
}
}
private boolean addWorker(Runnable firstTask) {
if (ctl.get() >= maximumPoolSize) return false;
Worker worker = new Worker(firstTask);
worker.thread.start();
ctl.incrementAndGet();
return true;
}
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
public Worker(Runnable firstTask) {
this.thread = new Thread(this);
this.firstTask = firstTask;
}
@Override
public void run() {
Runnable task = firstTask;
try {
while (task != null || (task = getTask()) != null) {
task.run();
if (ctl.get() > maximumPoolSize) {
break;
}
task = null;
}
} finally {
ctl.decrementAndGet();
}
}
private Runnable getTask() {
for (; ; ) {
try {
System.out.println("workQueue.size:" + workQueue.size());
return workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private void reject() {
throw new RuntimeException("Error!ctl.count:" + ctl.get() + " workQueue.size:" + workQueue.size());
}
public static void main(String[] args) {
ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2, new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 10; i++) {
int finalI = i;
threadPoolTrader.execute(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" Task number :" + finalI);
});
}
}
}
// test result
Task number :1
Task number :0
workQueue.size:8
workQueue.size:8
Task number :3
workQueue.size:6
Task number :2
workQueue.size:5
Task number :5
workQueue.size:4
Task number :4
workQueue.size:3
Task number :7
workQueue.size:2
Task number :6
workQueue.size:1
Task number :8
Task number :9
workQueue.size:0
workQueue.size:0
- ctl, Used to record the number of threads in the thread pool .
- corePoolSize、maximumPoolSize, Used to limit thread pool capacity .
- workQueue, Thread pool queue , That is, the threads that can't be run in time , Will be loaded into this queue .
- execute, For commit thread , This is a general interface method . In this method, the main implementation is , The current thread submitted is added to worker、 Queue or abandon .
- addWorker, Mainly class Worker The specific operation of , Create and execute threads . It also includes getTask() Method , In other words, the queue is constantly getting the threads that have not been executed .
Thread pool source code analysis
3.1 Thread pool class diagram
- Interface Executor、ExecutorService, The basic method of defining thread pool . In especial execute(Runnable command) Submit route pool method .
- abstract class AbstractExecutorService, The basic general interface method is realized .
- ThreadPoolExecutor, It is the core tool class method of the whole thread pool , All other classes and interfaces , To provide functionality around this class .
- Worker, It's a task class , That is, the method of the thread of final execution .
- RejectedExecutionHandler, Is the deny policy interface , There are four implementation classes ;AbortPolicy( Reject by throwing exception )、DiscardPolicy( Direct discarding )、DiscardOldestPolicy( Discard the longest lived mission )、CallerRunsPolicy( Who submits and who executes ).
- Executors, Is a thread pool used to create different strategies that we use ,newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool、newSingleThreadExecutor.
3.2 high 3 Bit and low 29 position
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
3.3 Thread pool state
- RUNNING: Running state , Accept new tasks and process tasks in the queue .
- SHUTDOWN: The closed position ( Called shutdown Method ). No new assignments ,, But to deal with tasks in the queue .
- STOP: Stop state ( Called shutdownNow Method ). No new assignments , Nor do the tasks in the queue , And interrupt the task being processed .
- TIDYING: All missions have been terminated ,workerCount by 0, When the thread pool enters this state, it will call terminated() Methods into the TERMINATED state .
- TERMINATED: Termination status ,terminated() The state after the end of a method call .
3.4 Commit thread (execute)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- ctl.get(), Take the value of recording the thread state and the number of threads , Finally, the method is needed workerCountOf(), To get the current number of threads .`workerCountOf Execution is c & CAPACITY operation .
- According to the number of threads in the current thread pool , And core threads corePoolSize comparing , If less than, the thread is added to the task execution queue .
- If the number of threads is full , Then we need to judge whether the thread pool is in the running state isRunning(c). If it is running, the thread that cannot be executed is put into the thread queue .
- After putting into the thread queue , And whether the operation needs to be rerun , If not running and removed , Then reject the policy . Otherwise, the number of threads is determined as 0 Add new thread after .
- Finally, try to add task execution again , At this point the method addWorker The second input parameter of is false, Finally, it will affect the judgment of the number of tasks to be executed . If the addition fails, reject the policy .
3.5 Add execution task (addWorker)
The first part 、 Increase the number of threads
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
The second part 、 Create start thread
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
- if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())), Determine the current thread pool state , Is it SHUTDOWN、STOP、TIDYING、TERMINATED One of them . And the current status is SHUTDOWN、 And the task passed in is null, And the queue is not empty . So return false.
- compareAndIncrementWorkerCount,CAS operation , Increase the number of threads , Success will jump out of the marked loop .
- runStateOf(c) != rs, Finally, the thread pool state judgment , Decide whether to cycle or not .
- After the number of thread pools is successfully recorded , You need to enter the locking process , Create execution thread , And record the status . In the end, if it is judged that the startup is not successful , You need to execute addWorkerFailed Method , Remove to thread method and other operations .
3.6 Execute thread (runWorker)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // Allow the interrupt
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null)
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- beforeExecute、afterExecute, Do some statistics before and after thread execution .
- In addition, the lock operation here is Worker Inherit AQS A non reentrant exclusive lock implemented by itself .
- processWorkerExit, If you're interested , Similar methods can also be used to learn more about . When the thread exits workers Do some removal processing and the number of tasks completed , It's also very interesting
3.7 Queue get task (getTask)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- getTask Method gets the task waiting to be executed from the blocking queue , That is to say, take out threads one by one .
- if (rs >= SHUTDOWN ..., Determine whether the thread is shut down .
- wc = workerCountOf(c),wc > corePoolSize, If the number of worker threads exceeds the number of core threads corePoolSize also workQueue Not empty , Add worker threads . But if the timeout does not get the thread , Will be greater than corePoolSize The thread is destroyed .
- timed, yes allowCoreThreadTimeOut from . Final timed by true when , By blocking the queue poll Method to control the timeout .
- If in keepAliveTime No task was obtained in time , Then return to null. If false, The block .
Two 、 summary
- This chapter does not cover all the knowledge of thread pool , Otherwise, a piece of content will be bloated . In this chapter we start with the handwritten thread pool , Step by step, analyze the code in the Java How is it implemented in the thread pool of , The knowledge involved is almost what we have introduced before , Include : queue 、CAS、AQS、 Reentrant lock 、 Exclusive lock, etc . So this knowledge is basically linked , It's better to have some foundation, otherwise it will be difficult to understand .
- In addition to what is described in this chapter , We haven't talked about thread destruction yet 、 Selection and use of four thread pool methods 、 And in CPU Intensive task 、IO How to configure for intensive tasks . In addition to Spring Also has its own thread pool method . These knowledge points are very close to the actual operation .
边栏推荐
- Latex中的单引号,双引号如何输入?
- tornado项目之路由装饰器
- Campus network problems
- When JDBC connects to es query, is there a God who meets the following situation?
- Detailed introduction to the big changes of Xcode 14
- 2022 - 021arts: début du deuxième semestre
- Transition technology from IPv4 to IPv6
- The idea of implementing charts chart view in all swiftui versions (1.0-4.0) was born
- window上用.bat文件启动项目
- 输入年份、月份,确定天数
猜你喜欢
notepad++如何统计单词数量
[web security] nodejs prototype chain pollution analysis
The idea of implementing charts chart view in all swiftui versions (1.0-4.0) was born
Chain ide -- the infrastructure of the metauniverse
A real penetration test
Review of enterprise security incidents: how can enterprises do a good job in preventing source code leakage?
The final week, I split
[GF (q) + LDPC] regular LDPC coding and decoding design and MATLAB simulation based on the GF (q) field of binary graph
Status of the thread
Solution of running crash caused by node error
随机推荐
Blue Bridge Cup Quick sort (code completion)
notepad++如何统计单词数量
A new understanding of how to encrypt industrial computers: host reinforcement application
The idea of implementing charts chart view in all swiftui versions (1.0-4.0) was born
Novel website program source code that can be automatically collected
jdbc连接es查询的时候,有遇到下面这种情况的大神嘛?
2022-021ARTS:下半年開始
Cochez une colonne d'affichage dans une colonne de tableau connue
Zephyr 学习笔记1,threads
the input device is not a TTY. If you are using mintty, try prefixing the command with ‘winpty‘
博客停更声明
Industrial computer anti-virus
Zephyr Learning note 2, Scheduling
Introduction to rce in attack and defense world
Selenium ide plug-in download, installation and use tutorial
Pangu open source: multi support and promotion, the wave of chip industry
[Android reverse] function interception (use cache_flush system function to refresh CPU cache | refresh CPU cache disadvantages | recommended time for function interception)
window上用.bat文件启动项目
输入年份、月份,确定天数
手写简易版flexible.js以及源码分析