
Chapter 7 - Thread Pool of the Shared Model

2022-07-06 06:42:00 Ape feather


Thread Pool (Key Point)

Pooling appears in many places: thread pools, database connection pools, HTTP connection pools and so on all apply the same idea. The point of pooling is to avoid paying the cost of creating and destroying a resource on every use, and thereby to improve resource utilization.

A thread pool provides a way to limit and manage resources (including the threads that execute tasks). Each thread pool also maintains some basic statistics, such as the number of completed tasks.
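For example (a minimal sketch; the class name PoolStatsDemo and the task count are arbitrary), ThreadPoolExecutor exposes such statistics through methods like getCompletedTaskCount() and getLargestPoolSize():

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolStatsDemo {

    public static void main(String[] args) throws InterruptedException {
        // Executors.newFixedThreadPool returns a ThreadPoolExecutor under the hood
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> { /* a trivial task */ });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        // basic statistics maintained by the pool
        System.out.println("completed tasks: " + pool.getCompletedTaskCount());
        System.out.println("largest pool size: " + pool.getLargestPoolSize());
    }
}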

Borrowing from "The Art of Java Concurrency Programming", here are the benefits of using a thread pool:

  • Lower resource consumption. Reusing already-created threads reduces the cost of thread creation and destruction. (Each created thread is ultimately mapped to an operating-system thread, which is expensive.)
  • Faster response. When a task arrives, it can be executed immediately without waiting for a thread to be created.
  • Better manageability. Threads are a scarce resource; creating them without limit not only consumes system resources but also reduces system stability. A thread pool allows uniform allocation, tuning and monitoring.

Custom thread pool


  • The blocking queue holds the tasks produced by the main thread (or other threads)
  • The main thread acts as the producer: it creates tasks and puts them into the blocking queue
  • The thread pool acts as the consumer: it takes the queued tasks and executes them

Steps to implement the custom thread pool:

  • step 1: define the rejection policy interface
  • step 2: define the blocking queue for tasks
  • step 3: define the thread pool
  • step 4: test
/**
 * Description: a simple hand-written thread pool
 *
 * @author xiexu
 * @create 2022-02-08 8:06 PM
 */

@Slf4j(topic = "c.TestPool")
public class TestPool {
    

    public static void main(String[] args) {
    
        /**
         * 1st parameter: maximum number of threads
         * 2nd parameter: timeout
         * 3rd parameter: time unit
         * 4th parameter: queue capacity
         */
        ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, new RejectPolicy<Runnable>() {
    
            @Override
            public void reject(BlockingQueue<Runnable> queue, Runnable task) {
    
                //  rejection policy options:
                // 1. wait indefinitely
                //queue.put(task);

                // 2. wait with a timeout
                queue.offer(task, 500, TimeUnit.MILLISECONDS);

                // 3. let the caller give up the task
                // log.debug("give up -{}", task);

                // 4. let the caller throw an exception
                // throw new RuntimeException("task execution failed: " + task);

                // 5. let the caller run the task itself
                // task.run();
            }
        });
        //  create 3 tasks
        for (int i = 0; i < 3; i++) {
    
            int j = i;
            threadPool.execute(new Runnable() {
    
                @Override
                public void run() {
    
                    try {
    
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
    
                        e.printStackTrace();
                    }
                    log.debug("{}", j);
                }
            });
        }
    }

}

@FunctionalInterface //  Refusal strategy 
interface RejectPolicy<T> {
    
    void reject(BlockingQueue<T> queue, T task);
}

/**
 * Thread pool
 */
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    

    //  task blocking queue
    private BlockingQueue<Runnable> taskQueue;

    //  set of worker threads
    private HashSet<Worker> workers = new HashSet<>();

    //  number of core threads
    private int coreSize;

    //  timeout used when fetching a task
    private long timeout;

    private TimeUnit timeUnit;

    //  rejection policy
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
    
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    //  execute a task
    public void execute(Runnable task) {

        synchronized (workers) {

            //  while the worker count is below coreSize, the current workers can handle the tasks directly; no need to queue
            if (workers.size() < coreSize) {

                Worker worker = new Worker(task);
                log.debug("new worker {}, {}", worker, task);
                //  add the newly created worker to the worker set
                workers.add(worker);
                worker.start();
            } else {
                //  the task count exceeds the thread count: stage the task in the queue

                // taskQueue.put(task); //  put() can only wait indefinitely, hence the pluggable rejection policy
                //  rejection policy options:
                // 1. wait indefinitely
                // 2. wait with a timeout
                // 3. let the caller give up the task
                // 4. let the caller throw an exception
                // 5. let the caller run the task itself
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    //  worker thread
    class Worker extends Thread {
    
        private Runnable task;

        public Worker(Runnable task) {
    
            this.task = task;
        }

        @Override
        public void run() {
    
            //  execute tasks
            // 1) if task is not null, run it
            // 2) when a task finishes, fetch the next task from the blocking queue and run it
            //while (task != null || (task = taskQueue.take()) != null) {

            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {

                try {

                    log.debug("executing...{}", task);
                    task.run();
                } catch (Exception e) {
    
                    e.printStackTrace();
                } finally {
    
                    //  clear the finished task
                    task = null;
                }
            }

            //  remove the current worker from the worker set
            synchronized (workers) {

                log.debug("worker removed {}", this);
                workers.remove(this);
            }
        }
    }

}

/**
 * Blocking queue that stores tasks
 *
 * @param <T> Runnable; tasks are abstracted as Runnable
 */
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
    
    // 1. task queue
    private Deque<T> queue = new ArrayDeque<>();

    // 2. lock
    private ReentrantLock lock = new ReentrantLock();

    // 3. producer condition variable (producers wait here when the queue is full and has no free space)
    private Condition fullWaitSet = lock.newCondition();

    // 4. consumer condition variable (consumers wait here when there is no task to consume)
    private Condition emptyWaitSet = lock.newCondition();

    // 5. capacity of the blocking queue
    private int capacity;

    public BlockingQueue(int capacity) {
    
        this.capacity = capacity;
    }

    //  fetch a task from the queue; if none is available, wait up to the specified time
    public T poll(long timeout, TimeUnit unit) {

        lock.lock();
        try {

            //  convert the timeout to nanoseconds
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {

                try {

                    //  timed out: return null without waiting any longer
                    if (nanos <= 0) {

                        return null;
                    }
                    //  awaitNanos returns the remaining time (requested wait minus elapsed time),
                    //  so after a spurious wakeup we do not wait the full interval again
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal(); //  wake up a producer: the queue is no longer full
            return t;
        } finally {
    
            lock.unlock();
        }
    }

    //  fetch a task from the queue; if none is available, wait indefinitely
    public T take() {
    
        lock.lock();
        try {
    
            //  while the blocking queue is empty
            while (queue.isEmpty()) {

                //  wait on the consumer condition variable: there is no task to consume
                try {
    
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            //  the queue is not empty: take the task at the head
            T t = queue.removeFirst();
            fullWaitSet.signal(); //  wake up a producer: the queue is no longer full
            return t;
        } finally {
    
            lock.unlock(); //  Release the lock 
        }
    }

    //  Add tasks to the blocking queue 
    public void put(T task) {
    
        lock.lock();
        try {
    
            //  Determine whether the blocking queue is full 
            while (queue.size() == capacity) {
    
                try {
    
                    log.debug("waiting to enter the blocking queue ...");
                    //  wait on the producer condition variable: there is no free capacity
                    fullWaitSet.await();
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            //  Add new tasks to the queue 
            queue.addLast(task);
            log.debug("task added to the blocking queue {}", task);
            emptyWaitSet.signal(); //  the queue now has a task: wake up a consumer
        } finally {
    
            lock.unlock();
        }
    }

    //  Add tasks to the blocking queue ( With timeout )
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
    
        lock.lock();
        try {
    
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
    
                try {
    
                    if (nanos <= 0) {
    
                        return false;
                    }
                    log.debug(" Wait to enter the blocking queue  {}...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
            log.debug("task added to the blocking queue {}", task);
            queue.addLast(task);
            emptyWaitSet.signal(); //  the queue now has a task: wake up a consumer
            return true;
        } finally {
    
            lock.unlock();
        }
    }

    //  Get queue size 
    public int size() {
    
        lock.lock();
        try {
    
            return queue.size();
        } finally {
    
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
    
        lock.lock();
        try {
    
            //  Determine if the queue is full 
            if (queue.size() == capacity) {
    
                rejectPolicy.reject(this, task);
            } else {
    
                //  there is free space
                log.debug("task added to the queue {}", task);
                queue.addLast(task);
                emptyWaitSet.signal(); //  the queue now has a task: wake up a consumer
            }
        } finally {
    
            lock.unlock();
        }
    }
}


  • The blocking queue BlockingQueue temporarily holds the tasks that the threads have not yet had time to execute
    • put another way, it balances the difference in speed between producers and consumers
    • getting and putting tasks follow the producer-consumer pattern
  • The pool's threads wrap Thread in a further encapsulation, the Worker class
    • when a task object's (Runnable / Callable) run method is called, the thread executes the task; when it finishes, it fetches a new task from the blocking queue and executes that
  • Tasks are submitted to the pool mainly through the execute method
    • on submission it checks whether the number of running threads has reached the pool's core size

ThreadPoolExecutor


Thread pool state

ThreadPoolExecutor uses the high 3 bits of an int to represent the thread pool state and the low 29 bits to represent the number of threads

//  Thread pool states
// runState is stored in the high-order bits
// RUNNING: high 3 bits are 111
private static final int RUNNING    = -1 << COUNT_BITS;

// SHUTDOWN: high 3 bits are 000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// STOP: high 3 bits are 001
private static final int STOP       =  1 << COUNT_BITS;

// TIDYING: high 3 bits are 010
private static final int TIDYING    =  2 << COUNT_BITS;

// TERMINATED: high 3 bits are 011
private static final int TERMINATED =  3 << COUNT_BITS;


| State name | High 3 bits | Description |
| --- | --- | --- |
| RUNNING | 111 | Accepts new tasks and processes the tasks in the queue |
| SHUTDOWN | 000 | Does not accept new tasks, but finishes the tasks already in the queue |
| STOP | 001 | Interrupts the tasks being executed and discards the tasks in the queue |
| TIDYING | 010 | All tasks have completed and the active thread count is 0; about to terminate |
| TERMINATED | 011 | Terminal state |

  • Compared numerically, TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING (RUNNING has the high bit set, so it is negative)

The thread pool state and the number of threads are represented together by one atomic integer, ctl

  • The main reason for packing two values into one number: both can be changed atomically with a single CAS operation
//  atomic integer: the high 3 bits hold the pool state, the low bits hold the thread count
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// COUNT_BITS = Integer.SIZE - 3 = 29
//  the high 3 bits are reserved for the pool state, the low 29 bits for the thread count
private static final int COUNT_BITS = Integer.SIZE - 3;

// (1 << COUNT_BITS) - 1: the maximum thread count that can be stored
// CAPACITY has its high 3 bits set to 0 and all remaining bits set to 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
  • Operations for extracting the pool state, extracting the thread count, and merging the two values
// Packing and unpacking ctl
//  extract the running state
//  clears everything except the high 3 bits
private static int runStateOf(int c)     {
     return c & ~CAPACITY; }

//  extract the number of worker threads
//  clears the high 3 bits
private static int workerCountOf(int c)  {
     return c & CAPACITY; }

//  compute a new ctl value
private static int ctlOf(int rs, int wc) {
     return rs | wc; }
  • This information is stored in the atomic variable ctl so that the pool state and the thread count can be combined and assigned with a single CAS operation
// c  is the old value; ctlOf returns the new value
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));

// rs  is the high 3 bits (pool state), wc is the low 29 bits (thread count); ctlOf merges them
private static int ctlOf(int rs, int wc) {
     return rs | wc; }

Thread pool fields

//  worker thread: wraps the Thread that actually runs the tasks
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    
    ...
}

//  blocking queue holding the tasks that the core threads have not yet had time to execute
private final BlockingQueue<Runnable> workQueue;

//  lock 
private final ReentrantLock mainLock = new ReentrantLock();

//  set holding the worker threads; may only be accessed while holding mainLock
private final HashSet<Worker> workers = new HashSet<Worker>();


Constructor (Key Point)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize: number of core threads
  • maximumPoolSize: maximum number of threads
    • maximumPoolSize - corePoolSize = number of emergency threads
    • Note: emergency threads are created only when there is no idle core thread and the task queue is full
  • keepAliveTime: how long an idle emergency thread is kept alive (core threads stay alive indefinitely)
  • unit: time unit for keepAliveTime (applies to emergency threads)
  • workQueue: blocking queue that holds the waiting tasks
    • bounded blocking queue: ArrayBlockingQueue
    • unbounded blocking queue: LinkedBlockingQueue
    • hand-off queue holding at most one element: SynchronousQueue
    • priority queue: PriorityBlockingQueue
  • threadFactory: thread factory (mainly used to name the threads)
  • handler: rejection policy (a construction sketch follows this list)
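A minimal construction sketch (the sizes, queue capacity and thread-name prefix below are arbitrary illustration values, not recommendations):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolConstructionDemo {

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize: up to 2 emergency threads
                60, TimeUnit.SECONDS,                 // keepAliveTime + unit for idle emergency threads
                new ArrayBlockingQueue<>(10),         // bounded work queue
                r -> new Thread(r, "pool-t" + counter.getAndIncrement()), // thread factory: names the threads
                new ThreadPoolExecutor.AbortPolicy()  // rejection policy (the JDK default)
        );
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
        pool.shutdown();
    }
}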

Operation mode


  • Initially there are no threads in the pool. When a task is submitted, the pool creates a new thread to execute it.
  • Once the number of threads has reached corePoolSize and none of them is idle, a newly submitted task is added to the workQueue (blocking queue) and waits there until a thread becomes free.
  • If the queue is bounded and fills up, up to maximumPoolSize - corePoolSize emergency threads are created to help.
  • If the thread count has reached maximumPoolSize and new tasks keep arriving, the rejection policy is executed. The JDK provides 4 rejection policy implementations; other well-known frameworks provide their own:
    • AbortPolicy: throw RejectedExecutionException to the caller; this is the default policy
    • CallerRunsPolicy: let the caller run the task itself
    • DiscardPolicy: silently discard the task
    • DiscardOldestPolicy: discard the oldest task in the queue and enqueue this task in its place
    • Dubbo's implementation logs and dumps the thread stacks before throwing RejectedExecutionException, to help locate the problem
    • Netty's implementation creates a new thread to execute the task
    • ActiveMQ's implementation tries to enqueue the task with a timeout (60s), similar to the custom rejection policy above
    • PinPoint's implementation uses a chain of rejection policies and tries each one in turn
  • When the peak has passed, threads beyond corePoolSize that stay idle for a while need to be terminated to save resources; this idle time is controlled by keepAliveTime and unit.


  • Based on this constructor, the JDK's Executors class provides many factory methods for creating thread pools for various purposes

Operation mode


  • When a task is handed to the thread pool, the following may happen
    • the task is assigned to a core thread for execution
    • all core threads are busy, so the task is placed into the blocking queue workQueue to wait
    • the blocking queue is full, so an emergency thread is created to execute the task
    • an emergency thread that stays idle longer than keepAliveTime is released
      • if the total number of tasks exceeds the maximum thread count (maximumPoolSize) plus the queue capacity (workQueue.capacity), the rejection policy is applied

Rejection policies

  • If the thread count has reached maximumPoolSize and new tasks keep arriving, the rejection policy is executed. The JDK provides 4 implementations


  • Mnemonic: abort, discard, discard-oldest, caller-runs
    • (the four thread pool rejection policies: abort policy, discard policy, discard-oldest policy, caller-runs policy)
  • Quick guide:
    1. Abort policy: no special scenario; this is the default.
    2. Discard policy: unimportant tasks (e.g. counting blog page views).
    3. Discard-oldest policy: publishing messages where only the newest version matters.
    4. Caller-runs policy: scenarios where losing a task is not allowed (and performance requirements and concurrency are modest).

AbortPolicy (abort policy): discard the task and throw RejectedExecutionException. This is the default policy.

  • This is the thread pool's default rejection policy: when a task can no longer be submitted, it throws an exception, giving timely feedback about the program's state. For fairly critical business this policy is recommended, because when the subsystem cannot carry more concurrency the problem is discovered promptly through the exception.
  • Behavior: when the rejection policy is triggered, throw a rejection exception; "abort" means the current flow of execution is interrupted.
  • Usage scenarios: no particular scenario, but make sure the thrown exception is handled correctly. ThreadPoolExecutor's default policy is AbortPolicy, and the ExecutorService factory methods that wrap ThreadPoolExecutor do not explicitly set a rejection policy, so they also default to it. Note, however, that the queues in those pools are effectively unbounded, so memory will blow up before the rejection policy is ever triggered. When you build a thread pool yourself and use this policy, you must handle the exception it throws, because it interrupts the current flow of execution.

DiscardPolicy (discard policy): discard the task without throwing an exception. When the queue is full, subsequently submitted tasks are silently dropped.

  • Using this policy may prevent us from noticing that the system is in an abnormal state. It is only advisable for unimportant business; for example, counting blog page views could use this policy.
  • Behavior: quietly discard the task without triggering any action.
  • Usage scenarios: usable if the submitted tasks do not matter at all. Because it is an empty implementation that silently swallows tasks, this policy is rarely used in practice.

DiscardOldestPolicy (discard-oldest policy): discard the task at the head of the queue, then resubmit the rejected task.

  • This policy favors new tasks over old ones. Whether to adopt it must be weighed carefully against whether the business can really afford to drop the old tasks.
  • Behavior: if the thread pool is not shut down, pop the element at the head of the queue and then try to enqueue the new task.
  • Usage scenarios: it still discards tasks, and silently, but the characteristic is that it drops the oldest unexecuted task in favor of the newly submitted one. A scenario that fits: publishing and updating messages. If a message has not yet been executed when an updated version arrives, the older version can be discarded. Because lower-version messages may still be queued for execution, the handler must compare message versions when processing.

CallerRunsPolicy (caller-runs policy): the submitting thread executes the task itself.

  • Behavior: when the rejection policy is triggered and the pool is not shut down, the task is handled by the thread that submitted it.
  • Usage scenarios: generally suitable when tasks must not be lost, performance requirements are modest and concurrency is low. Since the pool is normally not closed, every submitted task will eventually run, but because the caller thread runs it itself, a burst of submissions blocks subsequent work, so throughput is naturally lower. (A small usage sketch follows.)
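As a sketch of how a rejection policy is plugged in (the tiny pool and queue sizes are chosen only to force a rejection), with CallerRunsPolicy the overflow task ends up running on the submitting (main) thread:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy under test
        for (int i = 0; i < 3; i++) {
            int j = i;
            // with 1 worker busy and 1 task queued, the 3rd submission is rejected
            // and therefore executed by the main thread itself
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " executes task " + j);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
    }
}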

newFixedThreadPool

Constructor called internally

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
  • Characteristics
    • core thread count = maximum thread count (no emergency threads are ever created), so keepAliveTime is irrelevant
    • the blocking queue is unbounded, so any number of tasks can be queued
    • suitable when the amount of work is known and tasks are relatively long-running

This is one of the factory methods the Executors class provides for creating thread pools. Executors is the utility class of the Executor framework.

/**
 * @author xiexu
 * @create 2022-02-09 4:09 PM
 */
@Slf4j(topic = "c.TestThreadPoolExecutors")
public class TestThreadPoolExecutors {
    

    public static void main(String[] args) {
    
        //  create a thread pool with 2 core threads
        //  a ThreadFactory can be supplied to give the threads names
        ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
    
            private AtomicInteger t = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
    
                return new Thread(r, "mypool_t" + t.getAndIncrement());
            }
        });

        pool.execute(() -> {
    
            log.debug("1");
        });

        pool.execute(() -> {
    
            log.debug("2");
        });

        pool.execute(() -> {
    
            log.debug("3");
        });
    }

}


  • The pool size is 2 and there are 3 tasks: after thread t1 finishes task 1, it goes on to execute task 3
  • The created threads are non-daemon threads by default, so the program does not exit when the main thread finishes

newCachedThreadPool

Constructor called internally

public static ExecutorService newCachedThreadPool() {
    
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • There are no core threads; the maximum thread count is Integer.MAX_VALUE. All created threads are emergency threads (so they can be created without limit), and the idle timeout is 60 seconds
  • The blocking queue is a SynchronousQueue
    • SynchronousQueue is a special kind of queue
      • it has no capacity: a put blocks until another thread takes the element (a hand-off: one hand gives, the other takes)
      • an element can only be put when some thread is ready to take it
  • Overall, the pool grows with the number of tasks, with no upper bound; when the tasks are done, idle threads are released after 1 minute.
  • Suitable for a dense stream of tasks where each task is short-lived (a small usage sketch follows)
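A small usage sketch (task count and log text are arbitrary): every submitted task is handed straight to a worker because the SynchronousQueue does not buffer anything:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedPoolDemo {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 3; i++) {
            int j = i;
            // each task is picked up by an existing idle thread or a newly created emergency thread
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " -> " + j));
        }
        pool.shutdown(); // otherwise idle threads would only be released after 60s
    }
}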

SynchronousQueue demonstration

@Slf4j(topic = "c.TestSynchronousQueue")
public class TestSynchronousQueue {
    

    public static void main(String[] args) {
    
        SynchronousQueue<Integer> integers = new SynchronousQueue<>();
        new Thread(() -> {
    
            try {
    
                log.debug("putting {} ", 1);
                integers.put(1);
                log.debug("{} putted...", 1);

                log.debug("putting...{} ", 2);
                integers.put(2);
                log.debug("{} putted...", 2);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
        }, "t1").start();

        sleep(1);

        new Thread(() -> {
    
            try {
    
                log.debug("taking {}", 1);
                integers.take();
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
        }, "t2").start();

        sleep(1);

        new Thread(() -> {
    
            try {
    
                log.debug("taking {}", 2);
                integers.take();
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
        }, "t3").start();
    }
}


  • newCachedThreadPool() creates a thread pool whose blocking queue is a SynchronousQueue.
  • The maximum thread count is Integer.MAX_VALUE, but all of the threads are emergency threads and they are created lazily (none exist at the start; they are only created as tasks arrive, up to that limit).
  • The blocking queue has no capacity: an element cannot be stored unless a thread is waiting to take it. It acts as a hand-off rather than a buffer, and since there is no cap on emergency threads, a thread will soon come and take the task away.
  • Once the tasks in the pool have finished executing, the emergency threads created earlier are released after 1 minute of idleness.

newSingleThreadExecutor

Constructor called internally

public static ExecutorService newSingleThreadExecutor() {
    
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

Usage scenario:

  • You want multiple tasks to queue up and run one by one. The thread count is fixed at 1; when there is more than 1 task, the extra tasks wait in an unbounded queue. When the tasks are done, the single thread is not released.

Differences:

  • It creates a single thread that executes tasks serially. With a plain thread there is no remedy if a task fails and the thread dies, whereas newSingleThreadExecutor creates a replacement thread, keeping the pool working normally
  • Executors.newSingleThreadExecutor() always has exactly 1 thread, and this cannot be changed
    • FinalizableDelegatedExecutorService applies the decorator pattern: it exposes only the ExecutorService interface, so the methods specific to ThreadPoolExecutor cannot be called
  • Executors.newFixedThreadPool(1) starts at 1 but can be changed later
    • it exposes a ThreadPoolExecutor object, so after a cast you can call setCorePoolSize and similar methods (see the sketch after this list)
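A small sketch of that difference (it only checks which interface the returned object exposes):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class SingleVsFixedDemo {

    public static void main(String[] args) {
        // newFixedThreadPool(1) exposes the ThreadPoolExecutor, so its size can be changed later
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        fixed.setMaximumPoolSize(2);
        fixed.setCorePoolSize(2);

        // newSingleThreadExecutor() is wrapped in a delegate that only exposes ExecutorService,
        // so it is not a ThreadPoolExecutor and cannot be resized
        ExecutorService single = Executors.newSingleThreadExecutor();
        System.out.println(single instanceof ThreadPoolExecutor); // false

        fixed.shutdown();
        single.shutdown();
    }
}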

Code example

public static void test2() {
    
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> {
    
        log.debug("1");
        int i = 1 / 0;
    });

    pool.execute(() -> {
    
        log.debug("2");
    });

    pool.execute(() -> {
    
        log.debug("3");
    });
}


  • After thread 1 dies because of the exception, a new thread 2 is created to execute the remaining tasks, so the pool always keeps one usable thread.

Drawbacks of the thread pools returned by Executors (Key Point)

Note: the thread pool objects returned by Executors have the following drawbacks:

  • FixedThreadPool and SingleThreadExecutor: the request queue is allowed to grow to Integer.MAX_VALUE (an effectively unbounded queue), so a large number of requests can pile up and cause an OOM.
  • CachedThreadPool and ScheduledThreadPool: the number of threads allowed is Integer.MAX_VALUE, so a large number of threads can be created and cause an OOM.
  • It is therefore recommended to create thread pools with ThreadPoolExecutor directly.

To avoid the problems above: use bounded queues and cap the number of threads that can be created.
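A hedged sketch of that advice (the capacities are placeholders): give the queue an explicit capacity and cap the thread count instead of relying on the Executors defaults:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 8,                                // bounded thread counts
                30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),       // bounded queue: excess tasks are rejected instead of piling up
                new ThreadPoolExecutor.AbortPolicy());
        pool.execute(() -> System.out.println("task running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}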

Executing / submitting tasks: execute / submit

//  execute a task
void execute(Runnable command);

//  submit a task; the returned Future is used to obtain the result. Future is based on the guarded-suspension pattern described earlier: the main thread can call FutureTask.get() to wait for the task to finish
<T> Future<T> submit(Callable<T> task);

//  submit all tasks in the collection
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
 
//  submit all tasks in the collection, with a timeout
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;

//  submit all tasks in the collection; return the result of whichever task completes successfully first and cancel the others
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

//  same as above, with a timeout
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

execute( ) Method

execute(Runnable command)
  • Pass in a Runnable object; the pool executes its run method
  • The source code parsing
public void execute(Runnable command) {
    
    if (command == null)
        throw new NullPointerException();

    //  obtain ctl
    int c = ctl.get();
    
    //  Judge whether the number of currently enabled threads is less than the number of core threads 
    if (workerCountOf(c) < corePoolSize) {
    
        //  Assign a thread to the task 
        if (addWorker(command, true))
            //  If the allocation is successful, it returns 
            return;
        
        //  Allocation failed. Get again ctl
        c = ctl.get();
    }
    
    //  reached here when no core worker was added (core count already reached, or addWorker failed)
    //  if the pool state is RUNNING and the task is successfully offered to the queue
    if (isRunning(c) && workQueue.offer(command)) {
    
        
        //  Double detection , The state of the thread pool may change to non after adding RUNNING
        int recheck = ctl.get();
        
        //  If the pool status is non RUNNING, The new task will not be executed 
        //  Remove the task from the blocking queue 
        if (! isRunning(recheck) && remove(command))
            //  Call the deny policy , Reject the execution of the task 
            reject(command);
        
        //  If there are no running threads 
        else if (workerCountOf(recheck) == 0)
            //  Create a new thread to perform the task 
            addWorker(null, false);
    }
    
    //  If the addition fails ( The task queue is full ), Call the reject policy 
    else if (!addWorker(command, false))
        reject(command);
}
  • execute() calls **addWorker( )**; let's look at that method
private boolean addWorker(Runnable firstTask, boolean core) {
    
    retry:
    for (;;) {
    
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //  if the pool is not RUNNING, only allow adding a worker when the pool is SHUTDOWN,
        //  firstTask is null, and the queue still has tasks to drain
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            //  Failed to create new thread 
            return false;

        for (;;) {
    
            //  Get the current number of worker threads 
            int wc = workerCountOf(c);

            //  when the parameter core is true, compare against corePoolSize, otherwise against maximumPoolSize
            // CAPACITY is (1 << COUNT_BITS) - 1 and is normally never reached
            //  creation fails if the worker count already reaches the applicable limit
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //  change the worker count with a CAS operation
            if (compareAndIncrementWorkerCount(c))
                //  on success, break out of both loops
                break retry;
            //  the CAS failed: re-read ctl
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                //  the pool state changed: restart the outer loop
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //  marks whether the worker's thread was successfully started
    boolean workerStarted = false;
    //  marks whether the worker was successfully added to the pool
    boolean workerAdded = false;
    Worker w = null;
    try {
    
        //  Create new threads to perform tasks 
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
    
            final ReentrantLock mainLock = this.mainLock;
            //  Lock 
            mainLock.lock();
            try {
    
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                //  Check again while locking 
                //  Avoid calling before releasing the lock. shut down
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
    
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //  Add thread to thread pool 
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //  The success flag bit is changed to true
                    workerAdded = true;
                }
            } finally {
    
                mainLock.unlock();
            }
            //  If worker Successfully joined the thread pool , Just perform the task 
            if (workerAdded) {
    
                t.start();
                //  Successful launch 
                workerStarted = true;
            }
        }
    } finally {
    
        //  If execution fails 
        if (! workerStarted)
            //  Call the function that failed to add 
            addWorkerFailed(w);
    }
    return workerStarted;
}

submit( ) Method

Future<T> submit(Callable<T> task)
  • Pass in a Callable; use the returned Future to capture the result
private static void method1(ExecutorService pool) throws InterruptedException, ExecutionException {
    
        //  submit executes the Callable's call method
        //  the returned Future captures the result
        Future<String> future = pool.submit(new Callable<String>() {
    
            @Override
            public String call() throws Exception {
    
                log.debug("running");
                Thread.sleep(1000);
                return "ok";
            }
        });

        log.debug("{}", future.get());
    }


invokeAll

private static void method2(ExecutorService pool) throws InterruptedException {
    
        List<Future<String>> futures = pool.invokeAll(Arrays.asList(() -> {
    
            log.debug("begin");
            Thread.sleep(1000);
            return "1";
        }, () -> {
    
            log.debug("begin");
            Thread.sleep(500);
            return "2";
        }, () -> {
    
            log.debug("begin");
            Thread.sleep(2000);
            return "3";
        }));

        futures.forEach(f -> {
    
            try {
    
                log.debug("{}", f.get());
            } catch (InterruptedException | ExecutionException e) {
    
                e.printStackTrace();
            }
        });
    }


invokeAny

private static void method3(ExecutorService pool) throws InterruptedException, ExecutionException {
    
        String result = pool.invokeAny(Arrays.asList(() -> {
    
            log.debug("begin 1");
            Thread.sleep(1000);
            log.debug("end 1");
            return "1";
        }, () -> {
    
            log.debug("begin 2");
            Thread.sleep(500);
            log.debug("end 2");
            return "2";
        }, () -> {
    
            log.debug("begin 3");
            Thread.sleep(2000);
            log.debug("end 3");
            return "3";
        }));
        log.debug("{}", result);
    }


Close thread pool shutdown( )

shutdown( )

  • Change the state of the thread pool to SHUTDOWN
  • Stops accepting new tasks, but the tasks already in the queue are allowed to finish
public void shutdown() {
    
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    
        checkShutdownAccess();
      //  Modify thread pool state 
        advanceRunState(SHUTDOWN);
      //  Only idle threads will be interrupted 
        interruptIdleWorkers();
       //  The extension point  ScheduledThreadPoolExecutor
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
    
        mainLock.unlock();
    }
  //  attempt to terminate (a pool with no running worker terminates immediately; running workers are not waited for)
    tryTerminate();
}

shutdownNow( )

  • Changes the pool state to STOP
  • Stops accepting new tasks and does not execute the tasks remaining in the queue
  • Returns the unexecuted tasks in the queue to the caller
  • Uses interrupt to interrupt the tasks that are currently executing
public List<Runnable> shutdownNow() {
    
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    
        checkShutdownAccess();
        
        //  change the state to STOP; the queued tasks will no longer be processed
        advanceRunState(STOP);
        
        //  Interrupt all threads 
        interruptWorkers();
        
        //  Remove unexecuted tasks from the queue , And then back to the caller 
        tasks = drainQueue();
    } finally {
    
        mainLock.unlock();
    }
    //  Try to end , It will succeed , Because the blocking queue is empty 
    tryTerminate();
    return tasks;
}

Other methods

//  returns true if the pool is not in the RUNNING state
boolean isShutdown();

//  returns true if the pool state is TERMINATED
boolean isTerminated();

//  shutdown() is asynchronous: it returns without waiting for the remaining tasks to finish,
//  so call this method if you want to wait until the pool reaches TERMINATED before doing something else
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
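A small sketch of the usual shutdown sequence (the 3-second timeout is arbitrary): shutdown() stops new submissions, awaitTermination() waits for the queued tasks to drain, and shutdownNow() is the fallback:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownDemo {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("task"));
        pool.shutdown();                                   // accept no new tasks; queued tasks still run
        if (!pool.awaitTermination(3, TimeUnit.SECONDS)) { // wait up to 3s for the pool to reach TERMINATED
            pool.shutdownNow();                            // give up and interrupt the remaining workers
        }
    }
}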

Code example

@Slf4j(topic = "c.TestShutDown")
public class TestShutDown {
    

    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<Integer> result1 = pool.submit(() -> {
    
            log.debug("task 1 running...");
            Thread.sleep(1000);
            log.debug("task 1 finish...");
            return 1;
        });

        Future<Integer> result2 = pool.submit(() -> {
    
            log.debug("task 2 running...");
            Thread.sleep(1000);
            log.debug("task 2 finish...");
            return 2;
        });

        Future<Integer> result3 = pool.submit(() -> {
    
            log.debug("task 3 running...");
            Thread.sleep(1000);
            log.debug("task 3 finish...");
            return 3;
        });

        log.debug("shutdown");
// pool.shutdown();
// pool.awaitTermination(3, TimeUnit.SECONDS);
        List<Runnable> runnables = pool.shutdownNow();
        log.debug("other.... {}", runnables);
    }
}

shutdown output

(console output screenshot omitted)

shutdownNow output

(console output screenshot omitted)

Worker threads in asynchronous mode

Definition

  • Let a limited set of worker threads take turns processing an unbounded number of tasks asynchronously. This can be classified as a division-of-work pattern; its typical implementation is the thread pool, and it also reflects the Flyweight pattern from the classic design patterns.
  • For example, the waiters (threads) at a hotpot restaurant take turns handling each guest's order (task); assigning a dedicated waiter to every guest would cost far too much (compare the other multithreaded design pattern: Thread-Per-Message)
  • Note that different task types should use different thread pools; this avoids starvation and improves efficiency
  • For example, if the same restaurant worker both greets guests (task type A) and cooks in the kitchen (task type B), efficiency is obviously poor; splitting the staff into waiters (thread pool A) and cooks (thread pool B) is more reasonable, and of course the division of labour can be even finer

Starvation

A fixed-size thread pool can suffer from starvation

  • The two workers are two threads in the same thread pool
  • The work they do has two stages: taking the guests' orders and cooking in the kitchen
    • taking an order: the worker must take the order first, then wait for the dish to be cooked before serving it; while waiting, the worker handling the order is blocked
    • cooking: nothing special, just cook the dish
  • If worker A has taken an order, it must wait for worker B to finish cooking before it can serve; this cooperation works fine
  • But if two guests arrive at the same time, worker A and worker B both go to take orders; now nobody is cooking, and both wait forever: starvation

Starvation example

@Slf4j(topic = "c.TestDeadLock")
public class TestDeadLock {
    

    static final List<String> MENU = Arrays.asList(" Sauteed Potato, Green Pepper and Eggplant ", " Kung Pao Chicken ", " Sauteed Chicken Dices with Chili Peppers ", " Roast chicken wings ");
    static Random RANDOM = new Random();

    static String cooking() {
    
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
    
        ExecutorService pool = Executors.newFixedThreadPool(2);

        pool.execute(() -> {
    
            log.debug(" Handle order ...");
            Future<String> future = pool.submit(() -> {
    
                log.debug(" cook a dish ");
                return cooking();
            });
            try {
    
                log.debug(" Serve : {}", future.get());
            } catch (InterruptedException | ExecutionException e) {
    
                e.printStackTrace();
            }
        });

// pool.execute(() -> {
    
// log.debug(" Handle order ...");
// Future<String> future = pool.submit(() -> {
    
// log.debug(" cook a dish ");
// return cooking();
// });
// try {
    
// log.debug(" Serve : {}", future.get());
// } catch (InterruptedException | ExecutionException e) {
    
// e.printStackTrace();
// }
// });
    }

}

With only one guest, thread 1 handles the order and serves the dish, while thread 2 cooks


When the commented-out block is enabled, it is as if there are two guests: thread 1 and thread 2 both go to take orders, nobody cooks, and starvation occurs


Resolving starvation

Increasing the pool size relieves the symptom but is not the fundamental fix; as mentioned above, different types of tasks should use different thread pools

@Slf4j(topic = "c.TestDeadLock")
public class TestDeadLock {
    

    static final List<String> MENU = Arrays.asList(" Sauteed Potato, Green Pepper and Eggplant ", " Kung Pao Chicken ", " Sauteed Chicken Dices with Chili Peppers ", " Roast chicken wings ");
    static Random RANDOM = new Random();

    static String cooking() {
    
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
    
        ExecutorService waiterPool = Executors.newFixedThreadPool(1);
        ExecutorService cookPool = Executors.newFixedThreadPool(1);

        waiterPool.execute(() -> {
    
            log.debug(" Handle order ...");
            Future<String> future = cookPool.submit(() -> {
    
                log.debug(" cook a dish ");
                return cooking();
            });
            try {
    
                log.debug(" Serve : {}", future.get());
            } catch (InterruptedException | ExecutionException e) {
    
                e.printStackTrace();
            }
        });

        waiterPool.execute(() -> {
    
            log.debug(" Handle order ...");
            Future<String> future = cookPool.submit(() -> {
    
                log.debug(" cook a dish ");
                return cooking();
            });
            try {
    
                log.debug(" Serve : {}", future.get());
            } catch (InterruptedException | ExecutionException e) {
    
                e.printStackTrace();
            }
        });
    }

}


How large should a thread pool be?

  • Too small: the program cannot make full use of system resources, and starvation becomes more likely
  • Too large: more thread context switching and higher memory usage

CPU-intensive workloads

The usual choice is **number of CPU cores + 1**, which achieves optimal CPU utilization; the +1 ensures that when a thread is stalled by a page fault or some other reason, the spare thread can step in so that no CPU cycles are wasted.
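A one-line sketch of that rule (the class name is mine):

public class CpuBoundPoolSize {

    public static void main(String[] args) {
        // cores reported by the JVM, plus one spare thread to cover page faults and similar stalls
        int poolSize = Runtime.getRuntime().availableProcessors() + 1;
        System.out.println("suggested CPU-bound pool size: " + poolSize);
    }
}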

I/O-intensive workloads

The CPU is not always busy. Business computation uses CPU, but during I/O operations, remote RPC calls and database operations the CPU sits idle, so more threads can be used to raise its utilization.

The empirical formula is:

 threads = number of cores * expected CPU utilization * total time (CPU time + wait time) / CPU time
  • For example, on a 4-core CPU where computation takes 50% of the time and waiting takes the other 50%, with a target CPU utilization of 100%, the formula gives
4 * 100% * 100% / 50% = 8
  • For example, on a 4-core CPU where computation takes 10% of the time and waiting takes 90%, with a target CPU utilization of 100%, the formula gives
4 * 100% * 100% / 10% = 40
A small helper applying this formula follows.
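A sketch of that helper (method and class names are mine; the inputs are the two 4-core examples from the text):

public class PoolSizeFormula {

    // threads = cores * targetUtilization * (cpuTime + waitTime) / cpuTime
    static long poolSize(int cores, double targetUtilization, double cpuTime, double waitTime) {
        return Math.round(cores * targetUtilization * (cpuTime + waitTime) / cpuTime);
    }

    public static void main(String[] args) {
        System.out.println(poolSize(4, 1.0, 0.5, 0.5)); // 8  (50% compute, 50% wait)
        System.out.println(poolSize(4, 1.0, 0.1, 0.9)); // 40 (10% compute, 90% wait)
    }
}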

Task scheduling thread pool

  • Before the "task scheduling thread pool" was introduced, java.util.Timer could be used for scheduled tasks. Timer is simple to use, but all of its tasks are scheduled by the same thread, so they run strictly serially: only one task runs at a time, and a delay or exception in one task affects the tasks after it.
@Slf4j(topic = "c.TestTimer")
public class TestTimer {
    

    public static void main(String[] args) {
    
        Timer timer = new Timer();
        TimerTask task1 = new TimerTask() {
    
            @Override
            public void run() {
    
                log.debug("task 1");
                Sleeper.sleep(2);
            }
        };

        TimerTask task2 = new TimerTask() {
    
            @Override
            public void run() {
    
                log.debug("task 2");
            }
        };
        log.debug("start...");
        //  use the timer to add two tasks, both expected to run 1s later
        //  but the timer has only one thread executing tasks in order, so the delay in "task 1" also delays "task 2"
        timer.schedule(task1, 1000);
        timer.schedule(task2, 1000);
    }

}


ScheduledExecutorService (Key Point)

Using the schedule method of ScheduledExecutorService

public static void method2() {
    
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        //  Add two tasks , I hope they're all there  1s  After execution 
        pool.schedule(() -> {
    
            System.out.println(" Mission 1, execution time :" + new Date());
            try {
    
                Thread.sleep(2000);
            } catch (InterruptedException e) {
    
            }
        }, 1, TimeUnit.SECONDS);
        pool.schedule(() -> {
    
            System.out.println(" Mission 2, execution time :" + new Date());
        }, 1, TimeUnit.SECONDS);
    }


  • The execution of two threads does not interfere with each other
  • Of course , If the thread pool size is 1, Tasks are still executed serially

Using the scheduleAtFixedRate method of ScheduledExecutorService

public static void method3() {
    
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        log.debug("start...");
        //  after an initial 1s delay, print "running" once per second
        pool.scheduleAtFixedRate(() -> {
    
            log.debug("running...");
        }, 1, 1, TimeUnit.SECONDS);
    }


If the task execution time exceeds the interval

public static void method3() {
    
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        log.debug("start...");
        //  after an initial 1s delay, print "running" once per second
        pool.scheduleAtFixedRate(() -> {
    
            log.debug("running...");
            Sleeper.sleep(2);
        }, 1, 1, TimeUnit.SECONDS);
    }


  • Output analysis: at the start there is a 1s delay; after that, because the task's execution time (2s) exceeds the interval (1s), the effective interval is "stretched" to 2s

Using the scheduleWithFixedDelay method of ScheduledExecutorService

public static void method4() {
    
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        log.debug("start...");
        pool.scheduleWithFixedDelay(() -> {
    
            log.debug("running...");
            sleep(2);
        }, 1, 1, TimeUnit.SECONDS);
    }


  • Output analysis: at the start there is a 1s delay; with scheduleWithFixedDelay the interval is measured from the end of the previous task (next start = previous end + delay), so the effective interval here is 3s (2s of work + 1s delay)
  • Overall the pool behaves as follows: the thread count is fixed; when there are more tasks than threads, tasks wait in an unbounded queue; threads are not released when tasks finish. It is used for delayed and periodic tasks.

Example: how do you schedule a task to run every Thursday at 18:00:00?

public class TestSchedule {

    // How to schedule a task to run every Thursday at 18:00:00?
    public static void main(String[] args) {
        // get the current time
        LocalDateTime now = LocalDateTime.now();
        System.out.println(now);
        // get this week's Thursday at 18:00:00
        LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
        // if the current time is already past this Thursday 18:00, use next Thursday
        if (now.compareTo(time) > 0) {
            time = time.plusWeeks(1); // add one week
        }
        System.out.println(time);
        // initialDelay is the gap between now and the target Thursday
        long initialDelay = Duration.between(now, time).toMillis();
        // period is one week, in milliseconds
        long period = 1000 * 60 * 60 * 24 * 7;
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        pool.scheduleAtFixedRate(() -> {
            System.out.println("running...");
        }, initialDelay, period, TimeUnit.MILLISECONDS);
    }

}
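One caveat for the periodic approach above: if a task scheduled with scheduleAtFixedRate throws an uncaught exception, all subsequent executions are suppressed. A minimal sketch (illustration only, not part of the original example) of guarding the task body:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SafePeriodicTask {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        pool.scheduleAtFixedRate(() -> {
            try {
                System.out.println("running..."); // the real work would go here
            } catch (Exception e) {
                // log and swallow, otherwise one failure cancels every later run
                e.printStackTrace();
            }
        }, 1, 1, TimeUnit.SECONDS);
    }
}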

Correctly handling exceptions thrown by tasks

Method 1: catch the exception inside the task

ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
    try {
        log.debug("task1");
        int i = 1 / 0;
    } catch (Exception e) {
        log.error("error:", e);
    }
});

Untitled

Method 2: Use Future

ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> future = pool.submit(() -> {
    log.debug("task1");
    int i = 1 / 0;
    return true;
});
log.debug("result:{}", future.get());

Untitled

  • Use a Future to receive the result: if the task completes normally, future.get() returns the Callable's return value
  • If the task throws an exception, future.get() rethrows it wrapped in an ExecutionException, which the caller can catch (see the sketch below)
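A minimal sketch of what that handling looks like (assuming the same pool setup as above; the shape of the try/catch is the point, not the exact code):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureErrorDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<Boolean> future = pool.submit(() -> {
            int i = 1 / 0; // the task fails with an ArithmeticException
            return true;
        });
        try {
            System.out.println("result: " + future.get());
        } catch (ExecutionException e) {
            // the task's exception is available as the cause
            System.out.println("task failed: " + e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}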

Tomcat Thread pool

Where does Tomcat use thread pools?

Untitled

  • LimitLatch is used for limiting connections; it controls the maximum number of connections and is similar to Semaphore in J.U.C (covered later)
  • The Acceptor is only responsible for 【accepting new socket connections】
  • The Poller only monitors socket channels for 【readable I/O events】
  • Once a channel is readable, it wraps the event in a task object (socketProcessor) and submits it to the Executor thread pool for processing
  • The worker threads in the Executor thread pool are ultimately responsible for 【processing requests】

This reflects the idea that different thread pools are responsible for different kinds of work

Tomcat's thread pool extends ThreadPoolExecutor, and its behaviour differs slightly:

  • If the total number of threads has reached maximumPoolSize
    • it does not throw a RejectedExecutionException immediately
    • instead, it tries once more to put the task into the queue; only if that also fails does it throw a RejectedExecutionException (for comparison, a sketch of the default JDK behaviour follows, then the Tomcat source)
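A minimal sketch (plain JDK code, not Tomcat's) of the default behaviour being contrasted here: with the default AbortPolicy, once the queue is full and the thread count has reached maximumPoolSize, execute rejects the task right away.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class JdkRejectDemo {
    public static void main(String[] args) {
        // 1 core thread, 1 max thread, queue capacity 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable slow = () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ignored) {
            }
        };
        pool.execute(slow); // occupies the only thread
        pool.execute(slow); // waits in the queue
        try {
            pool.execute(slow); // no thread and no queue slot left
        } catch (RejectedExecutionException e) {
            System.out.println("rejected immediately: " + e);
        }
        pool.shutdown();
    }
}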

Source code tomcat-7.0.42

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                // try to put the task into the blocking queue one more time
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                Thread.interrupted();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}
  • TaskQueue.java

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
    if (parent.isShutdown())
        throw new RejectedExecutionException(
                "Executor not running, can't force a command into the queue"
        );
    return super.offer(o, timeout, unit); // forces the item onto the queue, to be used if the task is rejected
}
  • Connector configuration

Untitled

  • Executor thread configuration

A daemon thread does not keep the JVM alive: once all non-daemon threads (such as the main thread) have finished, the JVM exits and any remaining daemon threads end with it

Untitled
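A minimal sketch of that daemon behaviour (illustration only, not Tomcat source):

public class DaemonDemo {
    public static void main(String[] args) {
        Thread daemon = new Thread(() -> {
            try {
                Thread.sleep(10_000); // pretends to work for 10 seconds
            } catch (InterruptedException ignored) {
            }
            System.out.println("never printed");
        });
        daemon.setDaemon(true); // must be set before start()
        daemon.start();
        System.out.println("main ends; the JVM exits without waiting for the daemon");
    }
}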

  • Note: there is an error in the figure below; when the number of submitted tasks is less than the number of core threads, the task should be handed directly to a core thread for execution.

Untitled

Fork/Join (be familiar with it)

Concept

  • Fork/Join is a new thread pool implementation added in JDK 1.7. It embodies the idea of divide and conquer and is suited to CPU-intensive computations whose tasks can be split
  • Task splitting means breaking a large task into smaller tasks that are solved by the same algorithm, until a subtask can no longer be split and can be solved directly. Recursion-related computations such as merge sort and the Fibonacci sequence can be solved with this divide-and-conquer approach
  • Fork/Join adds multithreading on top of divide and conquer: the splitting and merging of tasks can be handed to different threads, further improving efficiency
  • By default, Fork/Join creates a thread pool with as many threads as there are CPU cores (a small sketch of this sizing follows)
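A minimal sketch of that default sizing (note the shared common pool typically reserves one slot for the submitting thread, so its parallelism may be one less than the core count):

import java.util.concurrent.ForkJoinPool;

public class ForkJoinSizeDemo {
    public static void main(String[] args) {
        System.out.println("CPU cores:          " + Runtime.getRuntime().availableProcessors());
        // the no-arg constructor sizes the pool to the number of available processors
        System.out.println("new ForkJoinPool(): " + new ForkJoinPool().getParallelism());
        // the shared common pool usually uses one thread fewer by default
        System.out.println("commonPool():       " + ForkJoinPool.commonPool().getParallelism());
    }
}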

Use

  • Tasks submitted to a Fork/Join thread pool need to extend RecursiveTask (with a return value) or RecursiveAction (no return value). For example, the following defines a task that sums the integers from 1 to n.
@Slf4j(topic = "c.TestForkJoin2")
public class TestForkJoin2 {
    

    public static void main(String[] args) {
    
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.invoke(new MyTask(5)));

        // new MyTask(5) 5+ new MyTask(4) 4 + new MyTask(3) 3 + new MyTask(2) 2 + new MyTask(1)
    }
}

// 1~n  The sum of integers between 
@Slf4j(topic = "c.MyTask")
class MyTask extends RecursiveTask<Integer> {
    

    private int n;

    public MyTask(int n) {
    
        this.n = n;
    }

    @Override
    public String toString() {
    
        return "{" + n + '}';
    }

    @Override
    protected Integer compute() {
    
        //  If  n  Have been to  1, We can get the result 
        if (n == 1) {
    
            log.debug("join() {}", n);
            return n;
        }

        //  Split the task (fork)
        AddTask1 t1 = new AddTask1(n - 1);
        t1.fork(); //  Let a thread perform this task 
        log.debug("fork() {} + {}", n, t1);

        //  Merge (join) result 
        int result = n + t1.join(); //  Get task results 
        log.debug("join() {} + {} = {}", n, t1, result);
        return result;
    }
}

Untitled

Graphic effect

Untitled

Improvement

@Slf4j(topic = "c.AddTaskTest")
public class AddTaskTest {
    

    public static void main(String[] args) {
    
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.invoke(new AddTask3(1, 5)));
    }

}

@Slf4j(topic = "c.AddTask3")
class AddTask3 extends RecursiveTask<Integer> {
    

    int begin;
    int end;

    public AddTask3(int begin, int end) {
    
        this.begin = begin;
        this.end = end;
    }

    @Override
    public String toString() {
    
        return "{" + begin + "," + end + '}';
    }

    @Override
    protected Integer compute() {
    
        if (begin == end) {
    
            log.debug("join() {}", begin);
            return begin;
        }
        if (end - begin == 1) {
    
            log.debug("join() {} + {} = {}", begin, end, end + begin);
            return end + begin;
        }
        int mid = (end + begin) / 2;

        AddTask3 t1 = new AddTask3(begin, mid);
        t1.fork();
        AddTask3 t2 = new AddTask3(mid + 1, end);
        t2.fork();
        log.debug("fork() {} + {} = ?", t1, t2);

        int result = t1.join() + t2.join();
        log.debug("join() {} + {} = {}", t1, t2, result);
        return result;
    }

}

Untitled

Graphical results

Untitled
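A further refinement that is often applied (a sketch, not part of the original code): instead of forking both halves, fork one half and compute the other in the current thread, which saves one task submission per split. The hypothetical AddTask4 below keeps the same splitting logic as AddTask3.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// hypothetical variant of AddTask3: fork the left half, compute the right half locally
class AddTask4 extends RecursiveTask<Integer> {

    private final int begin;
    private final int end;

    AddTask4(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - begin <= 1) { // one or two numbers left: sum directly
            return begin == end ? begin : begin + end;
        }
        int mid = (begin + end) / 2;
        AddTask4 left = new AddTask4(begin, mid);
        AddTask4 right = new AddTask4(mid + 1, end);
        left.fork();                        // hand the left half to another worker
        int rightResult = right.compute();  // do the right half in the current thread
        return left.join() + rightResult;   // then wait for the left half
    }

    public static void main(String[] args) {
        System.out.println(new ForkJoinPool(4).invoke(new AddTask4(1, 5))); // prints 15
    }
}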

Copyright notice
This article was created by [Ape feather]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/02/202202132008496616.html
