Chapter 7 - thread pool of shared model
2022-07-06 06:42:00 【Ape feather】
Thread pool (key topic)
Pooling is a widely used technique: thread pools, database connection pools, and HTTP connection pools all apply the same idea. The goal of pooling is to reduce the cost of acquiring a resource each time it is needed and to improve resource utilization.
A thread pool provides a way to limit and manage resources, including the threads that execute tasks. Each thread pool also maintains some basic statistics, such as the number of completed tasks.
Borrowing from "The Art of Java Concurrent Programming", the benefits of using a thread pool are:
- Reduced resource consumption. Reusing threads that already exist lowers the cost of thread creation and destruction. (A Java thread is ultimately mapped to an operating system thread, so creating one is expensive.)
- Improved response time. When a task arrives, it can run immediately without waiting for a thread to be created.
- Improved manageability of threads. Threads are a scarce resource. Creating them without limit consumes system resources and reduces the stability of the system. A thread pool allows threads to be allocated, tuned, and monitored in one place.
Custom thread pool
- The blocking queue holds tasks produced by the main thread (or other threads)
- The main thread acts as the producer: it creates tasks and puts them into the blocking queue
- The thread pool acts as the consumer: it takes tasks from the blocking queue and executes them
Implementation steps of the custom thread pool:
- Step 1: define the rejection policy interface
- Step 2: define the task blocking queue
- Step 3: define the thread pool
- Step 4: test
/** * Description: Customize a simple thread pool * * @author xiexu * @create 2022-02-08 8:06 PM */
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
/** * First parameter: number of core threads * Second parameter: timeout * Third parameter: time unit * Fourth parameter: queue capacity * Fifth parameter: rejection policy */
ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, new RejectPolicy<Runnable>() {
@Override
public void reject(BlockingQueue<Runnable> queue, Runnable task) {
// Rejection policy options
// 1. Wait indefinitely
//queue.put(task);
// 2. Wait with a timeout
queue.offer(task, 500, TimeUnit.MILLISECONDS);
// 3. Let the caller give up on the task
// log.debug("give up - {}", task);
// 4. Let the caller throw an exception
// throw new RuntimeException("Task execution failed " + task);
// 5. Let the caller run the task itself
// task.run();
}
});
// Create 3 tasks
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", j);
}
});
}
}
}
@FunctionalInterface // Rejection policy
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
/** * Thread pool */
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
// Blocking task queue
private BlockingQueue<Runnable> taskQueue;
// Set of worker threads
private HashSet<Worker> workers = new HashSet<>();
// Number of core threads
private int coreSize;
// Timeout for fetching a task from the queue
private long timeout;
private TimeUnit timeUnit;
// Rejection policy
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
// Execute a task
public void execute(Runnable task) {
synchronized (workers) {
// While the number of workers is below coreSize, hand the task straight to a new worker instead of queuing it
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("add worker {}, {}", worker, task);
// Add the newly created thread to the worker set
workers.add(worker);
worker.start();
} else {
// All core workers already exist, so try to stage the task in the queue
// taskQueue.put(task); // put() can only wait indefinitely, so the decision is delegated to the rejection policy
// Rejection policy options
// 1. Wait indefinitely
// 2. Wait with a timeout
// 3. Let the caller give up on the task
// 4. Let the caller throw an exception
// 5. Let the caller run the task itself
taskQueue.tryPut(rejectPolicy, task);
}
}
}
// Worker thread class
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// Execute tasks
// 1) While task is not null, execute it
// 2) When the task has finished, fetch the next task from the blocking queue and execute it
//while (task != null || (task = taskQueue.take()) != null) {
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("Executing ...{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
// Clear the task that has just been executed
task = null;
}
}
// No task arrived within the timeout: remove this worker from the worker set
synchronized (workers) {
log.debug("Worker removed {}", this);
workers.remove(this);
}
}
}
}
/** * Blocking queue that stores tasks * * @param <T> the task type, here Runnable */
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
// 1. Task queue
private Deque<T> queue = new ArrayDeque<>();
// 2. Lock
private ReentrantLock lock = new ReentrantLock();
// 3. Condition variable for producers (they wait here while the queue is full)
private Condition fullWaitSet = lock.newCondition();
// 4. Condition variable for consumers (they wait here while the queue is empty)
private Condition emptyWaitSet = lock.newCondition();
// 5. Capacity of the blocking queue
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
// Take a task from the blocking queue; if it is empty, wait up to the given timeout
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// Convert the timeout to nanoseconds
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// Timed out: stop waiting and return null
if (nanos <= 0) {
return null;
}
// awaitNanos returns the remaining time (requested wait minus elapsed time), so after a spurious wakeup the loop waits only for the time left instead of restarting the full wait
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); // The queue is no longer full, so wake up a waiting producer
return t;
} finally {
lock.unlock();
}
}
// Take a task from the blocking queue; if it is empty, wait indefinitely
public T take() {
lock.lock();
try {
// Loop while the queue is empty
while (queue.isEmpty()) {
// Nothing to consume yet, so wait on the consumer condition variable
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// The queue is not empty: take the task at its head
T t = queue.removeFirst();
fullWaitSet.signal(); // The queue is no longer full, so wake up a waiting producer
return t;
} finally {
lock.unlock(); // Release the lock
}
}
// Add a task to the blocking queue, waiting indefinitely while it is full
public void put(T task) {
lock.lock();
try {
// Loop while the queue is full
while (queue.size() == capacity) {
try {
log.debug("Waiting to join the task queue ...");
// No room to produce, so wait on the producer condition variable
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// Add the new task to the tail of the queue
queue.addLast(task);
log.debug("Joined the task queue {}", task);
emptyWaitSet.signal(); // The queue now holds a task, so wake up a waiting consumer
} finally {
lock.unlock();
}
}
// Add a task to the blocking queue, giving up once the timeout expires
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
// Timed out: the task was not added
if (nanos <= 0) {
return false;
}
log.debug("Waiting to join the task queue {}...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("Joined the task queue {}", task);
queue.addLast(task);
emptyWaitSet.signal(); // The queue now holds a task, so wake up a waiting consumer
return true;
} finally {
lock.unlock();
}
}
// Get queue size
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// Check whether the queue is full
if (queue.size() == capacity) {
rejectPolicy.reject(this, task);
} else {
// There is room in the queue
log.debug("Joined the task queue {}", task);
queue.addLast(task);
emptyWaitSet.signal(); // The queue now holds a task, so wake up a waiting consumer
}
} finally {
lock.unlock();
}
}
}
- The blocking queue BlockingQueue temporarily stores tasks that the threads have not yet had time to execute
  - In other words, it balances the difference in speed between producers and consumers
  - Its methods for taking and putting tasks follow the producer-consumer pattern
- The threads in the pool are Thread objects wrapped once more as Worker
  - A worker calls the run method of its task object to execute it; when the task finishes, the worker takes the next task from the blocking queue and executes that
- The main method for executing tasks in the thread pool is execute
  - On each call it checks whether the number of worker threads has reached the pool's core size
ThreadPoolExecutor
Thread pool state
ThreadPoolExecutor uses the high 3 bits of an int to represent the thread pool state and the low 29 bits to represent the number of threads
// Thread pool state
// runState is stored in the high-order bits
// RUNNING: high 3 bits are 111
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN: high 3 bits are 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP: high 3 bits are 001
private static final int STOP = 1 << COUNT_BITS;
// TIDYING: high 3 bits are 010
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED: high 3 bits are 011
private static final int TERMINATED = 3 << COUNT_BITS;
State | Value of the high 3 bits | Description |
---|---|---|
RUNNING | 111 | Accepts new tasks and processes tasks in the task queue |
SHUTDOWN | 000 | Accepts no new tasks but still processes tasks in the task queue |
STOP | 001 | Interrupts running tasks and discards the tasks in the blocking queue |
TIDYING | 010 | All tasks have finished and the number of active threads is 0; about to terminate |
TERMINATED | 011 | Terminated |
- Compared numerically as signed ints (RUNNING is negative because its top bit is set): TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
The thread pool state and the number of threads in the pool are stored together in a single atomic integer, ctl
- The main reason for packing two values into one number is that a single CAS can then update both at the same time
// Atomic integer: the high 3 bits hold the thread pool state, the remaining bits hold the number of threads
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// A Java int is always 32 bits (Integer.SIZE == 32), so COUNT_BITS is 29:
// the high 3 bits are reserved for the pool state and the low 29 bits hold the thread count
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2^COUNT_BITS - 1: the maximum number of threads that can be recorded
// In CAPACITY the high 3 bits are 0 and the low 29 bits are all 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- Operations that extract the pool state, extract the thread count, and merge the two values
// Packing and unpacking ctl
// Get the run state
// The mask clears every bit except the high 3
private static int runStateOf(int c) {
return c & ~CAPACITY; }
// Get the number of worker threads
// The mask clears the high 3 bits
private static int workerCountOf(int c) {
return c & CAPACITY; }
// Compute a new ctl value
private static int ctlOf(int rs, int wc) {
return rs | wc; }
- Both values live in the single atomic variable ctl so that the pool state and the thread count can be updated together by one CAS operation
// c is the old value; the result of ctlOf is the new value
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));
// rs is the pool state in the high 3 bits, wc is the thread count in the low 29 bits; ctlOf merges them
private static int ctlOf(int rs, int wc) {
return rs | wc; }
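To make the bit packing concrete, here is a minimal standalone sketch. It copies the constants and helper methods from the listing above into a hypothetical class, CtlPackingDemo (the class name and the main method are assumptions for illustration, not part of the JDK), and packs a state together with a thread count.

```java
class CtlPackingDemo {
    // Copied from the ThreadPoolExecutor listing so that the sketch runs on its own
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // low 29 bits set
    static final int RUNNING = -1 << COUNT_BITS;        // high 3 bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;        // high 3 bits 000
    static final int STOP = 1 << COUNT_BITS;            // high 3 bits 001

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        // Pack state RUNNING and a worker count of 5 into one int
        int c = ctlOf(RUNNING, 5);
        // 32 binary digits: 111, then zeros, then 101
        System.out.println(Integer.toBinaryString(c));
        System.out.println("workers = " + workerCountOf(c));
        System.out.println("running = " + (runStateOf(c) == RUNNING));

        // Change only the state and keep the count, as a single new value
        int stopped = ctlOf(STOP, workerCountOf(c));
        System.out.println("stopped workers = " + workerCountOf(stopped));
        // RUNNING is the only negative state, so it sorts below all others
        System.out.println("RUNNING < SHUTDOWN: " + (RUNNING < SHUTDOWN));
    }
}
```

Because both fields are in one int, replacing the old value with the new one through compareAndSet changes state and count in a single atomic step.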
Properties of the thread pool
// Worker thread; it wraps a Thread
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
...
}
// Blocking queue that holds tasks the worker threads have not yet had time to execute
private final BlockingQueue<Runnable> workQueue;
// Lock
private final ReentrantLock mainLock = new ReentrantLock();
// Set containing all worker threads in the pool; accessed only while holding mainLock
private final HashSet<Worker> workers = new HashSet<Worker>();
Constructor (key topic)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize: number of core threads
- maximumPoolSize: maximum number of threads
  - maximumPoolSize - corePoolSize = number of emergency threads
  - Note: an emergency thread is created only when no core thread is idle and the task queue is full
- keepAliveTime: maximum time an emergency thread may stay idle before it is terminated (core threads keep running by default)
- unit: time unit of keepAliveTime (applies to emergency threads)
- workQueue: blocking queue that stores tasks
  - Bounded blocking queue: ArrayBlockingQueue
  - Unbounded blocking queue: LinkedBlockingQueue (when created without a capacity)
  - Hand-off queue with no internal capacity: SynchronousQueue
  - Priority queue: PriorityBlockingQueue
- threadFactory: thread factory (for example, to give the threads names)
- handler: rejection policy
Operation mode
- The pool starts with no threads. When a task is submitted, the pool creates a new thread to execute it.
- Once the number of threads reaches corePoolSize (number of core threads), a newly submitted task is placed in workQueue (the blocking queue), where it waits until a thread becomes free.
- If the queue is bounded and tasks exceed its capacity, the pool creates emergency threads, up to maximumPoolSize - corePoolSize of them, to handle the overflow.
- If the number of threads has reached maximumPoolSize (maximum number of threads) and new tasks still arrive, the rejection policy is executed. The JDK provides 4 implementations, and other well-known frameworks provide their own:
  - AbortPolicy: makes the caller throw RejectedExecutionException. This is the default policy
  - CallerRunsPolicy: lets the caller run the task
  - DiscardPolicy: discards the task
  - DiscardOldestPolicy: discards the oldest task in the queue and puts the new task in its place
  - Dubbo's implementation logs the event and dumps thread stack information before throwing RejectedExecutionException, which makes problems easier to locate
  - Netty's implementation creates a new thread to execute the task
  - ActiveMQ's implementation tries to put the task into the queue with a timeout (60s), similar to the custom rejection policy shown earlier
  - PinPoint's implementation uses a chain of rejection policies and tries each policy in the chain in turn
- When the peak has passed, threads beyond corePoolSize (number of core threads) that have had no work for some time are terminated to save resources. That time is controlled by keepAliveTime and unit.
- Building on this constructor, the JDK Executors class provides many factory methods that create thread pools for various purposes
Operation mode
- When a task is submitted to the thread pool, one of the following happens
  - The task is handed to a core thread for execution
  - All core threads are busy, so the task is placed in the blocking queue workQueue to wait
  - The blocking queue is full, so an emergency thread is created to execute the task
    - An emergency thread that stays idle longer than keepAliveTime is released
  - The number of pending tasks exceeds maximumPoolSize plus the capacity of workQueue, so the rejection policy is applied
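The order in which a pool uses core threads, the queue, emergency threads, and finally the rejection policy can be observed with a small experiment. The sketch below is illustrative only: the class PoolGrowthDemo and its observe method are hypothetical names. It assumes a pool with 1 core thread, a maximum of 2 threads, and a bounded queue of capacity 1, and it submits four tasks that block on a latch so that none of them finishes during the observation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Records "poolSize/queueSize" after each submission, or "rejected"
    static List<String> observe() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        // Each task blocks until the latch opens, keeping its thread busy
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> seen = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    seen.add(pool.getPoolSize() + "/" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // Default AbortPolicy: threads and queue are both exhausted
                    seen.add("rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // [1/0, 1/1, 2/1, rejected]
    }
}
```

The first task starts the core thread, the second waits in the queue, the third finds the queue full and triggers an emergency thread, and the fourth is rejected.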
Rejection policy
- If the number of threads has reached maximumPoolSize (maximum number of threads) and new tasks still arrive, the rejection policy is executed. The JDK provides 4 implementations
- To remember the four rejection policies: abort, discard, discard oldest, caller runs
- Short answer:
  - Abort policy: no special scenario.
  - Discard policy: unimportant tasks (for example, counting blog page views).
  - Discard-oldest policy: publishing messages.
  - Caller-runs policy: scenarios where failure is not allowed (with modest performance requirements and low concurrency).
AbortPolicy (abort policy): rejects the task and throws RejectedExecutionException. This is the default policy
- This is the default rejection policy of thread pools. When no more tasks can be accepted, it throws an exception and so reports the state of the program promptly. It is recommended for critical business logic, because when a subsystem cannot carry more concurrency the exception makes the problem visible right away.
- Function: when the rejection policy is triggered, it throws an exception that refuses execution. "Abort" means the current flow of execution is interrupted.
- Use scenarios: there is no special scenario for this policy, but the thrown exception must be handled correctly. AbortPolicy is the default policy of ThreadPoolExecutor, and the pools returned by the Executors factory methods also use it because they do not set a rejection policy explicitly. Note, however, that pools such as newFixedThreadPool use an unbounded queue, so memory would be exhausted before the rejection policy is ever triggered. When you build your own thread pool instance with this policy, handle the exception thrown when the policy is triggered, because it interrupts the current flow of execution.
DiscardPolicy (discard policy): discards the task without throwing an exception. Once the queue is full, tasks submitted afterwards are dropped silently.
- With this policy the abnormal state of the system may go unnoticed, so it is best reserved for unimportant work. For example, the page-view counter of my blog uses this rejection policy.
- Function: discards the task quietly without triggering any action.
- Use scenarios: use it only if the submitted tasks do not matter. Because it is an empty implementation, it silently swallows tasks, so this policy is rarely used.
DiscardOldestPolicy (discard-oldest policy): discards the task at the head of the queue (the oldest one), then resubmits the rejected task.
- This policy favors the new over the old. Whether to adopt it depends on whether the business can tolerate old tasks being dropped, which needs careful consideration.
- Function: if the thread pool has not been shut down, removes the element at the head of the queue and then retries executing the new task
- Use scenarios: this policy still discards a task, and does so silently, but what it discards is an old task that has not run yet, specifically the one that was next in line to run. One scenario that fits is publishing and then modifying a message: the message has been published but not yet processed when an updated version arrives, so the unprocessed older version can be discarded. Because older versions may still be waiting in the queue, message handling must compare versions when processing a message.
CallerRunsPolicy (caller-runs policy): the calling thread runs the task.
- Function: when the rejection policy is triggered and the thread pool has not been shut down, the task is run by the thread that submitted it.
- Use scenarios: generally suited to cases where failure is not allowed, performance requirements are modest, and concurrency is low. Since thread pools are not normally shut down, the submitted task is certain to run. But because it runs on the caller's thread, repeated submissions block the caller from submitting further tasks, so throughput drops.
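The behavior of CallerRunsPolicy can be checked directly. The following is a minimal sketch (the class CallerRunsDemo and its method name are hypothetical): it saturates a pool that has one thread and a queue of capacity 1, then submits a third task and records which thread actually ran it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    // Returns the name of the thread that ran the rejected task
    static String threadThatRanRejectedTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the queue
            // Pool and queue are full: the policy runs this task in the caller
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on: " + threadThatRanRejectedTask());
    }
}
```

Since the third execute call does not return until the task has run, the submitting thread is slowed down, which acts as a simple form of back-pressure.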
newFixedThreadPool
Internally called constructor
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- Characteristics
  - Number of core threads = maximum number of threads (no emergency threads are created), so no timeout is needed
  - The blocking queue is unbounded and can hold any number of tasks
  - Suitable when the amount of work is known and the tasks are relatively time-consuming
This is a factory method that the Executors class provides for creating thread pools. Executors is the utility class of the Executor framework.
/** * @author xiexu * @create 2022-02-09 4:09 PM */
@Slf4j(topic = "c.TestThreadPoolExecutors")
public class TestThreadPoolExecutors {
public static void main(String[] args) {
// Create a thread pool with 2 core threads
// A ThreadFactory can be used to give the threads names
ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
private AtomicInteger t = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "mypool_t" + t.getAndIncrement());
}
});
pool.execute(() -> {
log.debug("1");
});
pool.execute(() -> {
log.debug("2");
});
pool.execute(() -> {
log.debug("3");
});
}
}
- The pool size is 2 and there are 3 tasks, so after thread t1 finishes task 1 it goes on to run task 3
- The threads created are non-daemon threads by default, so the program does not exit even after the main thread has finished
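Because the pool's threads are non-daemon, a program that uses a fixed pool normally has to shut it down explicitly in order to exit. A minimal sketch of that step follows; the class name FixedPoolShutdownDemo and the 5 second wait are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolShutdownDemo {
    // Runs three tasks on a pool of two threads, then shuts the pool down.
    // Returns true if all tasks ran and the pool terminated in time.
    static boolean runAndShutDown() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            pool.execute(done::incrementAndGet);
        }
        // shutdown() stops accepting new tasks but lets queued tasks finish
        pool.shutdown();
        try {
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return terminated && done.get() == 3;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + runAndShutDown());
    }
}
```

Without the call to shutdown(), the two worker threads would stay alive waiting for more tasks and keep the JVM running.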
newCachedThreadPool
Internal construction method
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- There are no core threads and the maximum number of threads is Integer.MAX_VALUE. Every thread created is an emergency thread (with effectively no upper limit) that may stay idle for 60 seconds
- The blocking queue is a SynchronousQueue
  - SynchronousQueue is a special kind of queue
    - It has no capacity: an element cannot be put in unless a thread is there to take it (payment with one hand, delivery with the other)
    - A task enters the queue only when a thread is taking one
- Overall, the number of threads in the pool grows with the number of tasks, with no upper limit. After the tasks finish, threads that stay idle for 1 minute are released.
- Suitable when tasks are numerous but each one takes little time
SynchronousQueue demonstration
@Slf4j(topic = "c.TestSynchronousQueue")
public class TestSynchronousQueue {
public static void main(String[] args) {
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
try {
log.debug("putting {} ", 1);
integers.put(1);
log.debug("{} putted...", 1);
log.debug("putting...{} ", 2);
integers.put(2);
log.debug("{} putted...", 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 1);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 2);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t3").start();
}
}
- A thread pool created with newCachedThreadPool() uses SynchronousQueue as its blocking queue.
- The maximum number of threads is Integer.MAX_VALUE, but all of them are emergency threads and they are created lazily (none exist at the start; they are created as needed, up to that limit)
- The blocking queue has no capacity: a task cannot be stored unless a thread is there to take it. It serves as a hand-off point, and submissions do not block in practice, because there is no upper limit on emergency threads and so a thread is always available to take the task.
- After the tasks finish, emergency threads that stay idle for 1 minute are released.
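The hand-off behavior described above can be reduced to two small checks. This is a sketch under assumed names (SynchronousQueueHandoff, offerWithoutConsumer, and handOff are hypothetical): a non-blocking offer fails when no consumer is waiting, while put succeeds only by passing the element directly to a thread that calls take.

```java
import java.util.concurrent.SynchronousQueue;

class SynchronousQueueHandoff {
    // No consumer is waiting, so the non-blocking offer cannot hand over the element
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<Integer>().offer(1);
    }

    // put() blocks until the consumer thread takes the element
    static int handOff(int value) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        int[] received = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        try {
            queue.put(value);
            consumer.join(); // makes the consumer's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer: " + offerWithoutConsumer());
        System.out.println("handed off: " + handOff(42));
    }
}
```

ThreadPoolExecutor submits to its queue with the non-blocking offer, which is why a cached pool starts a new thread whenever no idle thread is already waiting to take a task.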
newSingleThreadExecutor
Internal construction method
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Use scenarios:
- You want multiple tasks to run one after another. The number of threads is fixed at 1; when there is more than one task, the rest wait in an unbounded queue. After the tasks finish, the single thread is not released.
Differences:
- If you create a single thread yourself to run tasks serially and a task fails, the thread terminates and nothing can recover it, whereas a newSingleThreadExecutor pool creates a new thread so that the pool keeps working
- Executors.newSingleThreadExecutor() always has exactly 1 thread, and this cannot be changed
  - FinalizableDelegatedExecutorService applies the decorator pattern and exposes only the ExecutorService interface, so the methods specific to ThreadPoolExecutor cannot be called
- Executors.newFixedThreadPool(1) starts with 1 thread, but this can be changed later
  - It exposes a ThreadPoolExecutor object, which can be cast and then modified through methods such as setCorePoolSize
Code example
public static void test2() {
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
log.debug("1");
int i = 1 / 0;
});
pool.execute(() -> {
log.debug("2");
});
pool.execute(() -> {
log.debug("3");
});
}
- After thread 1 dies from the exception, the pool creates thread 2 to run the remaining tasks, so the pool always has one usable thread.
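The difference in what the two factory methods expose can be verified with instanceof. The sketch below uses hypothetical names (SingleVsFixedDemo and its methods); it relies only on the fact that newSingleThreadExecutor returns a wrapper and newFixedThreadPool returns a ThreadPoolExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class SingleVsFixedDemo {
    // True if the pool can be cast to ThreadPoolExecutor and therefore reconfigured
    static boolean isReconfigurable(ExecutorService pool) {
        try {
            return pool instanceof ThreadPoolExecutor;
        } finally {
            pool.shutdown();
        }
    }

    // Casts a fixed pool of size 1 and raises its size to 2
    static int resizeFixedPool() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
            // Raise the maximum first so that core never exceeds maximum
            tpe.setMaximumPoolSize(2);
            tpe.setCorePoolSize(2);
            return tpe.getCorePoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("single: " + isReconfigurable(Executors.newSingleThreadExecutor()));
        System.out.println("fixed:  " + isReconfigurable(Executors.newFixedThreadPool(1)));
        System.out.println("fixed pool resized to " + resizeFixedPool());
    }
}
```

If single-threaded execution must be guaranteed for the lifetime of the pool, the wrapper returned by newSingleThreadExecutor is the safer choice, because no caller can resize it.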
Drawbacks of the thread pool objects returned by Executors (key topic)
Note: the thread pool objects returned by Executors have the following drawbacks:
- FixedThreadPool and SingleThreadExecutor: the request queue may grow to Integer.MAX_VALUE entries (an unbounded blocking queue), so a large number of requests can pile up and cause an OOM.
- CachedThreadPool and ScheduledThreadPool: the number of threads that may be created is Integer.MAX_VALUE, so a large number of threads can be created and cause an OOM.
- It is recommended to create thread pools with ThreadPoolExecutor directly
How to avoid these problems:
Use bounded queues and limit the number of threads that can be created.
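One way to follow this advice is to call the ThreadPoolExecutor constructor directly with a bounded queue, a capped thread count, and an explicit rejection policy. The sketch below is one possible configuration, not a recommendation for specific numbers: the class BoundedPoolFactory, the thread name prefix, the 60 second keep-alive, and the choice of CallerRunsPolicy are all assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolFactory {
    // Builds a pool whose thread count and queue length are both bounded
    static ThreadPoolExecutor newBoundedPool(int core, int max, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger(1);
        return new ThreadPoolExecutor(
                core, max,
                60L, TimeUnit.SECONDS,                    // idle time for emergency threads
                new ArrayBlockingQueue<>(queueCapacity),  // bounded queue
                r -> new Thread(r, "bounded-pool-t" + counter.getAndIncrement()),
                new ThreadPoolExecutor.CallerRunsPolicy()); // explicit rejection policy
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool(2, 4, 100);
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

With these settings at most 4 threads and 100 queued tasks can exist at once, so memory use stays bounded and overload shows up as slower submission instead of an OOM.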
Executing and submitting tasks: execute/submit
// Execute a task
void execute(Runnable command);
// Submit a task and get its result through the returned Future. Future relies on the guarded suspension pattern discussed earlier: the main thread can call FutureTask.get() to wait for the task to complete and receive the result
<T> Future<T> submit(Callable<T> task);
// Submit all tasks in tasks
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
// Submit all tasks in tasks, with a timeout
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
// Submit all tasks in tasks; return the result of whichever task completes successfully first and cancel the others
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
// Submit all tasks in tasks; return the result of whichever task completes successfully first and cancel the others, with a timeout
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
execute() method
execute(Runnable command)
- Takes a Runnable object and runs its run method
- Source code walkthrough
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// Read ctl
int c = ctl.get();
// Check whether the number of worker threads is below the number of core threads
if (workerCountOf(c) < corePoolSize) {
// Create a core worker for the task
if (addWorker(command, true))
// The worker was created, so return
return;
// Creating the worker failed; read ctl again
c = ctl.get();
}
// Either all core threads exist already or creating a core worker failed
// If the pool is RUNNING and the task was inserted into the work queue
if (isRunning(c) && workQueue.offer(command)) {
// Double check: the pool may have left RUNNING after the task was queued
int recheck = ctl.get();
// If the pool is no longer RUNNING, the new task must not run,
// so remove it from the blocking queue
if (! isRunning(recheck) && remove(command))
// Invoke the rejection policy for the task
reject(command);
// If there are no worker threads left
else if (workerCountOf(recheck) == 0)
// Create a worker with no first task; it will take the queued task
addWorker(null, false);
}
// If queuing failed (the queue is full), try an emergency thread; if that also fails, reject the task
else if (!addWorker(command, false))
reject(command);
}
- execute calls the addWorker() method, so look at that method next
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// If the pool status is non RUNNING state 、 Thread pool is SHUTDOWN And the task is empty Or there are already tasks in the blocking queue
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// Failed to create new thread
return false;
for (;;) {
// Get the current number of worker threads
int wc = workerCountOf(c);
// Parameters in core by true
// CAPACITY by 1 << COUNT_BITS-1, Usually not more than
// If the number of worker threads is greater than the number of core threads , The creation fails
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// Use CAS to increment the worker count stored in ctl
if (compareAndIncrementWorkerCount(c))
// On success, break out of both loops and go on to create the worker
break retry;
// The CAS failed, so re-read ctl
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// The run state changed: restart the outer loop
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// Whether the worker's thread was started
boolean workerStarted = false;
// Whether the worker was added to the workers set
boolean workerAdded = false;
Worker w = null;
try {
// Create a Worker; its thread is created by the pool's ThreadFactory
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// Lock
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// That is: re-check the pool state while holding the lock, in case the
// ThreadFactory failed or shutdown was called before the lock was acquired
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// Add the worker to the workers set
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// Mark the worker as added
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// If the worker was added to the set, start its thread
if (workerAdded) {
t.start();
// The thread was started
workerStarted = true;
}
}
} finally {
// If the thread was not started
if (! workerStarted)
// roll back: remove the worker and decrement the worker count
addWorkerFailed(w);
}
return workerStarted;
}
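addWorker( ) relies on ctl holding two values in one int: the run state in the high 3 bits and the worker count in the low 29 bits. The following sketch re-creates the private constants and helper methods of ThreadPoolExecutor as they appear in the JDK 8 source. The wrapper class CtlLayout is hypothetical, and the real members are private, so this is a study aid rather than an API you can call.

```java
// Hypothetical stand-alone re-creation of ThreadPoolExecutor's private ctl helpers
class CtlLayout {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // maximum worker count

    // Run states are stored in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING);
        System.out.println(workerCountOf(c));
        System.out.println(RUNNING < SHUTDOWN);
    }
}
```

Since the states are ordered numerically (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), a check such as rs >= SHUTDOWN is a plain integer comparison.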
submit( ) method
<T> Future<T> submit(Callable<T> task);
- Pass in a Callable and use the returned Future to capture its return value
private static void method1(ExecutorService pool) throws InterruptedException, ExecutionException {
// submit runs the Callable's call method on a pool thread
// The returned Future is used to obtain the return value
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
log.debug("running");
Thread.sleep(1000);
return "ok";
}
});
log.debug("{}", future.get());
}
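invokeAll
- Submits every task in the collection and waits for all of them to complete; the returned Future list is in the same order as the tasks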
private static void method2(ExecutorService pool) throws InterruptedException {
List<Future<String>> futures = pool.invokeAll(Arrays.asList(() -> {
log.debug("begin");
Thread.sleep(1000);
return "1";
}, () -> {
log.debug("begin");
Thread.sleep(500);
return "2";
}, () -> {
log.debug("begin");
Thread.sleep(2000);
return "3";
}));
futures.forEach(f -> {
try {
log.debug("{}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
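invokeAny
- Submits every task in the collection and returns the result of one that completed successfully; tasks that have not completed by then are cancelled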
private static void method3(ExecutorService pool) throws InterruptedException, ExecutionException {
String result = pool.invokeAny(Arrays.asList(() -> {
log.debug("begin 1");
Thread.sleep(1000);
log.debug("end 1");
return "1";
}, () -> {
log.debug("begin 2");
Thread.sleep(500);
log.debug("end 2");
return "2";
}, () -> {
log.debug("begin 3");
Thread.sleep(2000);
log.debug("end 3");
return "3";
}));
log.debug("{}", result);
}
Shutting down the thread pool
shutdown( )
- Changes the thread pool state to SHUTDOWN
- No new tasks are accepted, but tasks already submitted (running or waiting in the blocking queue) are still completed
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// Change the thread pool state to SHUTDOWN
advanceRunState(SHUTDOWN);
// Interrupt idle threads only
interruptIdleWorkers();
// Extension point used by ScheduledThreadPoolExecutor
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// Try to terminate: with no running threads the pool can terminate right away; if some are still running, this call does not wait for them
tryTerminate();
}
shutdownNow( )
- Changes the thread pool state to STOP
- No new tasks are accepted, and tasks waiting in the blocking queue are not executed
- The unexecuted tasks in the blocking queue are returned to the caller
- Running tasks are interrupted with interrupt
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// Change the state to STOP
advanceRunState(STOP);
// Interrupt all worker threads
interruptWorkers();
// Drain the unexecuted tasks from the queue so they can be returned to the caller
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// Try to terminate; the queue is now empty, so the pool terminates as soon as no worker threads remain
tryTerminate();
return tasks;
}
Other methods
// Returns true once the pool is no longer in the RUNNING state
boolean isShutdown();
// Returns true if the pool state is TERMINATED
boolean isTerminated();
// shutdown() returns immediately; it does not wait for the submitted tasks to finish.
// To do something after the pool reaches TERMINATED, wait with this method
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
Code example
@Slf4j(topic = "c.TestShutDown")
public class TestShutDown {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> result1 = pool.submit(() -> {
log.debug("task 1 running...");
Thread.sleep(1000);
log.debug("task 1 finish...");
return 1;
});
Future<Integer> result2 = pool.submit(() -> {
log.debug("task 2 running...");
Thread.sleep(1000);
log.debug("task 2 finish...");
return 2;
});
Future<Integer> result3 = pool.submit(() -> {
log.debug("task 3 running...");
Thread.sleep(1000);
log.debug("task 3 finish...");
return 3;
});
log.debug("shutdown");
// pool.shutdown();
// pool.awaitTermination(3, TimeUnit.SECONDS);
List<Runnable> runnables = pool.shutdownNow();
log.debug("other.... {}", runnables);
}
}
shutdown output
shutdownNow output
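shutdown( ), awaitTermination( ) and shutdownNow( ) are often combined into a two-phase shutdown, a pattern that the ExecutorService Javadoc also describes. The helper below is a sketch under assumptions: the class name PoolCloser and the method shutdownGracefully are hypothetical names, not JDK API.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the JDK
class PoolCloser {

    // Returns true if the pool reached TERMINATED within the allowed time
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks, let submitted ones finish
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            // Timed out: interrupt running tasks and drop the queued ones
            List<Runnable> dropped = pool.shutdownNow();
            System.out.println("dropped tasks: " + dropped.size());
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("task running"));
        System.out.println("terminated: " + shutdownGracefully(pool, 3, TimeUnit.SECONDS));
    }
}
```

Tasks only react to shutdownNow( ) if they respond to interruption, so a task that ignores the interrupt flag can still keep the pool alive past the second wait.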
Asynchronous mode: Worker Thread
Definition
- Let a limited number of worker threads (Worker Thread) take turns processing an unlimited number of tasks asynchronously. It can also be classified as a division-of-labor pattern. Its typical implementation is the thread pool, and it also reflects the Flyweight pattern from the classic design patterns.
- For example, a waiter at Haidilao (the thread) takes turns handling each guest's order (the task). Assigning a dedicated waiter to every guest would cost too much (compare another multithreading design pattern: Thread-Per-Message)
- Note that different task types should use different thread pools, which avoids starvation and improves efficiency
- For example, a restaurant worker who has to greet guests (task type A) and also cook in the kitchen (task type B) is clearly inefficient. Splitting the staff into waiters (thread pool A) and chefs (thread pool B) is more reasonable, and an even finer division of labor is possible
Starvation
A fixed-size thread pool can suffer from starvation
- Two workers are two threads in the same thread pool
- Their job has two stages: taking a guest's order and cooking in the kitchen
- Taking an order: the order must be taken first, then the worker waits for the dish to be cooked and serves it; the worker handling the order has to wait in the meantime
- Cooking: nothing special, just cook
- For example, worker A takes the order and then waits for worker B to cook the dish before serving it; the two cooperate well
- But if two guests arrive at the same time, worker A and worker B both take orders, nobody is left to cook, and the pool starves
Starvation example
@Slf4j(topic = "c.TestDeadLock")
public class TestDeadLock {
static final List<String> MENU = Arrays.asList(" Sauteed Potato, Green Pepper and Eggplant ", " Kung Pao Chicken ", " Sauteed Chicken Dices with Chili Peppers ", " Roast chicken wings ");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(() -> {
log.debug(" Handle order ...");
Future<String> future = pool.submit(() -> {
log.debug(" cook a dish ");
return cooking();
});
try {
log.debug(" Serve : {}", future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
// pool.execute(() -> {
// log.debug(" Handle order ...");
// Future<String> future = pool.submit(() -> {
// log.debug(" cook a dish ");
// return cooking();
// });
// try {
// log.debug(" Serve : {}", future.get());
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// });
}
}
With only one guest, thread 1 takes the order and serves, and thread 2 cooks
With the commented-out block enabled there are two guests: thread 1 and thread 2 both take orders, nobody is left to cook, and starvation occurs
Solving starvation
Increasing the size of the thread pool works, but it is not the fundamental solution. As mentioned earlier,
different types of tasks should use different thread pools
@Slf4j(topic = "c.TestDeadLock")
public class TestDeadLock {
static final List<String> MENU = Arrays.asList(" Sauteed Potato, Green Pepper and Eggplant ", " Kung Pao Chicken ", " Sauteed Chicken Dices with Chili Peppers ", " Roast chicken wings ");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.debug(" Handle order ...");
Future<String> future = cookPool.submit(() -> {
log.debug(" cook a dish ");
return cooking();
});
try {
log.debug(" Serve : {}", future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.debug(" Handle order ...");
Future<String> future = cookPool.submit(() -> {
log.debug(" cook a dish ");
return cooking();
});
try {
log.debug(" Serve : {}", future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
How many threads should a thread pool have?
- Too few, and the program cannot make full use of system resources and is prone to starvation
- Too many, and there are more thread context switches and more memory is used
CPU-intensive workloads
Using **number of CPU cores + 1** threads usually achieves the best CPU utilization. The +1 ensures that when a thread is paused by a page fault (operating system) or for some other reason, the extra thread can step in so that CPU clock cycles are not wasted
I/O-intensive workloads
The CPU is not always busy. Business computation uses CPU resources, but while a thread performs I/O, makes a remote RPC call, or runs a database operation, the CPU is idle, and multithreading can be used to raise its utilization.
The empirical formula is as follows
Number of threads = number of cores * expected CPU utilization * total time (CPU computing time + waiting time) / CPU computing time
- For example, with a 4-core CPU where computing takes 50% of the time and waiting takes the other 50%, and an expected CPU utilization of 100%, the formula gives
4 * 100% * 100% / 50% = 8
- For example, with a 4-core CPU where computing takes 10% of the time and waiting takes the other 90%, and an expected CPU utilization of 100%, the formula gives
4 * 100% * 100% / 10% = 40
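The empirical formula above can be written as a small helper. This is a minimal sketch: the class PoolSizing and the method threadsFor are hypothetical names, and the compute and wait times are assumed to be measured by you (for example with a profiler) and passed in as milliseconds.

```java
// Hypothetical helper illustrating the empirical formula from the text
class PoolSizing {

    // cores * targetUtilization * (computeTime + waitTime) / computeTime, rounded up
    static int threadsFor(int cores, double targetUtilization, long computeMillis, long waitMillis) {
        if (cores <= 0 || computeMillis <= 0 || waitMillis < 0) {
            throw new IllegalArgumentException(
                    "cores and computeMillis must be positive, waitMillis must not be negative");
        }
        double threads = cores * targetUtilization * (computeMillis + waitMillis) / computeMillis;
        return Math.max(1, (int) Math.ceil(threads));
    }

    public static void main(String[] args) {
        System.out.println(threadsFor(4, 1.0, 50, 50)); // 50% compute, 50% wait
        System.out.println(threadsFor(4, 1.0, 10, 90)); // 10% compute, 90% wait
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(threadsFor(cores, 1.0, 10, 90)); // sized for this machine
    }
}
```

The first two calls reproduce the worked examples (8 and 40 threads). The result is only a starting point; the right size should be confirmed by load testing.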
Task scheduling thread pool
- Before the "task scheduling thread pool" was added, java.util.Timer could be used for timed tasks. Timer is simple to use, but all tasks are scheduled by the same thread, so they run serially: only one task can run at a time, and a delay or exception in one task affects the tasks after it.
@Slf4j(topic = "c.TestTimer")
public class TestTimer {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task 1");
Sleeper.sleep(2);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
}
};
log.debug("start...");
// Add two tasks to the timer, both meant to run after 1s
// But Timer runs its queued tasks one after another on a single thread, so the delay of task 1 holds back task 2
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
}
ScheduledExecutorService (key point)
The schedule method of ScheduledExecutorService
public static void method2() {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
// Add two tasks, both meant to run after 1s
pool.schedule(() -> {
System.out.println(" Mission 1, execution time :" + new Date());
try {
Thread.sleep(2000);
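} catch (InterruptedException e) {
// Restore the interrupt flag instead of silently swallowing the interruption
Thread.currentThread().interrupt();
}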
}, 1, TimeUnit.SECONDS);
pool.schedule(() -> {
System.out.println(" Mission 2, execution time :" + new Date());
}, 1, TimeUnit.SECONDS);
}
- The two tasks run on separate threads and do not interfere with each other
- Of course, if the thread pool size is 1, the tasks still run serially
The scheduleAtFixedRate method of ScheduledExecutorService
public static void method3() {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
// After an initial delay of 1s, print running every 1s
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);
}
If the task execution time exceeds the interval
public static void method3() {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
// After an initial delay of 1s, the period is 1s, but each run takes 2s
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
Sleeper.sleep(2);
}, 1, 1, TimeUnit.SECONDS);
}
- Output analysis: the first run starts after the 1s initial delay. After that, because the task execution time is longer than the period, the interval is stretched to 2s
The scheduleWithFixedDelay method of ScheduledExecutorService
public static void method4() {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(() -> {
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
}
- Output analysis: the first run starts after the 1s initial delay. For scheduleWithFixedDelay, the next run starts at: end of the previous run + delay. So the interval between starts is 3s
- Overall, this thread pool behaves as follows: the number of threads is fixed, and when there are more tasks than threads, the extra tasks wait in an unbounded queue. The threads are not released when tasks complete. It is used to run delayed or repeating tasks.
Example: how to run a scheduled task every Thursday at 18:00:00?
public class TestSchedule {
// Run a task every Thursday at 18:00:00
public static void main(String[] args) {
// Get the current time
LocalDateTime now = LocalDateTime.now();
System.out.println(now);
// 18:00:00 on Thursday of the current week
LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
// If that moment has already passed, use next Thursday instead
if (now.compareTo(time) > 0) {
time = time.plusWeeks(1); // add one week
}
System.out.println(time);
// initialDelay is the time from now until the next Thursday 18:00:00
long initialDelay = Duration.between(now, time).toMillis();
// period is one week, in milliseconds
long period = 1000 * 60 * 60 * 24 * 7;
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleAtFixedRate(() -> {
System.out.println("running...");
}, initialDelay, period, TimeUnit.MILLISECONDS);
}
}
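The delay calculation in the example is easier to check when the current time is a parameter instead of LocalDateTime.now(). The sketch below is one possible way to do that: the class WeeklyDelay and the method untilNext are hypothetical names, and TemporalAdjusters.nextOrSame is used in place of the with(DayOfWeek.THURSDAY) call from the example.

```java
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.TemporalAdjusters;

// Hypothetical helper, not part of the original article
class WeeklyDelay {

    // Time from 'now' until the next occurrence of 'day' at 'time'
    static Duration untilNext(LocalDateTime now, DayOfWeek day, LocalTime time) {
        // Same day if 'now' already falls on 'day', otherwise the next such day
        LocalDateTime target = now.with(TemporalAdjusters.nextOrSame(day)).with(time);
        if (target.isBefore(now)) {
            target = target.plusWeeks(1); // today's slot has already passed
        }
        return Duration.between(now, target);
    }

    public static void main(String[] args) {
        LocalDateTime wednesday = LocalDateTime.of(2022, 7, 6, 6, 42);
        System.out.println(untilNext(wednesday, DayOfWeek.THURSDAY, LocalTime.of(18, 0)));
    }
}
```

The returned Duration can be passed to scheduleAtFixedRate via toMillis(). LocalDateTime carries no time zone, so daylight-saving shifts are ignored here, as in the example above.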
Handling exceptions from executed tasks correctly
Method 1: catch the exception inside the task
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0;
} catch (Exception e) {
log.error("error:", e);
}
});
Method 2: Use Future
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> future = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});
log.debug("result:{}", future.get());
- Use the Future to receive the result: if the task finishes normally, get() returns the Callable's return value
- If the task throws, get() rethrows the exception wrapped in an ExecutionException
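To make Method 2 concrete, the following sketch shows how the original exception can be recovered from the ExecutionException that get() throws. The class FutureExceptionDemo and the method causeOfFailure are hypothetical names introduced for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class
class FutureExceptionDemo {

    // Runs a failing task and returns the exception the task threw
    static Throwable causeOfFailure() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<Boolean> future = pool.submit(() -> {
                int zero = 0;
                int i = 1 / zero; // throws ArithmeticException on the pool thread
                return true;
            });
            future.get(); // the failure only surfaces here
            return null;  // not reached for this task
        } catch (ExecutionException e) {
            return e.getCause(); // the exception originally thrown by the task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfFailure());
    }
}
```

If get() is never called on a Future returned by submit, the exception stays stored in the Future and nothing is logged, which is why Method 1 or an explicit get() is needed.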
Tomcat thread pool
Where does Tomcat use thread pools?
- LimitLatch limits traffic by capping the maximum number of connections, similar to Semaphore in J.U.C (covered later)
- Acceptor is only responsible for 【accepting new socket connections】
- Poller only monitors socket channels for 【readable I/O events】
- Once a channel is readable, it wraps the work in a task object (socketProcessor) and submits it to the Executor thread pool
- The worker threads in the Executor thread pool are ultimately responsible for 【processing requests】
This reflects the idea that different thread pools do different jobs
The Tomcat thread pool extends ThreadPoolExecutor, with slightly different behavior
- If the total number of threads reaches maximumPoolSize
- RejectedExecutionException is not thrown immediately
- Instead, Tomcat tries to put the task in the queue again, and only throws RejectedExecutionException if that also fails
Source code (tomcat-7.0.42)
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
// Try again to put the task into the blocking queue
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
- TaskQueue.java
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() )
throw new RejectedExecutionException(
"Executor not running, can't force a command into the queue"
);
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}
- Connector configuration
- Executor thread configuration
A daemon thread does not keep the process alive: it ends once the main thread (and every other non-daemon thread) has ended
- There is a mistake in the figure below: when submitted tasks < core threads, the task should be handed directly to a core thread for execution.
Fork/Join (be familiar with it)
Concept
- Fork/Join is a thread pool implementation added in JDK 1.7. It embodies the divide-and-conquer idea and suits CPU-intensive work that can be split into subtasks
- Task splitting means dividing a large task into smaller tasks that use the same algorithm, until a task is small enough to be solved directly. Recursive computations such as merge sort and the Fibonacci sequence can be solved with divide and conquer
- Fork/Join adds multithreading on top of divide and conquer: the splitting and merging of each task can be handed to different threads, which further improves efficiency
- By default, Fork/Join creates a thread pool with as many threads as there are CPU cores
Use
- Tasks submitted to a Fork/Join thread pool must extend RecursiveTask (with a return value) or RecursiveAction (no return value). For example, the following defines a task that sums the integers from 1 to n
@Slf4j(topic = "c.TestForkJoin2")
public class TestForkJoin2 {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new MyTask(5)));
// MyTask(5) = 5 + MyTask(4), MyTask(4) = 4 + MyTask(3), MyTask(3) = 3 + MyTask(2), MyTask(2) = 2 + MyTask(1)
}
}
// Sum of the integers from 1 to n
@Slf4j(topic = "c.MyTask")
class MyTask extends RecursiveTask<Integer> {
private int n;
public MyTask(int n) {
this.n = n;
}
@Override
public String toString() {
return "{" + n + '}';
}
@Override
protected Integer compute() {
// When n reaches 1 the result is known directly
if (n == 1) {
log.debug("join() {}", n);
return n;
}
// Split off a subtask (fork)
MyTask t1 = new MyTask(n - 1);
t1.fork(); // Hand the subtask to the pool so another thread can run it
log.debug("fork() {} + {}", n, t1);
// Merge (join) the result
int result = n + t1.join(); // Wait for and obtain the subtask's result
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
}
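Execution diagram
Improvement
Splitting the range in half, instead of peeling off one number at a time, makes the two subtasks independent of each other so they can run in parallel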
@Slf4j(topic = "c.AddTaskTest")
public class AddTaskTest {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new AddTask3(1, 5)));
}
}
@Slf4j(topic = "c.AddTask3")
class AddTask3 extends RecursiveTask<Integer> {
int begin;
int end;
public AddTask3(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
public String toString() {
return "{" + begin + "," + end + '}';
}
@Override
protected Integer compute() {
if (begin == end) {
log.debug("join() {}", begin);
return begin;
}
if (end - begin == 1) {
log.debug("join() {} + {} = {}", begin, end, end + begin);
return end + begin;
}
int mid = (end + begin) / 2;
AddTask3 t1 = new AddTask3(begin, mid);
t1.fork();
AddTask3 t2 = new AddTask3(mid + 1, end);
t2.fork();
log.debug("fork() {} + {} = ?", t1, t2);
int result = t1.join() + t2.join();
log.debug("join() {} + {} = {}", t1, t2, result);
return result;
}
}
Execution diagram
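For larger ranges, splitting all the way down to one or two elements creates far more tasks than there are threads. A common refinement is to stop splitting below a cutoff and sum the rest sequentially. The sketch below assumes a hypothetical class RangeSumTask and an arbitrary THRESHOLD of 1,000; neither comes from the original article, and the cutoff should be tuned by measurement.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums begin..end, splitting until the range is small enough
class RangeSumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cutoff, tune by measurement
    private final int begin;
    private final int end;

    RangeSumTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // Small range: sum it directly instead of forking again
        if (end - begin < THRESHOLD) {
            long sum = 0;
            for (int i = begin; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = begin + (end - begin) / 2;
        RangeSumTask left = new RangeSumTask(begin, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, end);
        left.fork();                     // run the left half asynchronously
        long rightSum = right.compute(); // compute the right half in this thread
        return left.join() + rightSum;   // wait for the left half and merge
    }

    static long sum(int begin, int end) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new RangeSumTask(begin, end));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1_000_000));
    }
}
```

Computing one half in the current thread and forking only the other saves a task hand-off at every level of the recursion.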