Preface
The first one who has been busy in life recently OKR, Wait, let's talk about it later , Starting today, I will resume the original article every week , Thank you for your persistence . This article is also recently in Code Review When , See everyone's code , I want to push down everyone's idea of asynchronous programming as a whole , Written from this .
Why use CompletableFuture
In some business scenarios, we need to use multithreading to execute tasks asynchronously , Speed up task execution . JDK5 Added Future Interface , Used to describe the result of an asynchronous calculation . although Future And related usage methods provide the ability to perform tasks asynchronously , But it's not convenient to get the results , We have to use Future.get Blocking the calling thread , Or use polling to judge Future.isDone Is the mission over , And get the results . Neither of these treatments is very elegant , The relevant code is as follows :
@Test
public void testFuture() {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<String> future = executorService.submit(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
try {
System.out.println(future.get());
System.out.println("end");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
In addition, it cannot solve the scenario where multiple asynchronous tasks need to be interdependent , In short, it's , The main thread needs to wait for the sub thread to execute after the task is completed , At this time, you may think of CountDownLatch, Yes, it can solve , The code is as follows , however Java8 I no longer think this is an elegant solution , Now let's take a look at CompletableFuture Use .
@Test
public void testCountDownLatch() {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch downLatch = new CountDownLatch(2);
Future<String> orderFuture = executorService.submit(() -> {
OrderService orderService = new OrderServiceImpl();
String result = orderService.queryOrderInfo();
downLatch.countDown();
return result;
});
Future<String> trailFuture = executorService.submit(() -> {
TrailService trailService = new TrailServiceImpl();
String result = trailService.queryTrail();
downLatch.countDown();
return result;
});
try {
downLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(orderFuture.get() + trailFuture.get());
System.out.println("end");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
Introduction
Create examples

About CompletableFuture The creation of provides 5 Ways of planting , The first is to create a with default results CompletableFuture, Infrequent use , What we often use is runAsync and supplyAsync, Focus on these two static methods .runAsync Has no return value ,supplyAsync Has a return value , Both methods provide one or two creation forms , One is to use public by default ForkJoinPool Thread pool execution , The default number of threads in this thread pool is CPU The number of nuclear .

Another use is to provide an active thread pool , This is compared to ForkJoinPool Are the benefits of , You can customize thread pools for different scenarios , To divide the business and find problems , Another advantage is for ForkJoinPool For this shared thread , Once blocked , It will be regarded as that other threads cannot obtain execution opportunities .
CompletionStage

About CompletableFuture Core competence is through inheritance Future and CompletionStage To achieve , About Future Is to provide some asynchronous capabilities , If the single deposit is like this CompletableFuture It won't be so powerful , So our core is to introduce CompletionStage Some internal methods are commonly used .

About CompletionStage Methods , There are two types of interfaces , The core methods provide asynchronous methods , The asynchronous method is not introduced here , Basically, the principle is similar , A new thread pool is provided to implement tasks .
Asynchronous callback
Asynchronous callbacks can be divided into two categories , One is the return with parameters , The other is the return without parameters , The returned parameters include thenApply and thenCompose, Returns without parameters include thenRun and thenAccept.
There are parameters to return
thenApply and thenCompose Indicates the action to be performed after a task is completed , Callback method , The execution result of the task, that is, the return value of the method, will be passed to the callback method as an input parameter , It can also be understood as serial , The only difference is thenCompose Need to return to a new CompletionStage, The overall use is as follows :
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "aaaaaaaaaaaaaaaaaaaaa";
}).thenApply(x -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "bbbbbbbbbbbbbbbbbbbbbbbb";
}).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
System.out.println(first.join());
long end = System.currentTimeMillis();
System.out.println(" Time consuming " + (end - start) / 1000 + "");
}
No parameters return
thenAccep It is also the action of consuming the last task , Pass the execution result of the task, that is, the return value of the method, as an input parameter to the callback method , Just no return value ,thenAccep And thenRun The difference between methods is that there is no input parameter and no return value .
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<Void> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}).thenAccept(x -> {
System.out.println(x);
});
first.join();
long end = System.currentTimeMillis();
System.out.println(" Time consuming " + (end - start) / 1000 + " second ");
}
abnormal
CompletableFuture Method execution if there is an exception , Only get perhaps join Method to get the exception , In this case CompletableFuture Provides three ways to handle exceptions .
exceptionally
exceptionally The use of is similar to try catch Medium catch Exception handling in code block .exceptionally Callback method executed when a task execution is abnormal , Pass the thrown exception as a parameter to the callback method , If the task is performed normally ,exceptionally Method CompletionStage Of result Is the result of the normal execution of the task .
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("test");
});
CompletableFuture<String> two = first.exceptionally((x) -> {
x.printStackTrace();
return "123";
});
two.join();
long end = System.currentTimeMillis();
System.out.println(" Time consuming " + (end - start) / 1000 + " second ");
}
whenComplete
whenComplete The use of is similar to try..catch..finanlly in finally Code block , Whether or not there is an exception , Will be implemented .whenComplete Callback method executed after a task is completed , The execution result or the exception thrown during execution will be passed to the callback method , If it is executed normally, the exception is null, The callback method corresponds to CompletableFuture Of result Consistent with the task , If the task is performed normally , be get Method returns the execution result , If it is an execution exception , be get Method throws an exception .
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "aa";
}).whenComplete((x, throwable) -> {
// If the exception exists , Printing exception , And return the default value
if (throwable != null) {
throwable.printStackTrace();
System.out.println(" Failure ");
} else {
System.out.println(" success ");
}
});
System.out.println(first.join());
long end = System.currentTimeMillis();
System.out.println(" Time consuming " + (end - start) / 1000 + " second ");
}
handle
Follow whenComplete Almost the same , The difference lies in handle The callback method of has a return value , And handle Method CompletableFuture Of result Is the execution result of the callback method or the exception thrown during the execution of the callback method , With primordial CompletableFuture Of result irrelevant .
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("test");
}).handle((x, throwable) -> {
// If the exception exists , Printing exception , And return the default value
if (throwable != null) {
throwable.printStackTrace();
return " abnormal ";
} else {
return x + "aa";
}
});
System.out.println(first.join());
long end = System.currentTimeMillis();
System.out.println(" Time consuming " + (end - start) / 1000 + " second ");
}
synthetic relation
There are two types of combinatorial relationships , One is the relationship with , A yes or relationship , And relationship is to continue to perform when all tasks are completed , Be similar to CountDownLatch, Or relationship is that as long as a task is completed, it can be executed downward .
And relationship
thenCombine /thenAcceptBoth / runAfterBoth/allOf
These four methods can combine multiple CompletableFuture combined , Will be multiple CompletableFuture When it's all done , To perform the following operations , The difference lies in ,thenCombine The execution result of the task will be passed to the specified method as a method input parameter , And the method has a return value ;thenAcceptBoth Similarly, the execution result of the task is entered as a method , But no return value ;runAfterBoth No participation , There is no return value . Note that there is only one execution exception in multiple tasks , The exception information is taken as the execution result of the specified task .allOf Multiple tasks will be executed only after they are completed , As long as there is a task execution exception , The return of the CompletableFuture perform get Method will throw an exception , If it's all normal , be get return null.
@Test
public void testCompletableFuture() {
CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
OrderService orderService = new OrderServiceImpl();
return orderService.queryOrderInfo();
});
CompletableFuture<String> trail = CompletableFuture.supplyAsync(() -> {
TrailService trailService = new TrailServiceImpl();
return trailService.queryTrail();
});
CompletableFuture<String> future = order.thenCombine(trail, (a, b) -> a + b);
CompletableFuture<Void> afterBoth = future.runAfterBoth(trail, () -> {
System.out.println(future.join());
});
CompletableFuture<Void> result = CompletableFuture.allOf(afterBoth);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
applyToEither / acceptEither / runAfterEither/anyOf
These four methods can combine multiple CompletableFuture combined , Just one of them CompletableFuture After execution , You can perform the following operations ,applyToEither The execution result of the completed task will be entered as a method , And it has a return value ;acceptEither Similarly, take the execution result of the completed task as the method parameter , But there is no return value ;runAfterEither There is no way to enter parameters , There is no return value , Note that there is only one execution exception in multiple tasks , The exception information is taken as the execution result of the specified task .anyOf Multiple tasks, as long as one task is completed , Follow up tasks can be performed .
@Test
public void testCompletableFuture() {
CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
OrderService orderService = new OrderServiceImpl();
return orderService.queryOrderInfo();
});
CompletableFuture<String> trail = CompletableFuture.supplyAsync(() -> {
TrailService trailService = new TrailServiceImpl();
return trailService.queryTrail();
});
CompletableFuture<String> future = order.applyToEither(trail, (result) -> result);
CompletableFuture<Void> afterBoth = future.runAfterEither(trail, () -> {
System.out.println(future.join());
});
CompletableFuture<Object> result = CompletableFuture.anyOf(afterBoth,order);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
How to use in a project
CompletableFuture When customizing threads , The default thread pool is ForkJoinPool.commonPool(), For us to use java Often do IO Intensive task , The default thread pool is far from enough ; On dual core and below machines , The default thread pool will degenerate into creating one thread for each task , Equivalent to no thread pool . So for CompletableFuture The thread pool must be customized for use in the project , At the same time, pay attention to the custom thread pool , Thread pool has a full capacity reject policy , If the rejection policy of discard policy is adopted , also allOf Methods and get Method will wait indefinitely if no timeout is set , Next, we use... Through custom threads CompletableFuture.
Custom thread pool , Here through inheritance ThreadPoolExecutor, Rewrote shutdown() 、shutdownNow() 、beforeExecute() and afterExecute() Method to count the execution of thread pool , Here can also be combined with Spring and appllo Implement custom extended thread pool , The next article can talk about the extension idea and Implementation Scheme , Different thread pools are used for different business scenarios , One is to facilitate the troubleshooting of problems , Another is similar to Hystrix Isolation plan ;
package com.zto.lbd;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Thread pool monitoring class
*
* @author wangtongzhou 18635604249
* @since 2022-02-23 07:27
*/
public class ThreadPoolMonitor extends ThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);
/**
* Save the start time of the task , When the task is over , Calculate the task execution time by subtracting the start time from the end time of the task
*/
private ConcurrentHashMap<String, Date> startTimes;
/**
* Thread pool name , Usually named after the business name , Easy to distinguish
*/
private String poolName;
/**
* Call the constructor of the parent class , And initialization HashMap And thread pool name
*
* @param corePoolSize Number of core threads in thread pool
* @param maximumPoolSize Maximum number of threads in the thread pool
* @param keepAliveTime The maximum idle time of the thread
* @param unit A unit of free time
* @param workQueue Save the queue of submitted tasks
* @param poolName Thread pool name
*/
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), poolName);
}
/**
* Call the constructor of the parent class , And initialization HashMap And thread pool name
*
* @param corePoolSize Number of core threads in thread pool
* @param maximumPoolSize Maximum number of threads in the thread pool
* @param keepAliveTime The maximum idle time of the thread
* @param unit A unit of free time
* @param workQueue Save the queue of submitted tasks
* @param threadFactory Thread factory
* @param poolName Thread pool name
*/
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.startTimes = new ConcurrentHashMap<>();
this.poolName = poolName;
}
/**
* When thread pool is delayed to close ( Wait for all tasks in the thread pool to be executed ), Statistics thread pool situation
*/
@Override
public void shutdown() {
// Statistics of executed tasks 、 Executing task 、 Number of tasks not performed
LOGGER.info("{} Close thread pool , Task executed : {}, Executing task : {}, Number of tasks not performed : {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
}
/**
* When the thread pool is shut down immediately , Statistics thread pool situation
*/
@Override
public List<Runnable> shutdownNow() {
// Statistics of executed tasks 、 Executing task 、 Number of tasks not performed
LOGGER.info("{} Close thread pool now , Task executed : {}, Executing task : {}, Number of tasks not performed : {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
}
/**
* Before the mission , Record the start time of the task
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTimes.put(String.valueOf(r.hashCode()), new Date());
}
/**
* After task execution , Calculate the end time of the task
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
// The statistical task takes time 、 Initial number of threads 、 Number of core threads 、 Number of tasks in progress 、
// Number of completed tasks 、 Total tasks 、 Number of tasks cached in the queue 、 The maximum number of threads that exist in the pool 、
// Maximum number of threads allowed 、 Thread idle time 、 Whether the thread pool is closed 、 Whether the thread pool is terminated
LOGGER.info("{}-pool-monitor: " +
" The task takes time : {} ms, Initial number of threads : {}, Number of core threads : {}, Number of tasks in progress : {}, " +
" Number of completed tasks : {}, Total tasks : {}, Number of tasks in the queue : {}, The maximum number of threads that exist in the pool : {}, " +
" Maximum number of threads : {}, Thread idle time : {}, Whether the thread pool is closed : {}, Whether the thread pool is terminated : {}",
this.poolName,
diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
}
/**
* The thread used to generate the thread pool , Rewrites the default thread factory of thread pool , Pass in the thread pool name , Easy to track the problem
*/
static class MonitorThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
/**
* Initialize thread factory
*
* @param poolName Thread pool name
*/
MonitorThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}
Using a custom thread pool CompletableFuture;
private final static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
private final static ThreadPoolMonitor threadPoolMonitor = new ThreadPoolMonitor(5, 10, 100L,
TimeUnit.SECONDS, workQueue, "monitor");
@Test
public void testCompletableFuture() {
CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
OrderService orderService = new OrderServiceImpl();
return orderService.queryOrderInfo();
},threadPoolMonitor);
String result=order.join();
assertTrue(Objects.nonNull(result));
}
end
Welcome to pay attention to , A little bit of praise ! 




![leetcode:730. Statistics of different palindrome subsequences [traversed by point and surface interval DP + 3D DP + diagonal]](/img/90/79c51f944709ef230f394e874aa9ac.png)




