当前位置:网站首页>Implementation principle and source code analysis of ThreadPoolExecutor thread pool

Implementation principle and source code analysis of ThreadPoolExecutor thread pool

2022-06-23 08:02:00 Ricardo, Ricardo

Catalog

1. Thread pool implementation principle

2. Thread pool class inheritance system

3. The code analysis

        3.1 Core data structure

        3.2 Core configuration parameters

        3.3 The thread pool closes gracefully

        3.3.1 The life cycle of the thread pool

        3.3.2 Steps to properly close the thread pool

        3.3.3 shutdown() and shutdowNow() The difference between  

        3.4 Task submission process analysis

        3.5 Task execution process analysis

        3.5.1  shutdown() Integrated analysis with task execution process

        3.5.2  shutdownNow() Integrated analysis with task execution process

        3.6 Four rejection strategies of thread pool

        3.6.1 CallerRunsPolicy Strategy

        3.6.2 AbortPolicy Strategy

        3.6.3 DiscardPolicy Strategy

        3.6.4 DiscardOldestPolicy Strategy

4. Executors Tool class

        4.1 Thread pool comparison

        4.2 Best practices


1. Thread pool implementation principle

         The following figure shows the implementation principle of thread pool : Callers constantly submit tasks to the thread pool ; There is a set of threads in the thread pool , Ceaselessly Fetch a task from a queue , This is a typical producer — Consumer model .

To implement such a thread pool , There are several questions to consider :

  1. Queue settings ? If it is unbounded , The caller keeps putting tasks into the queue , May cause memory exhaustion . If it is bounded , When the queue is full , How the caller handles ?
  2. The number of threads in the thread pool is fixed , Or dynamic ?
  3. Each time a new task is submitted , Yes, put in the queue ? Or start a new thread ?
  4. When there is no task , A thread is a short period of sleep ? Or into a jam ? If the entry is blocked , How to wake up ?

Aiming at problems 4, Yes 3 Methods :

  1. Do not block queues , Use only normal thread safe queues , No obstruction / Wake up mechanism . When the queue is empty , Line     Threads in the process pool can only sleep for a while , Then wake up to see if there are any new tasks in the queue , So constantly polling .
  2. Do not block queues , But outside the queue 、 Blocking is implemented inside the thread pool / Wake up mechanism .
  3. Use blocking queue . 

         It is obvious that 3 The most perfect , Avoid self implementation blocking inside the thread pool / The trouble of wake-up mechanism , It also avoids the practice of 1 Sleep sleep / Resource consumption and delay caused by polling . Because of this ThreadPoolExector/ScheduledThreadPoolExecutor Are based on blocking queues , Not a general queue .

2. Thread pool class inheritance system

         ad locum , There are two core classes : ThreadPoolExector  and  ScheduledThreadPoolExecutor , The latter can not only perform a task , You can also perform tasks periodically .

         Each task submitted to the thread pool , All must be realized  Runnable  Interface , Through the top  Executor  Interface
execute(Runnable command)  Submit tasks to the thread pool .

         then , stay  ExecutorService  in , Defines the closing interface of thread pool  shutdown() , It also defines tasks that can have return values , That is to say  Callable.

3. The code analysis

        3.1 Core data structure

public class ThreadPoolExecutor extends AbstractExecutorService {

    // Thread pool state   and   Number of valid threads 
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
    
    //  Blocking queues , For storing tasks  
    private final BlockingQueue<Runnable> workQueue; 
    
    //  Mutually exclusive access control of various variables in the thread pool  
    private final ReentrantLock mainLock = new ReentrantLock(); 
    
    //  Thread set  
    private final HashSet<Worker> workers = new HashSet<Worker>();

    //....

}

 Worker yes ThreadPoolExector The inner class of , Every Worker Store a thread

private final class Worker extends AbstractQueuedSynchronizer 
    implements Runnable{

        // Encapsulating threads 
        final Thread thread;

        // Initial task to run . May be empty .
        Runnable firstTask;
        
        //worker Number of completed tasks 
        volatile long completedTasks;

        //.....

    }

        Worker Inherited from AQS, in other words Worker Itself is a lock . This lock is used to close the thread pool 、 The process of a thread executing a task .

        3.2 Core configuration parameters

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

Parameter interpretation : 

1. corePoolSize: Number of core threads , The number of threads that are always maintained in the thread pool .
2. maxPoolSize: stay corePooSize Is full 、 When the queue is full , Extend thread to this value .
3. keepAliveTime/TimeUnit:maxPoolSize Idle threads in , The time required for destruction , The bus number shrinks
    return corePoolSize.
4. blockingQueue: The queue type used by the thread pool .
5. threadFactory: Thread creation factory , You can customize , The default value is Executors.defaultThreadFactory() .
6. RejectedExecutionHandler:corePoolSize Is full , The queue is full ,maxPoolSize Is full , Final refusal
    No strategy .

Configure the running process of parameters during task submission :

Step one : Determine whether the current number of threads is greater than or equal to the number of core threads corePoolSize. If it is less than , New thread                 perform ; If it is greater than , Step 2 .

Step two : Determine if the queue is full . If not full , Put it in the queue ; If full , Go to step 3 .

Step three : Judge whether the current number of threads is greater than or equal to maxPoolSize. If it is less than , New thread execution ; If                   Greater than , Step 4 .

Step four : Reject the task according to the rejection policy .

summary : First judgement corePoolSize, Second, judgment blockingQueue Whether is full , Then judge maxPoolSize,                Finally, the rejection strategy is used .

        3.3 The thread pool closes gracefully

         Thread pool closure , More complex than thread shutdown .

         When closing a thread pool , Some threads are still executing a task , Some callers are submitting tasks to the thread pool , And there may be unexecuted tasks in the queue .

         therefore , The closing process cannot be instantaneous , It requires a smooth transition , This involves the full lifecycle management of the thread pool .

        3.3.1 The life cycle of the thread pool

         stay JDK 7 in , Number of threads (workerCount) And thread pool status (runState) These two variables are packaged and stored in a field , namely ctl Variable . As shown in the figure below , The highest 3 Bit storage thread pool status , rest 29 Number of bit storage threads . And in the JDK 6 in , These two variables are stored separately .

  

         As you can see from the code ,ctl The variable is split in two , The highest 3 Bit represents the state of the thread pool , Low 29 Bit represents the number of threads .

         There are five states of thread pool , Namely RUNNING、SHUTDOWN、STOP、TIDYING and TERMINATED.

The process of migrating between states , As shown in the figure :

         There are two ways to close a thread pool ,shutdown()  and  shutdownNow(), These two methods will switch the thread pool to different states . When the queue is empty , After the thread pool is also empty , Get into TIDYING state ; Finally, execute a hook method terminated(), Get into TERMINATED state , The thread pool is really closed .
         The state transition here has a very key feature : From small to large ,-1,0,1,2,3, Only small state values will migrate to large state values , No reverse migration . for example , When the thread pool state is TIDYING = 2 when , Next, you can only migrate to TERMINATED = 3, It is impossible to migrate back to STOP = 1 Or other states .
         except terminated() outside , Thread pools also provide several other hook methods , The implementation of these methods is empty . If you want to implement your own thread pool , You can override these methods :

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

protected void terminated() { }

        3.3.2 Steps to properly close the thread pool

         The process of closing the thread pool is : Calling shutdown()  perhaps  shutdownNow()  after , Thread pools do not shut down immediately , Next you need to call awaitTermination() To wait for the thread pool to close . The correct steps to close the thread pool are as follows :

executor.shutdown();

try {
   boolean flag = true;
   do {
      flag = ! executor.awaitTermination(500, TimeUnit.MICROSECONDS);
   }while (flag);
}catch (InterruptedException e){
   //.....
}

        awaitTermination(...)  The internal implementation of the method is very simple , As shown below . Continuously cycle to judge whether the process pool has reached the final state TERMINATED, If it is , Just go back to ; If not , Through  termination  Conditional variables are blocked for a period of time , Then continue to judge .

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
}

        3.3.3 shutdown() and shutdowNow() The difference between  

  • shutdown()  The task queue will not be emptied , Will wait until all tasks are completed ,shutdownNow()  Clear the task queue .
  • shutdown()  Only idle threads will be interrupted ,shutdownNow()  Will break all threads .
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    // Lock to ensure thread safety 
    mainLock.lock();
    try {
        // Check if you have permission to close the thread 
        checkShutdownAccess();
        // Change the thread pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle thread 
        interruptIdleWorkers();
        // Hook method with empty cube 
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    // Lock to ensure thread safety 
    mainLock.lock();
    try {
        // Check for permissions on the closed thread pool 
        checkShutdownAccess();
        // Set the thread pool state to STOP
        advanceRunState(STOP);
        // Interrupt all threads 
        interruptWorkers();
        // Clear the task queue 
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

Next, let's look at the difference between interrupting idle threads and interrupting all threads :

        shutdown()  Methods  interruptIdleWorkers()  Method implementation :

// Interrupt idle thread 
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

// Interrupt idle thread 
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (ThreadPoolExecutor.Worker w : workers) {
            Thread t = w.thread;
            // If tryLock success , Indicates that the thread is idle ;
            // If it doesn't work , Indicates that the thread holds the lock , Performing a task 
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

         shutdownNow()  Methods interruptWorkers()  Method implementation :

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (ThreadPoolExecutor.Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

The key difference is tryLock()

         A thread executes a task before , Will be locked , This means by whether to hold the lock , You can determine whether the thread is idle .

        tryLock()  If the call succeeds , Indicates that the thread is idle , Send interrupt signal to it ; Otherwise do not send .

public boolean tryLock()  { return tryAcquire(1); }

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

shutdownNow() Called interruptWorkers() Method :

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (ThreadPoolExecutor.Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    // All interrupts that are in operation and are not interrupted 
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

shutdown() and shutdownNow()  They all called. tryTerminate() Method , As shown below :

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // When workQueue It's empty ,workCount = 0  when 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //  Switch the state to TIDYING state 
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // Call hook function 
                    terminated();
                } finally {
                    // Change the status from  TIDYING  Change it to  TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // notice awaitTermination(.....)
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

        tryTerminate() The thread pool will not be forcibly terminated , Just a test : When workerCount by 0,workerQueue by   Space time , First switch the state to  TIDYING, Then call the hook method.  terminated(). When the hook method execution is complete , Change the state from TIDYING Change it to TERMINATED, Then call  termination.sinaglAll(), Blocking in front of notification  awaitTermination  All caller threads .

         therefore ,TIDYING  and TREMINATED  The difference is that a hook method is executed between the two terminated(), At present, it is an empty implementation .

        3.4 Task submission process analysis

         The task submission process is as follows :

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //  The current thread is less than corePoolSize,  Then start a new thread 
    if (workerCountOf(c) < corePoolSize) {
        //  add to worker,  And will comman Set to Worker The first task of begins 
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //  If the current thread is greater than or equal to corePoolSize,  Call workerQueue.offer Put in queue 
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //  If the thread pool is stopping , Will command The task is removed from the queue , And refuse command Mission request 
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //  It is found that there is no thread executing the task when it is put into the queue , Start a new thread 
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //  The queue is full , And the number of threads is greater than maxPoolSize,  Execute reject strategy 
    else if (!addWorker(command, false))
        reject(command);
}


//  Used to start a new thread , If core Parameter is true,  Then use corePoolSize As a ceiling , Otherwise use maxPoolSize As a ceiling 
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //  If the thread pool status value is at least  SHUTDOWN  and  STOP, Or the first task is not null, Or the work queue is empty  
        //  Then add worker Failure , return false
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //  Maximum number of worker threads reached , Or  corePoolSize  Or  maximumPoolSize, Failed to start thread 
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //  increase worker Quantity successful , Back to retry sentence 
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            //  If the running state of the thread pool is at least SHUTDOWN, Retry retry Tag statement ,CAS
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // worker Quantity plus 1 After success , Then run :
    boolean workerStarted = false;
    boolean workerAdded = false;
    ThreadPoolExecutor.Worker w = null;
    try {
        //  newly build worker object 
        w = new ThreadPoolExecutor.Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    //  Because the thread is already running , Can't start , Throw exceptions 
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //  Put the thread corresponding to worker Join in worker aggregate 
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //  If you add worker success , Start the worker The corresponding thread 
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //  If starting a new thread fails 
        if (! workerStarted)
            // workCount - 1
            addWorkerFailed(w);
    }
    return workerStarted;
}

        3.5 Task execution process analysis

         In the task submission process above , May open a new Worker, And take the task itself as firstTask To be given Worker. But for a Worker Come on , Not just one task , Instead, it continuously takes tasks from the queue to execute , It's a cyclical process .

         The following term  Woker  Of  run()  Method implementation .

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        //  At present Worker Object encapsulated thread 
        final Thread thread;

        //  The first task that the thread needs to run . It can be null, If it is null, Then the thread gets the task from the queue 
        Runnable firstTask;

        //  Record the number of tasks completed by the thread , One counter per thread 
        volatile long completedTasks;

        /** 
          *  Use the given first task and create a thread factory Worker example  
          * @param firstTask  The first task of the thread , without , Set it to null,
          *                   At this point, the thread will be removed from the queue   Access to task .
          */
        Worker(Runnable firstTask) {
            //  Thread is blocked , call runWorker Interrupt when 
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        //  call ThreadPoolExecutor Of runWorker Method to execute the running of the thread 
        public void run() {
            runWorker(this);
        }


        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;

            //  interrupt Worker Encapsulated threads 
            w.unlock();
            boolean completedAbruptly = true;
            try {

                //  If the initial task of the thread is not null, Or the task obtained from the queue is not null, Indicates that the thread should execute any   service .
                while (task != null || (task = getTask()) != null) {
                    //  Get thread lock     
                    w.lock();
                    //  If the thread pool stops , Make sure the thread is interrupted  
                    //  If the thread pool is running , Ensure that the thread is not interrupted 
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        //  After getting the task , Check the thread pool status again , If you find that the thread pool has stopped , Send it to yourself   Interrupt signal 
                        wt.interrupt();
                    try {
                        //  Hook method before task execution , Implementation is empty 
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; 
                            throw x;
                        } catch (Error x) {
                            thrown = x; 
                            throw x;
                        } catch (Throwable x) {
                            thrown = x; 
                            throw new Error(x);
                        } finally {
                            //  Hook method after task execution , Implementation is empty 
                            afterExecute(task, thrown);
                        }
                    } finally {
                        //  Task execution completed , take task Set to null
                        task = null;
                        //  The number of tasks completed by the thread plus 1
                        w.completedTasks++;
                        //  Release thread lock 
                        w.unlock();
                    }
                }
                //  Determine whether the thread exits normally 
                completedAbruptly = false;
            } finally {
                // Worker sign out 
                processWorkerExit(w, completedAbruptly);
            }
        }
}

        3.5.1  shutdown() Integrated analysis with task execution process

         The execution process of the task and The above thread pool closing process is combined for analysis , When calling shutdown() When , The following scenarios may occur :

        1. When calling shutdown() When , All threads are idle . This means that the task queue must be empty . here , All threads will block in getTask()  Method place . then , All threads will receive interruptIdleWorkers()  Interrupt signal from ,getTask()  return null, all Worker Will quit while loop , After that processWorkerExit.

        2. When calling  shutdown()  When , All threads are busy . here , The queue may be empty , It may also be non empty .interruptIdleWorkers()  Inside tryLock Call failed , Do nothing , All threads will continue to perform their current tasks . After that, all threads will complete the tasks in the queue , Until the queue is empty ,getTask()  Will return null. after , Just like the scene 1 The same , sign out while loop .

        3. When calling shutdown() When , Some threads are busy , Some threads are idle . Some threads are idle , Indicates that the queue must be empty , These threads must be blocking getTask()  Method place . These idle threads will be related to the scene 1 Handle it the same way , Threads that are not idle will be associated with the scene 2 Handle it the same way .

        getTask() Internal details of the method :

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //  If the thread pool calls shutdownNow(), return null 
        //  If the thread pool calls shutdown(), And the task queue is empty , Also returned null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //  Reduce the number of worker threads by one 
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //  If the queue is empty , It will block pool perhaps take, The former has a timeout , The latter has no timeout  
            //  Once interrupted , Throw an exception here , Corresponding to the above scenario 1.
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

        3.5.2  shutdownNow() Integrated analysis with task execution process

         And the above  shutdown()  similar , Just one more link , That is, clear the task queue . If a thread is executing some business code , Even if an interrupt signal is sent to it , It doesn't work , We can only wait for it to finish executing the code . therefore , The difference between interrupting an idle thread and interrupting all threads is not great , Unless the thread is currently blocking somewhere .

         When one  Worker  When you finally quit , Clean up will be carried out :

private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    
    //  If the thread exits normally , Not execute if The sentence of , This is usually an abnormal exit , Need to put worker Quantity minus one 
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        //  To his own worker Remove from collection 
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //  Each thread will call this method at the end , See if you can stop the thread pool 
    tryTerminate();

    int c = ctl.get();

    //  If before the thread exits , The thread pool has not been closed yet 
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

            //  If there are no other threads in the thread pool , And the task queue is not empty 
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;

            //  If the number of worker threads is greater than min, Indicates that tasks in the queue can be executed by other threads , Exit the current thread 
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }

        //  If the thread pool does not end before the current thread exits , The task queue is not empty , There are no other threads to execute  
        //  Just start another thread to process .
        addWorker(null, false);
    }
}

        3.6 Four rejection strategies of thread pool

         stay  execute(Runnable command)  Last , Called  reject(command)  Execute reject strategy , The code is as follows :

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

        handler  This is the rejection policy manager that we can set :

/**
 * Handler called when saturated or shutdown in execute.
 */
private volatile RejectedExecutionHandler handler;

// Default rejection policy 
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
        RejectedExecutionHandler  It's an interface , Four implementations are defined , Corresponding to four different rejection strategies , The default is   AbortPolicy.
public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

        3.6.1 CallerRunsPolicy Strategy

        Strategy : Who requests who handles

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

        3.6.2 AbortPolicy Strategy

        Strategy : Throw exceptions

/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                " rejected from " +
                e.toString());
    }
}

        3.6.3 DiscardPolicy Strategy

        Strategy : Directly discard and do not deal with

/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

        3.6.4 DiscardOldestPolicy Strategy

        Strategy : Abandon the oldest task

/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

4. Executors Tool class

        concurrent The package provided. Executors Tool class , It can be used to create various types of thread pools .

        4.1 Thread pool comparison

         Single threaded thread pool :

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
}

         A fixed number of thread pools :

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

         Every request received , Just create a thread to execute :        

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

         Single thread thread thread pool with periodic scheduling function :

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new Executors.DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
}

         Multithreading , Thread pool with scheduling function :

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

         Different types of thread pools , In fact, they are all configured by the previous key configuration parameters . 

        4.2 Best practices

         stay 《 Alibaba Java Development Manual 》 in , The use of Executors Creating a thread pool , And require developers to use  ThreadPoolExector  or  ScheduledThreadPoolExecutor  Create . This is to force developers to specify the running strategy of thread pool , Avoid the risk of resource exhaustion due to improper use .

原网站

版权声明
本文为[Ricardo, Ricardo]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/174/202206230740320927.html