当前位置:网站首页>CompletableFuture使用详解
CompletableFuture使用详解
2022-07-07 03:00:00 【鬼罚olo】
CompletableFuture使用详解
Future的局限性,他没法直接对多个任务进行链式组合等处理,需要借助并发工具类才能完成,实现逻辑比较复杂
而CompletableFutures是对Future的扩展和增强,CompltableFuture实现Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务的编排的能力。借助这项能力,可以轻松的组织不同任务的运行顺序,规则以及方式,从某种程序上说,这项能力时他的核心能力。而在以往,虽然通过CountDownlatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理
ComplatableFuture的基础结构如下

CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的
默认线程池是**ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池**。
CompletableFuture中默认线程池如下:
// 根据commonPool的并行度来选择,而并行度的计算是在ForkJoinPool的静态代码段完成的
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool中初始化commonPool的参数
static {
// initialize field offsets for CAS etc
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinPool.class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
RUNSTATE = U.objectFieldOffset
(k.getDeclaredField("runState"));
STEALCOUNTER = U.objectFieldOffset
(k.getDeclaredField("stealCounter"));
Class<?> tk = Thread.class;
……
} catch (Exception e) {
throw new Error(e);
}
commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
// 调用makeCommonPool方法创建commonPool,其中并行度为逻辑核数-1
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
功能
常用方法
依赖关系
- thenApply():把前面任务的执行结果,交给后面的Function
- thenCompose():用来连接两个依赖关系的任务,结果由第二个任务返回
and集合关系
- thenCombine():合并任务,有返回值、
- thenAcceptBoth():两个热播我要执行完成后,将结果交给thenAcceptBotj处理,无返回值
- runAfterBoth();两个任务都执行完成后,执行下一步操作
or聚合关系
applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)
并行执行
allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFutureanyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture
结果处理
- whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作
- exceptionally:返回一个新的CompletableFuture,当前面的CompletableFuture完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture的完成
异步操作
CompletableFuture提供了四个静态方法创建一个异步操作
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
四个方法的区别
- runAsync()以Runnable函数接口类型为参数,没有返回结果,supplyAsync()以Supplier函数式接口类型未参数,返回结果类型未U;Supplier接口的get()是由返回值会阻塞
- 使用没有指定Executor的方法时,内部使用Fork Join Pool。commonPool()作为他的线程池执行异步代码。如果没有指定线程池,则使用指定的线程池运行
- 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable runnable = ()-> System.out.println("无返回结果任务");
CompletableFuture.runAsync(runnable);
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
System.out.println("有返回值的异步任务");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
});
String result = future.get();
System.out.println();
}
获取结果(join&get)
join()和get()方法都是用来获取CompletableFuture异步之后的返回值,join()方法抛出的是uncheck异常
,不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
结果处理
当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
- 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
- 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常
测试
package ThreadPoolExecultorTest;
import java.util.*;
import java.util.concurrent.*;
/**
* @Author lijiaxin
* @Description TODO
* @Date 2022/07/05/9:23
* @Version 1.0
*/
public class CompletableFuture1Test {
public static void main(String[] args) {
for (int i = 0; i < 4 ; i++) {
/*开启主线程*/
int finalI = i;
CompletableFuture.runAsync(()->{
final CompletableFuture<List> listmates = CompletableFuture.supplyAsync(()->{
List list = null;
if (finalI == 1){
list = test1();
}else if(finalI == 2){
list = test2();
}else if(finalI == 3){
list = test3();
}else {
list = test4();
}
return list;
},threadPoolExecutor());
listmates.thenAcceptAsync(list->{
linkedLists((LinkedList) list);
},threadPoolExecutor());
},threadPoolExecutor());
}
}
public static ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()); //任务不能丢弃-同步执行
return executor;
}
public static List<String> test1(){
System.out.println("1");
List<String> list = new LinkedList<>();
list.add("1");
return list;
}
public static List<String> test2(){
System.out.println("2");
List<String> list = new LinkedList<>();
list.add("2");
return list;
}
public static List<String> test4(){
System.out.println("4");
List<String> list = new LinkedList<>();
list.add("4");
return list;
}
public static List<String> test3(){
System.out.println("3");
List<String> list = new LinkedList<>();
list.add("3");
return list;
}
public static LinkedList<LinkedList<String>> linkedLists(LinkedList linkedList){
System.out.println(linkedList);
LinkedList linkedList1 = null;
linkedList1.add(linkedList);
for (Object list:linkedList1) {
System.out.println(list.toString());
}
return linkedList1;
}
}
边栏推荐
- 7天零基础能考证HCIA吗?华为认证系统学习路线分享
- Brand · consultation standardization
- Data of all class a scenic spots in China in 2022 (13604)
- 【mysqld】Can't create/write to file
- 2018年江苏省职业院校技能大赛高职组“信息安全管理与评估”赛项任务书第二阶段答案
- MySQL SQL的完整处理流程
- FPGA课程:JESD204B的应用场景(干货分享)
- 算法---比特位计数(Kotlin)
- 请问 flinksql对接cdc时 如何实现计算某个字段update前后的差异 ?
- 2018年江苏省职业院校技能大赛高职组“信息安全管理与评估”赛项任务书第一阶段答案
猜你喜欢

Installing redis and windows extension method under win system

Abnova 免疫组化服务解决方案

Answer to the first stage of the assignment of "information security management and evaluation" of the higher vocational group of the 2018 Jiangsu Vocational College skills competition

Learning notes | data Xiaobai uses dataease to make a large data screen

网络基础 —— 报头、封装和解包

2018年江苏省职业院校技能大赛高职组“信息安全管理与评估”赛项任务书第二阶段答案

Basic introduction of JWT

Redhat5 installing vmware tools under virtual machine

Comment les entreprises gèrent - elles les données? Partager les leçons tirées des quatre aspects de la gouvernance des données

MySQL的安装
随机推荐
MySQL SQL的完整处理流程
Brand · consultation standardization
Stack and queue-p79-10 [2014 unified examination real question]
Stack and queue-p79-9
POI export to excel: set font, color, row height adaptation, column width adaptation, lock cells, merge cells
带你刷(牛客网)C语言百题(第一天)
偏执的非合格公司
MySQL的安装
Jetpack Compose 远不止是一个UI框架这么简单~
MySQL卸载文档-Windows版
Etcd database source code analysis -- starting from the start function of raftnode
DB2获取表信息异常:Caused by: com.ibm.db2.jcc.am.SqlException: [jcc][t4][1065][12306][4.25.13]
大咖云集|NextArch基金会云开发Meetup来啦
[solution] final app status- undefined, exitcode- 16
leetcode 509. Fibonacci Number(斐波那契数字)
请问如何查一篇外文文献的DOI号?
MOS tube parameters μ A method of Cox
毕业设计游戏商城
Postgresql源码(59)分析事务ID分配、溢出判断方法
基于JS的迷宫小游戏