当前位置:网站首页>Implementation principle and source code analysis of ThreadPoolExecutor thread pool
Implementation principle and source code analysis of ThreadPoolExecutor thread pool
2022-06-23 08:02:00 【Ricardo, Ricardo】
Catalog
1. Thread pool implementation principle
2. Thread pool class inheritance system
3.2 Core configuration parameters
3.3 The thread pool closes gracefully
3.3.1 The life cycle of the thread pool
3.3.2 Steps to properly close the thread pool
3.3.3 shutdown() and shutdowNow() The difference between
3.4 Task submission process analysis
3.5 Task execution process analysis
3.5.1 shutdown() Integrated analysis with task execution process
3.5.2 shutdownNow() Integrated analysis with task execution process
3.6 Four rejection strategies of thread pool
3.6.1 CallerRunsPolicy Strategy
3.6.4 DiscardOldestPolicy Strategy
1. Thread pool implementation principle
The following figure shows the implementation principle of thread pool : Callers constantly submit tasks to the thread pool ; There is a set of threads in the thread pool , Ceaselessly Fetch a task from a queue , This is a typical producer — Consumer model .

To implement such a thread pool , There are several questions to consider :
- Queue settings ? If it is unbounded , The caller keeps putting tasks into the queue , May cause memory exhaustion . If it is bounded , When the queue is full , How the caller handles ?
- The number of threads in the thread pool is fixed , Or dynamic ?
- Each time a new task is submitted , Yes, put in the queue ? Or start a new thread ?
- When there is no task , A thread is a short period of sleep ? Or into a jam ? If the entry is blocked , How to wake up ?
Aiming at problems 4, Yes 3 Methods :
- Do not block queues , Use only normal thread safe queues , No obstruction / Wake up mechanism . When the queue is empty , Line Threads in the process pool can only sleep for a while , Then wake up to see if there are any new tasks in the queue , So constantly polling .
- Do not block queues , But outside the queue 、 Blocking is implemented inside the thread pool / Wake up mechanism .
- Use blocking queue .
It is obvious that 3 The most perfect , Avoid self implementation blocking inside the thread pool / The trouble of wake-up mechanism , It also avoids the practice of 1 Sleep sleep / Resource consumption and delay caused by polling . Because of this ThreadPoolExector/ScheduledThreadPoolExecutor Are based on blocking queues , Not a general queue .
2. Thread pool class inheritance system

ad locum , There are two core classes : ThreadPoolExector and ScheduledThreadPoolExecutor , The latter can not only perform a task , You can also perform tasks periodically .
Each task submitted to the thread pool , All must be realized Runnable Interface , Through the top Executor Interface
execute(Runnable command) Submit tasks to the thread pool .
then , stay ExecutorService in , Defines the closing interface of thread pool shutdown() , It also defines tasks that can have return values , That is to say Callable.
3. The code analysis
3.1 Core data structure
public class ThreadPoolExecutor extends AbstractExecutorService {
// Thread pool state and Number of valid threads
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Blocking queues , For storing tasks
private final BlockingQueue<Runnable> workQueue;
// Mutually exclusive access control of various variables in the thread pool
private final ReentrantLock mainLock = new ReentrantLock();
// Thread set
private final HashSet<Worker> workers = new HashSet<Worker>();
//....
}Worker yes ThreadPoolExector The inner class of , Every Worker Store a thread
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable{
// Encapsulating threads
final Thread thread;
// Initial task to run . May be empty .
Runnable firstTask;
//worker Number of completed tasks
volatile long completedTasks;
//.....
}Worker Inherited from AQS, in other words Worker Itself is a lock . This lock is used to close the thread pool 、 The process of a thread executing a task .
3.2 Core configuration parameters
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}Parameter interpretation :
1. corePoolSize: Number of core threads , The number of threads that are always maintained in the thread pool .
2. maxPoolSize: stay corePooSize Is full 、 When the queue is full , Extend thread to this value .
3. keepAliveTime/TimeUnit:maxPoolSize Idle threads in , The time required for destruction , The bus number shrinks
return corePoolSize.
4. blockingQueue: The queue type used by the thread pool .
5. threadFactory: Thread creation factory , You can customize , The default value is Executors.defaultThreadFactory() .
6. RejectedExecutionHandler:corePoolSize Is full , The queue is full ,maxPoolSize Is full , Final refusal
No strategy .
Configure the running process of parameters during task submission :
Step one : Determine whether the current number of threads is greater than or equal to the number of core threads corePoolSize. If it is less than , New thread perform ; If it is greater than , Step 2 .
Step two : Determine if the queue is full . If not full , Put it in the queue ; If full , Go to step 3 .
Step three : Judge whether the current number of threads is greater than or equal to maxPoolSize. If it is less than , New thread execution ; If Greater than , Step 4 .
Step four : Reject the task according to the rejection policy .
summary : First judgement corePoolSize, Second, judgment blockingQueue Whether is full , Then judge maxPoolSize, Finally, the rejection strategy is used .
3.3 The thread pool closes gracefully
Thread pool closure , More complex than thread shutdown .
When closing a thread pool , Some threads are still executing a task , Some callers are submitting tasks to the thread pool , And there may be unexecuted tasks in the queue .
therefore , The closing process cannot be instantaneous , It requires a smooth transition , This involves the full lifecycle management of the thread pool .
3.3.1 The life cycle of the thread pool
stay JDK 7 in , Number of threads (workerCount) And thread pool status (runState) These two variables are packaged and stored in a field , namely ctl Variable . As shown in the figure below , The highest 3 Bit storage thread pool status , rest 29 Number of bit storage threads . And in the JDK 6 in , These two variables are stored separately .


As you can see from the code ,ctl The variable is split in two , The highest 3 Bit represents the state of the thread pool , Low 29 Bit represents the number of threads .
There are five states of thread pool , Namely RUNNING、SHUTDOWN、STOP、TIDYING and TERMINATED.
The process of migrating between states , As shown in the figure :

There are two ways to close a thread pool ,shutdown() and shutdownNow(), These two methods will switch the thread pool to different states . When the queue is empty , After the thread pool is also empty , Get into TIDYING state ; Finally, execute a hook method terminated(), Get into TERMINATED state , The thread pool is really closed .
The state transition here has a very key feature : From small to large ,-1,0,1,2,3, Only small state values will migrate to large state values , No reverse migration . for example , When the thread pool state is TIDYING = 2 when , Next, you can only migrate to TERMINATED = 3, It is impossible to migrate back to STOP = 1 Or other states .
except terminated() outside , Thread pools also provide several other hook methods , The implementation of these methods is empty . If you want to implement your own thread pool , You can override these methods :
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }3.3.2 Steps to properly close the thread pool
The process of closing the thread pool is : Calling shutdown() perhaps shutdownNow() after , Thread pools do not shut down immediately , Next you need to call awaitTermination() To wait for the thread pool to close . The correct steps to close the thread pool are as follows :
executor.shutdown();
try {
boolean flag = true;
do {
flag = ! executor.awaitTermination(500, TimeUnit.MICROSECONDS);
}while (flag);
}catch (InterruptedException e){
//.....
}awaitTermination(...) The internal implementation of the method is very simple , As shown below . Continuously cycle to judge whether the process pool has reached the final state TERMINATED, If it is , Just go back to ; If not , Through termination Conditional variables are blocked for a period of time , Then continue to judge .
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}3.3.3 shutdown() and shutdowNow() The difference between
- shutdown() The task queue will not be emptied , Will wait until all tasks are completed ,shutdownNow() Clear the task queue .
- shutdown() Only idle threads will be interrupted ,shutdownNow() Will break all threads .
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// Lock to ensure thread safety
mainLock.lock();
try {
// Check if you have permission to close the thread
checkShutdownAccess();
// Change the thread pool state to SHUTDOWN
advanceRunState(SHUTDOWN);
// Interrupt idle thread
interruptIdleWorkers();
// Hook method with empty cube
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// Lock to ensure thread safety
mainLock.lock();
try {
// Check for permissions on the closed thread pool
checkShutdownAccess();
// Set the thread pool state to STOP
advanceRunState(STOP);
// Interrupt all threads
interruptWorkers();
// Clear the task queue
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}Next, let's look at the difference between interrupting idle threads and interrupting all threads :
shutdown() Methods interruptIdleWorkers() Method implementation :
// Interrupt idle thread
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
// Interrupt idle thread
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers) {
Thread t = w.thread;
// If tryLock success , Indicates that the thread is idle ;
// If it doesn't work , Indicates that the thread holds the lock , Performing a task
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}shutdownNow() Methods interruptWorkers() Method implementation :
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}The key difference is tryLock():
A thread executes a task before , Will be locked , This means by whether to hold the lock , You can determine whether the thread is idle .
tryLock() If the call succeeds , Indicates that the thread is idle , Send interrupt signal to it ; Otherwise do not send .
public boolean tryLock() { return tryAcquire(1); }
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}shutdownNow() Called interruptWorkers() Method :
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}void interruptIfStarted() {
Thread t;
// All interrupts that are in operation and are not interrupted
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}shutdown() and shutdownNow() They all called. tryTerminate() Method , As shown below :
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// When workQueue It's empty ,workCount = 0 when
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Switch the state to TIDYING state
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// Call hook function
terminated();
} finally {
// Change the status from TIDYING Change it to TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// notice awaitTermination(.....)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}tryTerminate() The thread pool will not be forcibly terminated , Just a test : When workerCount by 0,workerQueue by Space time , First switch the state to TIDYING, Then call the hook method. terminated(). When the hook method execution is complete , Change the state from TIDYING Change it to TERMINATED, Then call termination.sinaglAll(), Blocking in front of notification awaitTermination All caller threads .
therefore ,TIDYING and TREMINATED The difference is that a hook method is executed between the two terminated(), At present, it is an empty implementation .
3.4 Task submission process analysis
The task submission process is as follows :
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// The current thread is less than corePoolSize, Then start a new thread
if (workerCountOf(c) < corePoolSize) {
// add to worker, And will comman Set to Worker The first task of begins
if (addWorker(command, true))
return;
c = ctl.get();
}
// If the current thread is greater than or equal to corePoolSize, Call workerQueue.offer Put in queue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// If the thread pool is stopping , Will command The task is removed from the queue , And refuse command Mission request
if (! isRunning(recheck) && remove(command))
reject(command);
// It is found that there is no thread executing the task when it is put into the queue , Start a new thread
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// The queue is full , And the number of threads is greater than maxPoolSize, Execute reject strategy
else if (!addWorker(command, false))
reject(command);
}
// Used to start a new thread , If core Parameter is true, Then use corePoolSize As a ceiling , Otherwise use maxPoolSize As a ceiling
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// If the thread pool status value is at least SHUTDOWN and STOP, Or the first task is not null, Or the work queue is empty
// Then add worker Failure , return false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// Maximum number of worker threads reached , Or corePoolSize Or maximumPoolSize, Failed to start thread
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// increase worker Quantity successful , Back to retry sentence
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// If the running state of the thread pool is at least SHUTDOWN, Retry retry Tag statement ,CAS
if (runStateOf(c) != rs)
continue retry;
}
}
// worker Quantity plus 1 After success , Then run :
boolean workerStarted = false;
boolean workerAdded = false;
ThreadPoolExecutor.Worker w = null;
try {
// newly build worker object
w = new ThreadPoolExecutor.Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// Because the thread is already running , Can't start , Throw exceptions
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// Put the thread corresponding to worker Join in worker aggregate
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// If you add worker success , Start the worker The corresponding thread
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// If starting a new thread fails
if (! workerStarted)
// workCount - 1
addWorkerFailed(w);
}
return workerStarted;
}3.5 Task execution process analysis
In the task submission process above , May open a new Worker, And take the task itself as firstTask To be given Worker. But for a Worker Come on , Not just one task , Instead, it continuously takes tasks from the queue to execute , It's a cyclical process .
The following term Woker Of run() Method implementation .
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// At present Worker Object encapsulated thread
final Thread thread;
// The first task that the thread needs to run . It can be null, If it is null, Then the thread gets the task from the queue
Runnable firstTask;
// Record the number of tasks completed by the thread , One counter per thread
volatile long completedTasks;
/**
* Use the given first task and create a thread factory Worker example
* @param firstTask The first task of the thread , without , Set it to null,
* At this point, the thread will be removed from the queue Access to task .
*/
Worker(Runnable firstTask) {
// Thread is blocked , call runWorker Interrupt when
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// call ThreadPoolExecutor Of runWorker Method to execute the running of the thread
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// interrupt Worker Encapsulated threads
w.unlock();
boolean completedAbruptly = true;
try {
// If the initial task of the thread is not null, Or the task obtained from the queue is not null, Indicates that the thread should execute any service .
while (task != null || (task = getTask()) != null) {
// Get thread lock
w.lock();
// If the thread pool stops , Make sure the thread is interrupted
// If the thread pool is running , Ensure that the thread is not interrupted
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// After getting the task , Check the thread pool status again , If you find that the thread pool has stopped , Send it to yourself Interrupt signal
wt.interrupt();
try {
// Hook method before task execution , Implementation is empty
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// Hook method after task execution , Implementation is empty
afterExecute(task, thrown);
}
} finally {
// Task execution completed , take task Set to null
task = null;
// The number of tasks completed by the thread plus 1
w.completedTasks++;
// Release thread lock
w.unlock();
}
}
// Determine whether the thread exits normally
completedAbruptly = false;
} finally {
// Worker sign out
processWorkerExit(w, completedAbruptly);
}
}
}3.5.1 shutdown() Integrated analysis with task execution process
The execution process of the task and The above thread pool closing process is combined for analysis , When calling shutdown() When , The following scenarios may occur :
1. When calling shutdown() When , All threads are idle . This means that the task queue must be empty . here , All threads will block in getTask() Method place . then , All threads will receive interruptIdleWorkers() Interrupt signal from ,getTask() return null, all Worker Will quit while loop , After that processWorkerExit.
2. When calling shutdown() When , All threads are busy . here , The queue may be empty , It may also be non empty .interruptIdleWorkers() Inside tryLock Call failed , Do nothing , All threads will continue to perform their current tasks . After that, all threads will complete the tasks in the queue , Until the queue is empty ,getTask() Will return null. after , Just like the scene 1 The same , sign out while loop .
3. When calling shutdown() When , Some threads are busy , Some threads are idle . Some threads are idle , Indicates that the queue must be empty , These threads must be blocking getTask() Method place . These idle threads will be related to the scene 1 Handle it the same way , Threads that are not idle will be associated with the scene 2 Handle it the same way .
getTask() Internal details of the method :
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// If the thread pool calls shutdownNow(), return null
// If the thread pool calls shutdown(), And the task queue is empty , Also returned null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// Reduce the number of worker threads by one
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// If the queue is empty , It will block pool perhaps take, The former has a timeout , The latter has no timeout
// Once interrupted , Throw an exception here , Corresponding to the above scenario 1.
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}3.5.2 shutdownNow() Integrated analysis with task execution process
And the above shutdown() similar , Just one more link , That is, clear the task queue . If a thread is executing some business code , Even if an interrupt signal is sent to it , It doesn't work , We can only wait for it to finish executing the code . therefore , The difference between interrupting an idle thread and interrupting all threads is not great , Unless the thread is currently blocking somewhere .
When one Worker When you finally quit , Clean up will be carried out :
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
// If the thread exits normally , Not execute if The sentence of , This is usually an abnormal exit , Need to put worker Quantity minus one
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// To his own worker Remove from collection
workers.remove(w);
} finally {
mainLock.unlock();
}
// Each thread will call this method at the end , See if you can stop the thread pool
tryTerminate();
int c = ctl.get();
// If before the thread exits , The thread pool has not been closed yet
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// If there are no other threads in the thread pool , And the task queue is not empty
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// If the number of worker threads is greater than min, Indicates that tasks in the queue can be executed by other threads , Exit the current thread
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// If the thread pool does not end before the current thread exits , The task queue is not empty , There are no other threads to execute
// Just start another thread to process .
addWorker(null, false);
}
}3.6 Four rejection strategies of thread pool
stay execute(Runnable command) Last , Called reject(command) Execute reject strategy , The code is as follows :
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}handler This is the rejection policy manager that we can set :
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
// Default rejection policy
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}3.6.1 CallerRunsPolicy Strategy
Strategy : Who requests who handles
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}3.6.2 AbortPolicy Strategy
Strategy : Throw exceptions
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}3.6.3 DiscardPolicy Strategy
Strategy : Directly discard and do not deal with
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}3.6.4 DiscardOldestPolicy Strategy
Strategy : Abandon the oldest task
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}4. Executors Tool class
concurrent The package provided. Executors Tool class , It can be used to create various types of thread pools .
4.1 Thread pool comparison
Single threaded thread pool :
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}A fixed number of thread pools :
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}Every request received , Just create a thread to execute :
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}Single thread thread thread pool with periodic scheduling function :
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new Executors.DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}Multithreading , Thread pool with scheduling function :
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}Different types of thread pools , In fact, they are all configured by the previous key configuration parameters .
4.2 Best practices
stay 《 Alibaba Java Development Manual 》 in , The use of Executors Creating a thread pool , And require developers to use ThreadPoolExector or ScheduledThreadPoolExecutor Create . This is to force developers to specify the running strategy of thread pool , Avoid the risk of resource exhaustion due to improper use .
边栏推荐
- On ThreadLocal and inheritablethreadlocal, source code analysis
- 11 string function
- Distributed ID generation
- ThreadPoolExecutor线程池实现原理与源码解析
- Ignore overlength parameter violation
- Test APK exception control nettraffic attacker development
- 爬虫框架
- [kubernetes] download address of the latest version of each major version of kubernetes
- 数学知识:快速幂—快速幂
- Apache Solr arbitrary file read replication
猜你喜欢
随机推荐
Tensorboard的使用
openni. utils. OpenNIError: (OniStatus.ONI_STATUS_ERROR, b‘DeviceOpen using default: no devices found‘
socket编程——select模型
C# 内存法复制图像bitmap
odoo项目 发送信息到微信公众号或企业微信的做法
MFC radio button grouping
一篇文章学会er图绘制
Apache Solr arbitrary file read replication
【Kubernetes】Kubernetes各大版本的最新版本下载地址
Friends of the week
vtk.js鼠标左键滑动改变窗位和窗宽
2022 final examination of software project management of School of software, Shandong University (recall version)
What is edge cloud?
socket编程(多进程)
Take you to tiktok. That's it
unity 音频可视化方案
爬虫框架
浅谈ThreadLocal和InheritableThreadLocal,源码解析
套接字socket编程
快速排序 + 冒泡排序 + 插入排序 + 选择排序









