当前位置:网站首页>CompletableFuture使用详解
CompletableFuture使用详解
2022-07-07 03:00:00 【鬼罚olo】
CompletableFuture使用详解
Future的局限性,他没法直接对多个任务进行链式组合等处理,需要借助并发工具类才能完成,实现逻辑比较复杂
而CompletableFutures是对Future的扩展和增强,CompltableFuture实现Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务的编排的能力。借助这项能力,可以轻松的组织不同任务的运行顺序,规则以及方式,从某种程序上说,这项能力时他的核心能力。而在以往,虽然通过CountDownlatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理
ComplatableFuture的基础结构如下

CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的
默认线程池是**ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池**。
CompletableFuture中默认线程池如下:
// 根据commonPool的并行度来选择,而并行度的计算是在ForkJoinPool的静态代码段完成的
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool中初始化commonPool的参数
static {
// initialize field offsets for CAS etc
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinPool.class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
RUNSTATE = U.objectFieldOffset
(k.getDeclaredField("runState"));
STEALCOUNTER = U.objectFieldOffset
(k.getDeclaredField("stealCounter"));
Class<?> tk = Thread.class;
……
} catch (Exception e) {
throw new Error(e);
}
commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
// 调用makeCommonPool方法创建commonPool,其中并行度为逻辑核数-1
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
功能
常用方法
依赖关系
- thenApply():把前面任务的执行结果,交给后面的Function
- thenCompose():用来连接两个依赖关系的任务,结果由第二个任务返回
and集合关系
- thenCombine():合并任务,有返回值、
- thenAcceptBoth():两个热播我要执行完成后,将结果交给thenAcceptBotj处理,无返回值
- runAfterBoth();两个任务都执行完成后,执行下一步操作
or聚合关系
applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)
并行执行
allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFutureanyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture
结果处理
- whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作
- exceptionally:返回一个新的CompletableFuture,当前面的CompletableFuture完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture的完成
异步操作
CompletableFuture提供了四个静态方法创建一个异步操作
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
四个方法的区别
- runAsync()以Runnable函数接口类型为参数,没有返回结果,supplyAsync()以Supplier函数式接口类型未参数,返回结果类型未U;Supplier接口的get()是由返回值会阻塞
- 使用没有指定Executor的方法时,内部使用Fork Join Pool。commonPool()作为他的线程池执行异步代码。如果没有指定线程池,则使用指定的线程池运行
- 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable runnable = ()-> System.out.println("无返回结果任务");
CompletableFuture.runAsync(runnable);
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
System.out.println("有返回值的异步任务");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
});
String result = future.get();
System.out.println();
}
获取结果(join&get)
join()和get()方法都是用来获取CompletableFuture异步之后的返回值,join()方法抛出的是uncheck异常
,不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
结果处理
当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
- 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
- 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常
测试
package ThreadPoolExecultorTest;
import java.util.*;
import java.util.concurrent.*;
/**
* @Author lijiaxin
* @Description TODO
* @Date 2022/07/05/9:23
* @Version 1.0
*/
public class CompletableFuture1Test {
public static void main(String[] args) {
for (int i = 0; i < 4 ; i++) {
/*开启主线程*/
int finalI = i;
CompletableFuture.runAsync(()->{
final CompletableFuture<List> listmates = CompletableFuture.supplyAsync(()->{
List list = null;
if (finalI == 1){
list = test1();
}else if(finalI == 2){
list = test2();
}else if(finalI == 3){
list = test3();
}else {
list = test4();
}
return list;
},threadPoolExecutor());
listmates.thenAcceptAsync(list->{
linkedLists((LinkedList) list);
},threadPoolExecutor());
},threadPoolExecutor());
}
}
public static ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()); //任务不能丢弃-同步执行
return executor;
}
public static List<String> test1(){
System.out.println("1");
List<String> list = new LinkedList<>();
list.add("1");
return list;
}
public static List<String> test2(){
System.out.println("2");
List<String> list = new LinkedList<>();
list.add("2");
return list;
}
public static List<String> test4(){
System.out.println("4");
List<String> list = new LinkedList<>();
list.add("4");
return list;
}
public static List<String> test3(){
System.out.println("3");
List<String> list = new LinkedList<>();
list.add("3");
return list;
}
public static LinkedList<LinkedList<String>> linkedLists(LinkedList linkedList){
System.out.println(linkedList);
LinkedList linkedList1 = null;
linkedList1.add(linkedList);
for (Object list:linkedList1) {
System.out.println(list.toString());
}
return linkedList1;
}
}
边栏推荐
- Matlab tips (30) nonlinear fitting lsqcurefit
- 二十岁的我4面拿到字节跳动offer,至今不敢相信
- Jmeter 5.5版本发布说明
- JWT的基础介绍
- .net core 访问不常见的静态文件类型(MIME 类型)
- 根据IP获取地市
- MySQL卸载文档-Windows版
- 学术报告系列(六) - Autonomous Driving on the journey to full autonomy
- Answer to the first stage of the assignment of "information security management and evaluation" of the higher vocational group of the 2018 Jiangsu Vocational College skills competition
- Install mongodb database
猜你喜欢
随机推荐
一条慢SQL拖死整个系统
途家、木鸟、美团……民宿暑期战事将起
化工园区危化品企业安全风险智能化管控平台建设四大目标
How can clothing stores make profits?
循环肿瘤细胞——Abnova 解决方案来啦
网络基础 —— 报头、封装和解包
场馆怎么做体育培训?
关于数据库数据转移的问题,求各位解答下
MATLAB小技巧(30)非线性拟合 lsqcurefit
请教一个问题,flink oracle cdc,读取一个没有更新操作的表,隔十几秒就重复读取全量数据
Anr principle and Practice
Get the city according to IP
Etcd database source code analysis -- starting from the start function of raftnode
AddressSanitizer 技术初体验
Abnova循环肿瘤DNA丨全血分离,基因组DNA萃取分析
Cloudcompare point pair selection
Redhat5 installing vmware tools under virtual machine
[GNN] graphic gnn:a gender Introduction (including video)
算法---比特位计数(Kotlin)
Abnova 体外转录 mRNA工作流程和加帽方法介绍









