Abstract : Start with handwriting thread pool , Step by step, analyze the code in the Java How is it implemented in the thread pool of .

This article is shared from Huawei cloud community 《 Handwriting thread pool , Contrast learning ThreadPoolExecutor Thread pool implementation principle !》, author : Little brother Fu .

Thank you plane , Notes !, The last time I lost was on the thread , Is it possible to pit twice at a time !

Thank you plane : You ask , I'm ready !!!

interviewer : Um. , How thread pool state is designed for storage ?

Thank you plane : this ! next , next !

interviewer :Worker Implementation class of , Why not use ReentrantLock To implement? , It's your own inheritance AQS?

Thank you plane : I …!

interviewer : Let's briefly describe it ,execute The implementation of the process !

Thank you plane : bye !

One 、 Thread pool explanation

1. Let's take an example

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threadPoolExecutor.execute(() -> {
System.out.println("Hi Thread pool !");
});
threadPoolExecutor.shutdown(); // Executors.newFixedThreadPool(10);
// Executors.newCachedThreadPool();
// Executors.newScheduledThreadPool(10);
// Executors.newSingleThreadExecutor();

This is an example of creating a thread pool , I believe you've used it many times .

The core purpose of thread pool is the utilization of resources , Avoid resource consumption caused by repeatedly creating threads . So the idea of pooling technology is introduced , Avoid creating repeatedly 、 Performance overhead of destruction .

that , Next, we will analyze the structure of the pool through practice , See how it handles threads .

2. Write a thread pool

2.1 Implementation process

In order to better understand and analyze the source code of thread pool , Let's follow the thread pool idea first , Write a very simple thread pool .

In fact, most of the time, the core main logic of a piece of functional code may not be very complex , But in order for the core process to run smoothly , You need to add many additional branches of the auxiliary process . Like I always say , To protect my hands, I made my ass paper so big !

About the figure 21-1, The implementation of this handwritten thread pool is also very simple , It only reflects the core process , Include :

  1. Yes n Running threads , This is equivalent to the thread pool size allowed when we create the thread pool .
  2. Submit the thread to the thread pool to run .
  3. If the running thread pool is full , Put the thread in the queue .
  4. Finally, when there is free time , Get the threads in the queue to run .

2.2 Implementation code

public class ThreadPoolTrader implements Executor {

    private final AtomicInteger ctl = new AtomicInteger(0);

    private volatile int corePoolSize;
private volatile int maximumPoolSize; private final BlockingQueue<Runnable> workQueue; public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
} @Override
public void execute(Runnable command) {
int c = ctl.get();
if (c < corePoolSize) {
if (!addWorker(command)) {
reject();
}
return;
}
if (!workQueue.offer(command)) {
if (!addWorker(command)) {
reject();
}
}
} private boolean addWorker(Runnable firstTask) {
if (ctl.get() >= maximumPoolSize) return false; Worker worker = new Worker(firstTask);
worker.thread.start();
ctl.incrementAndGet();
return true;
} private final class Worker implements Runnable { final Thread thread;
Runnable firstTask; public Worker(Runnable firstTask) {
this.thread = new Thread(this);
this.firstTask = firstTask;
} @Override
public void run() {
Runnable task = firstTask;
try {
while (task != null || (task = getTask()) != null) {
task.run();
if (ctl.get() > maximumPoolSize) {
break;
}
task = null;
}
} finally {
ctl.decrementAndGet();
}
} private Runnable getTask() {
for (; ; ) {
try {
System.out.println("workQueue.size:" + workQueue.size());
return workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} private void reject() {
throw new RuntimeException("Error!ctl.count:" + ctl.get() + " workQueue.size:" + workQueue.size());
} public static void main(String[] args) {
ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2, new ArrayBlockingQueue<Runnable>(10)); for (int i = 0; i < 10; i++) {
int finalI = i;
threadPoolTrader.execute(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" Task number :" + finalI);
});
}
} } // test result Task number :1
Task number :0
workQueue.size:8
workQueue.size:8
Task number :3
workQueue.size:6
Task number :2
workQueue.size:5
Task number :5
workQueue.size:4
Task number :4
workQueue.size:3
Task number :7
workQueue.size:2
Task number :6
workQueue.size:1
Task number :8
Task number :9
workQueue.size:0
workQueue.size:0

above , The implementation of thread pool is very simple , The core idea of pooling can be reflected from the test results . The main function logic includes :

  • ctl, Used to record the number of threads in the thread pool .
  • corePoolSize、maximumPoolSize, Used to limit thread pool capacity .
  • workQueue, Thread pool queue , That is, the threads that can't be run in time , Will be loaded into this queue .
  • execute, For commit thread , This is a general interface method . In this method, the main implementation is , The current thread submitted is added to worker、 Queue or abandon .
  • addWorker, Mainly class Worker The specific operation of , Create and execute threads . It also includes getTask() Method , In other words, the queue is constantly getting the threads that have not been executed .

good , What about the above , This is the concrete embodiment of this simple thread pool implementation . But if you think about it, you'll find that there's a lot to be done here , such as : What about the thread pool status , It's impossible to run all the time !?、 What about the thread pool lock , Will there be no concurrency issues ?、 What about the policy after thread pool rejection ?, None of these problems have been solved in the mainstream process , Because there is no such process , So the code above is easier to understand .

Next , We start to analyze the source code of thread pool , Compared with our simple thread pool reference , It's easier to understand !

3. Thread pool source code analysis

3.1 Thread pool class diagram

Around the core class ThreadPoolExecutor The implementation of the expanded class between the implementation and inheritance relationship , Pictured 21-2 Thread pool class diagram .

  • Interface Executor、ExecutorService, The basic method of defining thread pool . In especial execute(Runnable command) Submit route pool method .
  • abstract class AbstractExecutorService, The basic general interface method is realized .
  • ThreadPoolExecutor, It is the core tool class method of the whole thread pool , All other classes and interfaces , To provide functionality around this class .
  • Worker, It's a task class , That is, the method of the thread of final execution .
  • RejectedExecutionHandler, Is the deny policy interface , There are four implementation classes ;AbortPolicy( Reject by throwing exception )、DiscardPolicy( Direct discarding )、DiscardOldestPolicy( Discard the longest lived mission )、CallerRunsPolicy( Who submits and who executes ).
  • Executors, Is a thread pool used to create different strategies that we use ,newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool、newSingleThreadExecutor.

3.2 high 3 Bit and low 29 position

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

stay ThreadPoolExecutor In the thread pool implementation class , Use AtomicInteger Type of ctl Record the thread pool status and the number of thread pools . Record multiple values on a type , It uses the segmentation of data regions , high 3 Bit record status , low 29 Number of bit storage threads , Default RUNNING state , The number of threads is 0 individual .

3.2 Thread pool state

chart 22-4 It is the state flow relationship in the thread pool , This includes the following states :

  • RUNNING: Running state , Accept new tasks and process tasks in the queue .
  • SHUTDOWN: The closed position ( Called shutdown Method ). No new assignments ,, But to deal with tasks in the queue .
  • STOP: Stop state ( Called shutdownNow Method ). No new assignments , Nor do the tasks in the queue , And interrupt the task being processed .
  • TIDYING: All missions have been terminated ,workerCount by 0, When the thread pool enters this state, it will call terminated() Methods into the TERMINATED state .
  • TERMINATED: Termination status ,terminated() The state after the end of a method call .

3.3 Commit thread (execute)

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

When reading this part of the source code , We can refer to our own thread pool . In fact, the ultimate goal is the same , This is the submitted thread , Start execution 、 Join the queue 、 Decision making strategy , These three ways .

  • ctl.get(), Take the value of recording the thread state and the number of threads , Finally, the method is needed workerCountOf(), To get the current number of threads .`workerCountOf Execution is c & CAPACITY operation
  • According to the number of threads in the current thread pool , And core threads corePoolSize comparing , If less than, the thread is added to the task execution queue .
  • If the number of threads is full , Then we need to judge whether the thread pool is in the running state isRunning(c). If it is running, the thread that cannot be executed is put into the thread queue .
  • After putting into the thread queue , And whether the operation needs to be rerun , If not running and removed , Then reject the policy . Otherwise, the number of threads is determined as 0 Add new thread after .
  • Finally, try to add task execution again , At this point the method addWorker The second input parameter of is false, Finally, it will affect the judgment of the number of tasks to be executed . If the addition fails, reject the policy .

3.5 Add execution task (addWorker)

private boolean addWorker(Runnable firstTask, boolean core)

The first part 、 Increase the number of threads

retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

The first part 、 Create start thread

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;

The process of adding execution tasks can be divided into two parts , The code above is used to record the number of threads 、 The following code section creates the execution thread in the exclusive lock and starts . This part of the code is not looking at the lock 、CAS Wait for the operation , So it's basically the same as the first handwritten thread pool

  • if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())), Determine the current thread pool state , Is it SHUTDOWN、STOP、TIDYING、TERMINATED One of them . And the current status is SHUTDOWN、 And the task passed in is null, And the queue is not empty . So return false.
  • compareAndIncrementWorkerCount,CAS operation , Increase the number of threads , Success will jump out of the marked loop .
  • runStateOf(c) != rs, Finally, the thread pool state judgment , Decide whether to cycle or not .
  • After the number of thread pools is successfully recorded , You need to enter the locking process , Create execution thread , And record the status . In the end, if it is judged that the startup is not successful , You need to execute addWorkerFailed Method , Remove to thread method and other operations .

3.6 Execute thread (runWorker)

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // Allow the interrupt
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null)
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

Actually , There's a foundation for handwritten threads , At this point, we have a basic understanding , What's the thread pool doing . The core point here is task.run() Let the thread run . Some additional processes are as follows ;

  • beforeExecute、afterExecute, Do some statistics before and after thread execution .
  • In addition, the lock operation here is Worker Inherit AQS A non reentrant exclusive lock implemented by itself .
  • processWorkerExit, If you're interested , Similar methods can also be used to learn more about . When the thread exits workers Do some removal processing and the number of tasks completed , It's also very interesting

3.7 Queue get task (getTask)

If you've started reading the source code , Can be in runWorker In the method , See a loop code like this while (task != null || (task = getTask()) != null). This is the same way that we operate in the handwritten thread pool , The core purpose is to get thread methods from the queue .

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
  • getTask Method gets the task waiting to be executed from the blocking queue , That is to say, take out threads one by one .
  • if (rs >= SHUTDOWN ..., Determine whether the thread is shut down .
  • wc = workerCountOf(c),wc > corePoolSize, If the number of worker threads exceeds the number of core threads corePoolSize also workQueue Not empty , Add worker threads . But if the timeout does not get the thread , Will be greater than corePoolSize The thread is destroyed .
  • timed, yes allowCoreThreadTimeOut from . Final timed by true when , By blocking the queue poll Method to control the timeout .
  • If in keepAliveTime No task was obtained in time , Then return to null. If false, The block .

Two 、 summary

  • This chapter does not cover all the knowledge of thread pool , Otherwise, a piece of content will be bloated . In this chapter we start with the handwritten thread pool , Step by step, analyze the code in the Java How is it implemented in the thread pool of , The knowledge involved is almost what we have introduced before , Include : queue 、CAS、AQS、 Reentrant lock 、 Exclusive lock, etc . So this knowledge is basically linked , It's better to have some foundation, otherwise it will be difficult to understand .
  • In addition to what is described in this chapter , We haven't talked about thread destruction yet 、 Selection and use of four thread pool methods 、 And in CPU Intensive task 、IO How to configure for intensive tasks . In addition to Spring Also has its own thread pool method . These knowledge points are very close to the actual operation .

Click to follow , The first time to learn about Huawei's new cloud technology ~

Write a thread pool , Take you to study ThreadPoolExecutor More related articles on the implementation principle of thread pool

  1. Handwriting thread pool , Contrast learning ThreadPoolExecutor Thread pool implementation principle !

    author : Little brother Fu Blog :https://bugstack.cn Github:https://github.com/fuzhengwei/CodeGuide/wiki precipitation . Share . grow up , Let yourself and others have ...

  2. Write a mini Web The server

    Today we will follow the example of Tomcat Server to write the simplest and miniest version of web The server , Learning exchange only . 1. In you windows System disk F Under the plate , Create a folder webroot, Used to store front-end code .  2. Code Introduction : ( ...

  3. 【 primary 】 Writing a promise

    In the last article , We introduced Promise Basic use of , In this article , We're trying to write one ourselves Promise, It's mainly about learning Promise Internal mechanism , Learn its programming ideas . !!! remarks : This article is not well written , For your own learning only , ...

  4. 『 try 』 Write an independent Json Algorithm JsonHelper

    background : > Always use  Newtonsoft.Json.dll It's quite stable . > But this framework is also very disturbing : > 1. Impact compilation failure :https://www.cnblogs.com/zih ...

  5. Write a simple one by hand ElasticSearch SQL converter ( One )

    One . Preface There was a need , Is to make ElasticSearch Support use SQL Simple query , Newer versions of ES This feature is already supported ( But it seems to be experimental ?) , and git There are also elasticsearch-sql plug-in unit , ...

  6. Look at the annual salary 50W How to write a SpringMVC frame

    Preface do Java Web Developed by you , I must have heard of SpringMVC The name of , As the most widely used Java frame , So far, it still maintains a strong vitality and a wide user base . How does this work eclipse Build it step by step S ...

  7. Used message queuing ?Kafka? Can I write a message queue ? meng

    Do you have the same experience ? The interviewer asks you what projects you have done , I'll have a good talk , The project takes advantage of message queuing ,kafka,rocketMQ wait . well , Please start your show , The interviewer handed over a pen : Hand me a message queue !!WHAT? For you to meet ...

  8. Analyze handwriting Vue, You can also write a MVVM frame

    Analyze handwriting Vue, You can also write a MVVM frame # mailbox :[email protected] github: https://github.com/xiaoqiuxiong author : Xiao Qiuxiong (eddy) Thank you very much ...

  9. webview A brief introduction and handwritten one H5 Jacketed webview

    1.webview What is it? ? What is the role ? What does it have to do with browsers ? Webview It's based on webkit engine , Can be parsed DOM Elements , Exhibition html Page controls , It works the same way that browsers display pages , So think of it as ...

  10. The CARDS on the table ! I'm going to write a “Spring Boot”

    Current words , Have the Spring MVC Related common notes such as @GetMapping [email protected] [email protected] Finished writing . I've also made the project open source , Address :https://gi ...

Random recommendation

  1. JS Common methods of recording

    // Yes Object Of num Field to sort var compare = function (propertyName) { return function (object1, object2) { var ...

  2. compile ITK

    [2016 year 7 month 4 Zhou ] compile ITK 1. Download the required files  InsightToolkit-4.8.1.cmake 2.cmake compile modify CMAKE_INSTALL_PREFIX Configure to the directory to be generated . ...

  3. shell Medium &gt; File redirection and The standard input 、 Output 、 Mistakes and 2&amp;1 The meaning of *

    http://www.cnblogs.com/chenmh/p/5382044.html ask : among Of 2>&1 What's going on? ? . test.sh > test.log 2>&a ...

  4. mysql Use in count() The peculiarity of Statistics

    If you need to count the total number , Why use count(*), Avoid specifying specific column names ? count() When the parameters in the function are column names , Then we calculate the number of times we have a value . That is to say , Items that do not have a value in this column do not enter the calculation range . such ...

  5. Spring( 3、 ... and )——AOP

    AOP Full name Aspect-Oriented Programming, It means programming for the cross section , We've introduced     Programming for cross sections AOP The understanding of the  , We can easily extend our application through this programming idea . One , how ...

  6. ZeroClipboard plug-in unit : Compatible with the web page copy function of all browsers

    Conventional utilization JS The page copy function written is only for IE It works , Can't be compatible with other browsers , The code is as follows : function copyToClipBoard(){ var clipBoardContent="" ...

  7. Gulp Automatic construction of front-end development integration

    gulp Is based on Nodejs Automatic task runner for , She can do it automatically javascript/coffee/sass/less/html/image/css And so on . Check . Merge . Compress . format . Browser from ...

  8. 《 Wechat app on the 7th 》- Sixth days : Applet devtool Hidden secrets

    < Wechat app on the 7th > Series articles : The first day : Life if only as first see : the second day : You may want to get rid of the original responsive development thinking : On the third day : Get along well with Page Life cycle of components : The fourth day : Page path up to five layers ? Navigation can play like this : The fifth ...

  9. Linux Instructions -- Performance monitoring and optimization commands related instructions

    The source of the original text is :http://www.cnblogs.com/peida/archive/2012/12/05/2803591.html. Thank you for sharing Performance monitoring and optimization command related instructions are :top,free,v ...

  10. MyBatis(1)—— Quick start

    MyBatis brief introduction MyBatis This is a apache An open source project of iBatis, 2010 This project was initiated by apache software foundation Moved to google code, And renamed it ...