当前位置:网站首页>Application and principle of ThreadPoolExecutor thread pool

Application and principle of ThreadPoolExecutor thread pool

2022-07-01 05:10:00 People below two meters are mortals

1、 Custom thread pool

A thread is a system resource , Every time you create , Will consume certain system resources , If you are in a high concurrency scenario , Each task creates a new thread , So this will occupy a considerable amount of the system , There may even be a memory overflow problem , And the number of threads cannot be too large , If too many , Frequent context switching , There will also be a great burden , For these two reasons , We can make full use of existing threads , Instead of creating new threads every time

Blocking Queue
Thread Pool
poll
poll
poll
put
task 1
task 2
task 3
t1
t2
t3
main

The first is blocking queues

@Slf4j
class BlockingQueue<T> {
    
    /** *  Default blocking task queue size  */
    private static final Integer DEFAULT_QUEUE_CAPACITY = 5;

    /** *  Blocking queues  */
    private Queue<T> queue;

    /** *  Blocking queues may have multiple thread operations , Need to ensure thread safety , So a lock is needed  */
    private final ReentrantLock lock = new ReentrantLock();

    /** *  If the queue is full , Then you can't add any more tasks , The thread adding the task needs to be blocked  */
    private final Condition putWaitSet = lock.newCondition();

    /** *  If the queue is already empty , Then you can't take any more tasks , The thread that fetches the task needs to be blocked  */
    private final Condition getWaitSet = lock.newCondition();

    /** *  Blocking task queue capacity  */
    private final int capacity;

    /** *  No arguments structure  */
    public BlockingQueue() {
    
        this.capacity = DEFAULT_QUEUE_CAPACITY;
        initQueue();
    }

    /** *  A parametric structure that carries the initial capacity  * * @param capacity  Custom capacity size  */
    public BlockingQueue(int capacity) {
    
        this.capacity = capacity;
        initQueue();
    }

    /** *  Create a task queue  */
    private void initQueue() {
    
        queue = new ArrayDeque<>(this.capacity);
    }

    /** *  Block acquisition task , Without timeout  * * @return T  Task object  */
    public T get() {
    
        lock.lock();
        try {
    
            while (queue.size() == 0) {
    
                try {
    
                    getWaitSet.await();
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            T t = queue.poll();
            putWaitSet.signalAll();
            return t;
        } finally {
    
            lock.unlock();
        }
    }

    /** *  Block acquisition task , With timeout  * * @param timeout  Timeout time  * @param timeUnit  Time out unit  * @return T  Task object  */
    public T get(long timeout, TimeUnit timeUnit) {
    
        lock.lock();
        try {
    
            long nanosTime = timeUnit.toNanos(timeout);
            while (queue.size() == 0) {
    
                try {
    
                    if (nanosTime <= 0) {
    
                        return null;
                    }
                    nanosTime = getWaitSet.awaitNanos(nanosTime);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            T t = queue.poll();
            putWaitSet.signalAll();
            return t;
        } finally {
    
            lock.unlock();
        }
    }

    /** *  Block add task  * * @param t  Task object  */
    public void put(T t) {
    
        lock.lock();
        try {
    
            while (queue.size() == capacity) {
    
                try {
    
                    log.info(" Mission :{}  Waiting to join the blocking task queue ", t);
                    putWaitSet.await();
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            queue.add(t);
            getWaitSet.signalAll();
            log.info(" Mission :{}  Join the blocking task queue ", t);
        } finally {
    
            lock.unlock();
        }
    }

    /** *  Get task queue size  * * @return int  Task queue size  */
    public int size() {
    
        lock.lock();
        try {
    
            return queue.size();
        } finally {
    
            lock.unlock();
        }
    }
}

Then there is the custom thread pool

@Slf4j
class ThreadPool {
    
    /** *  Blocked task queue  */
    private final BlockingQueue<Runnable> blockingQueue;
    /** *  The worker thread  */
    private final Set<Worker> workers;
    /** *  The default number of worker threads  */
    private static final int DEFAULT_POOL_SIZE = 5;
    /** *  The default number of blocked task queue tasks  */
    private static final int DEFAULT_QUEUE_SIZE = 5;

    /** *  The default waiting time unit  */
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
    /** *  Default blocking wait time  */
    private static final int DEFAULT_TIMEOUT = 5;

    /** *  Number of worker threads  */
    private final int poolSize;
    /** *  The maximum number of tasks in the task queue  */
    private final int queueSize;

    /** *  Waiting time unit  */
    private final TimeUnit timeUnit;
    /** *  Blocking waiting time  */
    private final int timeout;

    /** *  No arguments structure  */
    public ThreadPool() {
    
        poolSize = DEFAULT_POOL_SIZE;
        queueSize = DEFAULT_QUEUE_SIZE;
        timeUnit = DEFAULT_TIME_UNIT;
        timeout = DEFAULT_TIMEOUT;
        blockingQueue = new BlockingQueue<>(queueSize);
        workers = new HashSet<>(poolSize);
    }

    /** *  Belt structure  * * @param queueSize  Maximum number of blocked task queue tasks  * @param poolSize  Number of worker threads  */
    public ThreadPool(int queueSize, int poolSize, TimeUnit timeUnit, int timeout) {
    
        this.poolSize = poolSize;
        this.queueSize = queueSize;
        this.timeUnit = timeUnit;
        this.timeout = timeout;
        this.blockingQueue = new BlockingQueue<>(queueSize);
        this.workers = new HashSet<>(poolSize);
    }

    /** *  Perform a task  * * @param task  Task object  */
    public void execute(Runnable task) {
    
        synchronized (workers) {
    
            if (workers.size() < poolSize) {
    
                Worker worker = new Worker(task);
                log.info(" Create a new task worker thread : {} , Perform tasks :{}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
    
                blockingQueue.put(task);
            }
        }
    }

    /** *  Wrapper classes for worker threads  */
    @Data
    @EqualsAndHashCode(callSuper = false)
    @NoArgsConstructor
    @AllArgsConstructor
    class Worker extends Thread {
    
        private Runnable task;

        @Override
        public void run() {
    
            while (task != null || (task = blockingQueue.get(timeout, timeUnit)) != null) {
    
                try {
    
                    log.info(" Being implemented :{}", task);
                    task.run();
                } catch (Exception e) {
    
                    e.printStackTrace();
                } finally {
    
                    task = null;
                }
            }
            synchronized (workers) {
    
                log.info(" The worker thread is removed :{}", this);
                workers.remove(this);
            }
        }
    }
}

test

@Slf4j
public class TestPool {
    
    public static void main(String[] args) {
    
        ThreadPool pool = new ThreadPool(10, 2, TimeUnit.SECONDS, 2);
        for (int i = 0; i < 15; i++) {
    
            int j = i;
            pool.execute(() -> {
    
                try {
    
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
                log.info(String.valueOf(j));
            });
        }
    }
}
[main] INFO com.phz.juc.ThreadPool -  Create a new task worker thread : ThreadPool.Worker(task=com.phz.juc.TestPool$$Lambda$2/1225358173@76fb509a) , Perform tasks :com.phz.juc.TestPool$$Lambda$2/1225358173@76fb509a
[main] INFO com.phz.juc.ThreadPool -  Create a new task worker thread : ThreadPool.Worker(task=com.phz.juc.TestPool$$Lambda$2/1225358173@2e817b38) , Perform tasks :com.phz.juc.TestPool$$Lambda$2/1225358173@2e817b38
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@c4437c4  Join the blocking task queue 
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@76fb509a
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@433c675d  Join the blocking task queue 
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@2e817b38
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@3f91beef  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@1a6c5a9e  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@37bba400  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@179d3b25  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@254989ff  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@5d099f62  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@37f8bb67  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@49c2faae  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@20ad9418  Waiting to join the blocking task queue 
[Thread-0] INFO com.phz.juc.TestPool - 0
[Thread-1] INFO com.phz.juc.TestPool - 1
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@c4437c4
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@20ad9418  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@439f5b3d  Waiting to join the blocking task queue 
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@433c675d
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@439f5b3d  Join the blocking task queue 
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@1d56ce6a  Waiting to join the blocking task queue 
[Thread-1] INFO com.phz.juc.TestPool - 3
[Thread-0] INFO com.phz.juc.TestPool - 2
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@3f91beef
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@1a6c5a9e
[main] INFO com.phz.juc.BlockingQueue -  Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@1d56ce6a  Join the blocking task queue 
[Thread-1] INFO com.phz.juc.TestPool - 4
[Thread-0] INFO com.phz.juc.TestPool - 5
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@37bba400
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@179d3b25
[Thread-1] INFO com.phz.juc.TestPool - 6
[Thread-0] INFO com.phz.juc.TestPool - 7
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@254989ff
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@5d099f62
[Thread-0] INFO com.phz.juc.TestPool - 9
[Thread-1] INFO com.phz.juc.TestPool - 8
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@37f8bb67
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@49c2faae
[Thread-0] INFO com.phz.juc.TestPool - 10
[Thread-1] INFO com.phz.juc.TestPool - 11
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@20ad9418
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@439f5b3d
[Thread-0] INFO com.phz.juc.TestPool - 12
[Thread-1] INFO com.phz.juc.TestPool - 13
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@1d56ce6a
[Thread-1] INFO com.phz.juc.ThreadPool -  The worker thread is removed :ThreadPool.Worker(task=null)
[Thread-0] INFO com.phz.juc.TestPool - 14
[Thread-0] INFO com.phz.juc.ThreadPool -  The worker thread is removed :ThreadPool.Worker(task=null)

2、 Refusal strategy

If the task to be performed takes a long time , So what happens ?

@Slf4j
public class TestPool {
    
    public static void main(String[] args) {
    
        ThreadPool pool = new ThreadPool(10, 2, TimeUnit.SECONDS, 2);
        for (int i = 0; i < 20; i++) {
    
            int j = i;
            pool.execute(() -> {
    
                try {
    
                    TimeUnit.SECONDS.sleep(1000);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
                log.info(String.valueOf(j));
            });
        }
    }
}

image-20220211173717008

You can see that the main thread has been waiting here , Unable to continue the follow-up task , In this case, should we throw an exception or refuse to execute to optimize , Or just wait for death , Or let the caller perform the task himself … So many possibilities , If applied to code , There will probably be a lot of if Branch it , At this time, we can adopt the strategy mode , To sum up all the rejection strategies

@FunctionalInterface
interface RejectPolicy<T> {
    
    /** *  Reject policy method  * * @param queue  Blocking queues  * @param task  Task object  */
    void reject(ArrayDeque<T> queue, T task);
}

ThreadPool Make corresponding modifications

@Slf4j
class ThreadPool {
    
    /** *  Blocked task queue  */
    private final BlockingQueue<Runnable> blockingQueue;
    /** *  The worker thread  */
    private final Set<Worker> workers;
    /** *  The default number of worker threads  */
    private static final int DEFAULT_POOL_SIZE = 5;
    /** *  The default number of blocked task queue tasks  */
    private static final int DEFAULT_QUEUE_SIZE = 5;

    /** *  The default waiting time unit  */
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
    /** *  Default blocking wait time  */
    private static final int DEFAULT_TIMEOUT = 5;

    /** *  Number of worker threads  */
    private final int poolSize;
    /** *  The maximum number of tasks in the task queue  */
    private final int queueSize;

    /** *  Waiting time unit  */
    private final TimeUnit timeUnit;
    /** *  Blocking waiting time  */
    private final int timeout;

    /** *  Refusal strategy  */
    private final RejectPolicy<Runnable> rejectPolicy;

    /** *  No arguments structure  */
    public ThreadPool() {
    
        poolSize = DEFAULT_POOL_SIZE;
        queueSize = DEFAULT_QUEUE_SIZE;
        timeUnit = DEFAULT_TIME_UNIT;
        timeout = DEFAULT_TIMEOUT;
        blockingQueue = new BlockingQueue<>(queueSize);
        workers = new HashSet<>(poolSize);
        // The default calling thread executes by itself 
        rejectPolicy = (queue, task) -> task.run();
    }

    /** *  Belt structure  * * @param queueSize  Maximum number of blocked task queue tasks  * @param poolSize  Number of worker threads  */
    public ThreadPool(int queueSize, int poolSize, TimeUnit timeUnit, int timeout, RejectPolicy<Runnable> rejectPolicy) {
    
        this.poolSize = poolSize;
        this.queueSize = queueSize;
        this.timeUnit = timeUnit;
        this.timeout = timeout;
        this.blockingQueue = new BlockingQueue<>(queueSize);
        this.workers = new HashSet<>(poolSize);
        this.rejectPolicy = rejectPolicy;
    }

    /** *  Perform a task  * * @param task  Task object  */
    public void execute(Runnable task) {
    
        synchronized (workers) {
    
            if (workers.size() < poolSize) {
    
                Worker worker = new Worker(task);
                log.info(" Create a new task worker thread : {} , Perform tasks :{}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
    
                blockingQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    /** *  Wrapper classes for worker threads  */
    @Data
    @EqualsAndHashCode(callSuper = false)
    @NoArgsConstructor
    @AllArgsConstructor
    class Worker extends Thread {
    
        private Runnable task;

        @Override
        public void run() {
    
            while (task != null || (task = blockingQueue.get(timeout, timeUnit)) != null) {
    
                try {
    
                    log.info(" Being implemented :{}", task);
                    task.run();
                } catch (Exception e) {
    
                    e.printStackTrace();
                } finally {
    
                    task = null;
                }
            }
            synchronized (workers) {
    
                log.info(" The worker thread is removed :{}", this);
                workers.remove(this);
            }
        }
    }
}

The blocking queue also needs to be modified

@Slf4j
class BlockingQueue<T> {
    
    /** *  Default blocking task queue size  */
    private static final Integer DEFAULT_QUEUE_CAPACITY = 5;

    /** *  Blocking queues  */
    private ArrayDeque<T> queue;

    /** *  Blocking queues may have multiple thread operations , Need to ensure thread safety , So a lock is needed  */
    private final ReentrantLock lock = new ReentrantLock();

    /** *  If the queue is full , Then you can't add any more tasks , The thread adding the task needs to be blocked  */
    private final Condition putWaitSet = lock.newCondition();

    /** *  If the queue is already empty , Then you can't take any more tasks , The thread that fetches the task needs to be blocked  */
    private final Condition getWaitSet = lock.newCondition();

    /** *  Blocking task queue capacity  */
    private final int capacity;

    /** *  No arguments structure  */
    public BlockingQueue() {
    
        this.capacity = DEFAULT_QUEUE_CAPACITY;
        initQueue();
    }

    /** *  A parametric structure that carries the initial capacity  * * @param capacity  Custom capacity size  */
    public BlockingQueue(int capacity) {
    
        this.capacity = capacity;
        initQueue();
    }

    /** *  Create a task queue  */
    private void initQueue() {
    
        queue = new ArrayDeque<>(this.capacity);
    }

    /** *  Block acquisition task , Without timeout  * * @return T  Task object  */
    public T get() {
    
        lock.lock();
        try {
    
            while (queue.size() == 0) {
    
                try {
    
                    getWaitSet.await();
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            T t = queue.poll();
            putWaitSet.signalAll();
            return t;
        } finally {
    
            lock.unlock();
        }
    }

    /** *  Block acquisition task , With timeout  * * @param timeout  Timeout time  * @param timeUnit  Time out unit  * @return T  Task object  */
    public T get(long timeout, TimeUnit timeUnit) {
    
        lock.lock();
        try {
    
            long nanosTime = timeUnit.toNanos(timeout);
            while (queue.size() == 0) {
    
                try {
    
                    if (nanosTime <= 0) {
    
                        return null;
                    }
                    nanosTime = getWaitSet.awaitNanos(nanosTime);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            T t = queue.poll();
            putWaitSet.signalAll();
            return t;
        } finally {
    
            lock.unlock();
        }
    }

    /** *  Block add task  * * @param t  Task object  */
    public void put(T t) {
    
        lock.lock();
        try {
    
            while (queue.size() == capacity) {
    
                try {
    
                    log.info(" Mission :{}  Waiting to join the blocking task queue ", t);
                    putWaitSet.await();
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            queue.add(t);
            getWaitSet.signalAll();
            log.info(" Mission :{}  Join the blocking task queue ", t);
        } finally {
    
            lock.unlock();
        }
    }

    /** *  Block adding tasks with timeout  * * @param t  Task object  * @param timeout  Time out period  * @param timeUnit  Timeout duration unit  */
    public boolean put(T t, long timeout, TimeUnit timeUnit) {
    
        lock.lock();
        try {
    
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
    
                try {
    
                    if (nanos <= 0) {
    
                        log.info(" Join task timeout , Cancel adding ");
                        return false;
                    }
                    log.info(" Mission :{}  Waiting to join the blocking task queue ", t);
                    nanos = putWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            queue.add(t);
            getWaitSet.signalAll();
            log.info(" Mission :{}  Join the blocking task queue ", t);
            return true;
        } finally {
    
            lock.unlock();
        }
    }

    /** *  Get task queue size  * * @return int  Task queue size  */
    public int size() {
    
        lock.lock();
        try {
    
            return queue.size();
        } finally {
    
            lock.unlock();
        }
    }

    /** *  Add task with reject policy  * * @param rejectPolicy  Refusal strategy  * @param task  Task object  */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
    
        lock.lock();
        try {
    
            if (queue.size() == capacity) {
    
                rejectPolicy.reject(queue, task);
            } else {
    
                log.info(" Join the task queue : {}", task);
                queue.add(task);
                getWaitSet.signalAll();
            }
        } finally {
    
            lock.unlock();
        }
    }
}

test

ThreadPool pool = new ThreadPool(10, 2, TimeUnit.SECONDS, 2,
                                 // Give up execution 
                                 (blockingQueue, task) -> {
    
                                     log.info(" give up {}", task);
                                 }
                                );

for (int i = 0; i < 20; i++) {
    
    int j = i;
    pool.execute(() -> {
    
        try {
    
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
    
            e.printStackTrace();
        }
        log.info(String.valueOf(j));
    });
}
[main] INFO com.phz.juc.ThreadPool -  Create a new task worker thread : ThreadPool.Worker(task=com.phz.juc.TestPool$$Lambda$3/2121055098@4d405ef7) , Perform tasks :com.phz.juc.TestPool$$Lambda$3/2121055098@4d405ef7
[main] INFO com.phz.juc.ThreadPool -  Create a new task worker thread : ThreadPool.Worker(task=com.phz.juc.TestPool$$Lambda$3/2121055098@3f91beef) , Perform tasks :com.phz.juc.TestPool$$Lambda$3/2121055098@3f91beef
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@1a6c5a9e
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@4d405ef7
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@3f91beef
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@37bba400
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@179d3b25
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@254989ff
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@5d099f62
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@37f8bb67
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@49c2faae
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@20ad9418
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@31cefde0
[main] INFO com.phz.juc.BlockingQueue -  Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@439f5b3d
[main] INFO com.phz.juc.TestPool -  give up com.phz.juc.TestPool$$Lambda$3/2121055098@1d56ce6a
[main] INFO com.phz.juc.TestPool -  give up com.phz.juc.TestPool$$Lambda$3/2121055098@5197848c
[main] INFO com.phz.juc.TestPool -  give up com.phz.juc.TestPool$$Lambda$3/2121055098@17f052a3
[main] INFO com.phz.juc.TestPool -  give up com.phz.juc.TestPool$$Lambda$3/2121055098@2e0fa5d3
[main] INFO com.phz.juc.TestPool -  give up com.phz.juc.TestPool$$Lambda$3/2121055098@5010be6
[main] INFO com.phz.juc.TestPool -  give up com.phz.juc.TestPool$$Lambda$3/2121055098@685f4c2e
[main] INFO com.phz.juc.TestPool -  give up com.phz.juc.TestPool$$Lambda$3/2121055098@7daf6ecc
[main] INFO com.phz.juc.TestPool -  give up com.phz.juc.TestPool$$Lambda$3/2121055098@2e5d6d97
[Thread-0] INFO com.phz.juc.TestPool - 0
[Thread-1] INFO com.phz.juc.TestPool - 1
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@1a6c5a9e
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@37bba400
[Thread-1] INFO com.phz.juc.TestPool - 3
[Thread-0] INFO com.phz.juc.TestPool - 2
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@179d3b25
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@254989ff
[Thread-0] INFO com.phz.juc.TestPool - 5
[Thread-1] INFO com.phz.juc.TestPool - 4
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@5d099f62
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@37f8bb67
[Thread-1] INFO com.phz.juc.TestPool - 7
[Thread-0] INFO com.phz.juc.TestPool - 6
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@49c2faae
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@20ad9418
[Thread-1] INFO com.phz.juc.TestPool - 8
[Thread-0] INFO com.phz.juc.TestPool - 9
[Thread-1] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@31cefde0
[Thread-0] INFO com.phz.juc.ThreadPool -  Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@439f5b3d
[Thread-1] INFO com.phz.juc.TestPool - 10
[Thread-0] INFO com.phz.juc.TestPool - 11
[Thread-1] INFO com.phz.juc.ThreadPool -  The worker thread is removed :ThreadPool.Worker(task=null)
[Thread-0] INFO com.phz.juc.ThreadPool -  The worker thread is removed :ThreadPool.Worker(task=null)

Other rejection strategies

  • The caller executes
(blockingQueue, task) -> {
    
    task.run();
}
  • Death etc.
BlockingQueue::put
  • Throw an exception
(blockingQueue, task) -> {
    
    throw new RuntimeException(" Task execution failed  " + task);
}
  • Wait with timeout
(blockingQueue, task) -> {
    
    blockingQueue.put(task, 1500, TimeUnit.MILLISECONDS);     
}
  • Give up execution
(blockingQueue, task) -> {
    
    log.info(" give up {}", task);
}

3、ThreadPoolExecutor

image-20220211184911081

3.1、 Thread pool state

ThreadPoolExecutor Use int The height of 3 Bit to represent thread pool status , low 29 Bits indicate the number of threads

Status name high 3 position Receive new tasks Processing blocking queue tasks explain
RUNNING111YY
SHUTDOWN000NY No new missions , However, the remaining blocking queue will be processed Mission
STOP001NN It interrupts the task in progress , And abandon the blocking queue Mission
TIDYING010-- All the tasks have been completed , The active thread is 0 About to enter End
TERMINATED011-- Put an end to the state
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

Compare... Numerically ,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING (111, The highest bit is 1, A negative number ), This information is stored in an atomic variable ctl in , The purpose is to combine the thread pool state with the number of threads , So you can use it once CAS Atomic operations are used to assign values

// rs  Is the top three , According to state , wc  For low  29  position , Indicates the number of threads , Then merge them 
private static int ctlOf(int rs, int wc) {
     return rs | wc; }

//c  For the original value ,ctlOf  The return value is the expected value 
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))

3.2、 Construction method

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize Number of core threads ( The maximum number of threads reserved )
  • maximumPoolSize Maximum number of threads ( Subtract the number of core threads = Number of emergency threads )
  • keepAliveTime Time to live - For emergency threads unit Time unit - For emergency threads
  • workQueue Blocking queues
  • threadFactory Thread factory ( You can give a good name for thread creation , It is convenient to debug )
  • handler Refusal strategy

The emergency thread is used for , The tasks performed by the core thread are not over yet , The blocking queue is full , Will see if there is an emergency thread , If there is one, carry it out , After the emergency thread is executed , For a certain time ( Time to live ) It will be destroyed after , But after the execution of the core thread , It's not going to be destroyed , Will survive backstage , If the core thread , Blocking queues , The emergency threads are full , The rejection strategy is executed

Thread pool c = 2, m = 3
Blocking queues
Core thread 1
Core thread 2
Emergency thread 1
Mission 1
Mission 2
Mission 5
size = 2
Mission 3
Mission 4

It works as follows :

  • There are no threads in the thread pool at first , When a task is submitted to the thread pool , The thread pool creates a new thread to perform the task .

  • When the number of threads reaches corePoolSize No threads are idle , Then join the task , New tasks will be added workQueue Queuing , Until there are idle threads .

  • If the queue selects Bounded queues , So when the task exceeds the queue size , Will create maximumPoolSize - corePoolSize Number of threads to save .

  • If the thread arrives maximumPoolSize If there are still new tasks, the reject policy will be executed . Refusal strategy JDK Provides 4 Kind of implementation , Other well-known frameworks also provide implementations

    • AbortPolicy Let the caller throw RejectedExecutionException abnormal , This is the default strategy
    • CallerRunsPolicy Let the caller run the task
    • DiscardPolicy Give up this mission
    • DiscardOldestPolicy Abandon the earliest task in the queue , This task replaces

    image-20220211193243651

    • Dubbo The implementation of the , Throwing out RejectedExecutionException The log will be recorded before the exception , and dump Thread stack information , Easy to locate problems
    • Netty The implementation of the , Is to create a new thread to execute the task
    • ActiveMQ The implementation of the , With overtime waiting (60s) Attempt to put in queue , Similar to the rejection policy we customized before
    • PinPoint ( Link tracking ) The implementation of the , It uses a rejection policy chain , Each rejection policy in the policy chain will be tried one by one
  • When the peak is over , exceed corePoolSize If there is no task to do for a period of time , Need to end saving resources , This time is by keepAliveTime and unit To control .

According to this construction method ,JDK Executors Class provides many factory methods to create thread pools for various purposes

3.3、 Several factory methods

Relevant methods can enter Executors Class

3.3.1、newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • Number of core threads == Maximum number of threads ( No emergency threads have been created ), Therefore, no timeout is required
  • The blocking queue is unbounded , You can put any number of tasks

It can be used when the amount of task is known , Relatively time-consuming tasks

3.3.2、newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • The number of core threads is 0, The maximum number of threads is Integer.MAX_VALUE, The idle lifetime of the emergency thread is 60s, signify
    • It's all emergency threads (60s It can be recycled )
    • Emergency threads can be created indefinitely
  • The queue uses SynchronousQueue The implementation features are , It has no capacity , You can't put it in without a thread ( Hand in hand 、 Delivery on one hand )
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
    
    try {
    
        log.info("putting {} ", 1);
        integers.put(1);
        log.info("{} putted...", 1);
        log.info("putting...{} ", 2);
        integers.put(2);
        log.info("{} putted...", 2);
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
}, "t1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
    
    try {
    
        log.info("taking {}", 1);
        integers.take();
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
}, "t2").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
    
    try {
    
        log.info("taking {}", 2);
        integers.take();
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
}, "t3").start();

image-20220211194902586

The whole thread pool shows that the number of threads will increase according to the number of tasks , There is no upper limit , When the task is finished , Free 1 Release thread in minutes . Suitable for intensive tasks , But when each task takes a short time

3.2.3、newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

It is applicable to tasks that you want to queue for execution . The number of threads is fixed to 1, There are more tasks than 1 when , Will be placed in an unbounded queue . Task completed , The only thread will not be released .

Only one thread , Can it also be called thread pool ?

  • Create a single thread to execute the task serially , If the task fails and terminates, there is no remedy , The thread pool will also create a new thread , Ensure the normal operation of the pool

  • Executors.newSingleThreadExecutor() The number of threads is always 1, Do not modify

    • Back to a FinalizableDelegatedExecutorService , Parameters are created ThreadPoolExecutor , The decorator mode is applied here , Only exposed ExecutorService Interface , It limits that we can only call methods of the interface , Instead of calling ThreadPoolExecutor The unique method in
    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
          
        FinalizableDelegatedExecutorService(ExecutorService executor) {
          
            super(executor);
        }
        protected void finalize() {
          
            super.shutdown();
        }
    }
    

    image-20220211200330626

  • Executors.newFixedThreadPool(1) When the initial for 1, It can be modified later

    • What's exposed is ThreadPoolExecutor object , Can be strong after the call. setCorePoolSize And so on

3.4、 Submit tasks

//  Perform tasks 
void execute(Runnable command);

//  Submit tasks  task, Use return value  Future  Get mission results 
<T> Future<T> submit(Callable<T> task);

//  Submit  tasks  All tasks in 
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 throws InterruptedException;

//  Submit  tasks  All tasks in 
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;

//  Submit  tasks  All tasks in , With timeout 
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
    throws InterruptedException;

//  Submit  tasks  All tasks in , Which task is successfully completed first , Returns the result of this task , Other tasks cancelled 
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

//  Submit  tasks  All tasks in , Which task is successfully completed first , Returns the result of this task , Other tasks cancelled , With timeout 
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

3.5、 Close thread pool

  • shutdown
// The thread pool state changes to  SHUTDOWN, No new missions , But the submitted task will be finished ( Includes executing and blocking queues ), This method does not block the execution of the calling thread 
public void shutdown() {
    
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    
        checkShutdownAccess();
        // Modify thread pool state 
        advanceRunState(SHUTDOWN);
        // Only idle threads will be interrupted 
        interruptIdleWorkers();
        onShutdown(); // The extension point   To subclass ScheduledThreadPoolExecutor With 
    } finally {
    
        mainLock.unlock();
    }
    // Try to end 
    tryTerminate();
}
  • shutdownNow
// The thread pool state changes to  STOP, No new missions , The task in the task queue will be returned , And use  interrupt  To interrupt a task in progress 
public List<Runnable> shutdownNow() {
    
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    
        checkShutdownAccess();
        //  Modify thread pool state 
        advanceRunState(STOP);
        //  Break all threads 
        interruptWorkers();
        //  Get the remaining tasks in the queue 
        tasks = drainQueue();
    } finally {
    
        mainLock.unlock();
    }
    //  Try to end 
    tryTerminate();
    return tasks;
}
  • Other methods
//  be not in  RUNNING  Thread pool for state , This method returns  true
boolean isShutdown();

//  Whether the thread pool state is  TERMINATED
boolean isTerminated();

//  call  shutdown  after , Because the calling thread does not wait for all tasks to run , So if it wants to pool  TERMINATED  Do something later , You can use this method to wait for 
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

4、 Task scheduling thread pool

4.1、 Delayed execution

stay 『 Task scheduling thread pool 』 Before adding the function , have access to java.util.Timer To realize the timing function ,Timer The advantage is that it's easy to use , But because all tasks are scheduled by the same thread , So all tasks are executed serially , There can only be one task at a time , The delay or exception of the previous task will affect the later task .

Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
    
    @SneakyThrows
    @Override
    public void run() {
    
        log.info("task 1");
        TimeUnit.SECONDS.sleep(2);
    }
};
TimerTask task2 = new TimerTask() {
    
    @Override
    public void run() {
    
        log.info("task 2");
    }
};
//  Use  timer  Add two tasks , I hope they're all there  1s  After execution 
//  But because of  timer  There is only one thread in the queue to execute the tasks in sequence , therefore 『 Mission 1』 Time delay of , Affected 『 Mission 2』 Implementation 
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);

image-20220211212158053

Use task scheduling thread pool to transform :

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
//  Add two tasks , I hope they're all there  1s  After execution 
executor.schedule(() -> {
    
    System.out.println(" Mission 1, execution time :" + new Date());
    try {
    
        Thread.sleep(2000);
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
    
    System.out.println(" Mission 2, execution time :" + new Date());
}, 1000, TimeUnit.MILLISECONDS);

If one of the threads reports an error , It will not affect the execution of other threads

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
//  Add two tasks , I hope they're all there  1s  After execution 
executor.schedule(() -> {
    
    System.out.println(" Mission 1, execution time :" + new Date());
    int i = 1 / 0;
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
    
    System.out.println(" Mission 2, execution time :" + new Date());
}, 1000, TimeUnit.MILLISECONDS);

image-20220211212757495

But the expected exception did not appear , What's going on? ? For anomalies , We can have the following solutions

  • Catch yourself
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
//  Add two tasks , I hope they're all there  1s  After execution 
executor.schedule(() -> {
    
    try {
    

        System.out.println(" Mission 1, execution time :" + new Date());
        int i = 1 / 0;
    } catch (Exception e) {
    
        e.printStackTrace();
    }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
    
    System.out.println(" Mission 2, execution time :" + new Date());
}, 1000, TimeUnit.MILLISECONDS);

image-20220211214103262

  • Use future
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
    
    log.info("task1");
    int i = 1 / 0;
    return true;
});
log.info("result:{}", f.get());

image-20220211214636107

4.2、 Timing execution

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
    
    log.info("running...");
}, 1, 1, TimeUnit.SECONDS);

image-20220211213202595

What if the task itself takes longer than the interval ?

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
    
    log.info("running..." + new Date());
    try {
    
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
}, 1, 1, TimeUnit.SECONDS);

image-20220211213444824

  • In limine , Time delay 1s, Next , Due to task execution time > Time interval between , Interval is 『 support 』 here we are 2s, That is, take the maximum value of interval time and actual time

If you want two tasks to be executed at a certain interval , have access to scheduleWithFixedDelay Method

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(() -> {
    
    log.info("running..." + new Date());
    try {
    
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
}, 1, 1, TimeUnit.SECONDS);

image-20220211213847033

  • Time delay 1s,scheduleWithFixedDelay What's the interval End of last task <-> Time delay <-> The next task begins So the intervals are 3s

5、Tomcat Thread pool

connector(NIO EndPoint)
Executor
Reading
Reading
socketProcessor
socketProcessor
Acceptor
LimitLatch
Socket Channel 1
Socket Channel 2
Poller
worker1
worker2
  • LimitLatch For current limiting , You can control the maximum number of connections , similar JUC Medium Semaphore
  • Acceptor Only responsible for accepting new Socket Connect
  • Poller Only monitor Socket Channel Whether there is readable IO event , Once readable , Encapsulates a socketProcessor Task object , Submit to Executor Thread pool processing
  • Executor The worker thread in is ultimately responsible for processing requests

Tomcat Thread pools are also called ThreadPoolExecutor , It's based on JDK From the modification in , So and JDK One of them is slightly different

  • If the number of bus passes reaches maximumPoolSize, It will not be thrown immediately RejectedExecutionException abnormal , Instead, try to put the task in the queue , If it still fails , Will throw this exception

Source code tomcat-catalina-9.0.58

public void execute(Runnable command, long timeout, TimeUnit unit) {
    
    submittedCount.incrementAndGet();
    try {
    
        //  If the thread is full , Will throw  RejectedExecutionException  abnormal 
        executeInternal(command);
    } catch (RejectedExecutionException rx) {
    
        if (getQueue() instanceof TaskQueue) {
    
            //  If  Executor  Approaching the maximum pool size , Concurrent invocations  execute()  May lead to ( because  Tomcat  Use  TaskQueue) Some tasks are rejected instead of queued . If this happens , Please add them to the queue .
            //  The blocking queue has also been extended 
            final TaskQueue queue = (TaskQueue) getQueue();
            try {
    
                //  Try to join the queue 
                if (!queue.force(command, timeout, unit)) {
    
                    //  An exception will be thrown if the join fails 
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                }
            } catch (InterruptedException x) {
    
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } else {
    
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}

Connector Configuration item , Corresponding Tomcatserver.xml file Connector label

Configuration item The default value is explain
acceptorThreadCount1acceptor Number of threads
pollerThreadCount1poller Number of threads
minSpareThreads10 Number of core threads , namely corePoolSize
maxThreads200 Maximum number of threads , namely maximumPoolSize
executor-Executor name , Used to reference the following Executor
  • acceptor: The number of threads is 1 Just fine , It is blocked when there is no request , There is no need to use too many threads
  • poller: The idea of multiplexing is adopted , Can monitor a lot of Channel Read and write events , therefore 1 One is OK
  • executor: A tag refers to the following configuration , If the , It will cover minSpareThreads and maxThreads

Executor Thread configuration , Corresponding Tomcatserver.xml file Executor label ( The default is commented out )

Configuration item The default value is explain
threadPriority5 Thread priority
daemontrue Whether to guard threads
minSpareThreads25 Number of core threads , namely corePoolSize
maxThreads200 Maximum number of threads , namely maximumPoolSize
maxIdleTime60000 Thread lifetime , In milliseconds , The default value is 1 minute
maxQueueSizeInteger.MAX_VALUE The queue length
prestartminSpareThreadsfalse Whether the core thread starts when the server starts
  • daemontomcat All threads in are daemon threads , Once the main service is shut down , The rest of the threads will shut down
  • maxIdleTime: That is, the survival time of the emergency thread
  • maxQueueSize: The length of the blocking queue , The default is unbounded queue , If the server is under too much pressure , It may cause the accumulation of tasks
  • prestartminSpareThreads: Whether the creation of core threads is lazy

Since the blocking queue is unbounded , Is the emergency thread useless , Because in JDK in , If the blocking queue is bounded , Only when the blocking queue is full , Will create an emergency thread , The blocking queue here is

An unbounded , The emergency thread will soon be useless

In fact, it's just Tomcat Source code ,TaskQueue Extensions have been made ,Tomcat The task being executed will be counted in , Submission will add one , One will be deducted at the end , If the number of tasks submitted is still less than the number of core threads , Directly join the blocking queue for execution , If it is larger than the core thread , It will not join the blocking queue immediately , It will first determine whether the number of tasks submitted is greater than the maximum number of threads , If it is less than , Create an emergency thread to execute , If it is greater than , Will join the blocking queue

yes
no
yes
no
Add a new task
Submit tasks < Core thread
Join the queue
Submit tasks < Maximum thread
Create an emergency thread

原网站

版权声明
本文为[People below two meters are mortals]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/02/202202160227219156.html