Using CompletableFuture
2022-07-07 06:59:00 【Ghost punishment OLO】
Future has a limitation: it cannot directly chain or combine multiple tasks. Doing so requires additional concurrency utility classes, which makes the implementation logic complex.
CompletableFuture extends and enhances Future. It implements the Future interface and builds rich extensions on top of it, making up for Future's limitations, and it adds the ability to orchestrate tasks. With this ability, the execution order, rules, and manner of different tasks can be organized easily; in a sense, this is its core capability. Previously, utility classes such as CountDownLatch could also orchestrate tasks, but only with complex handling logic.
The basic structure of CompletableFuture is as follows.
The CompletionStage interface defines the task-orchestration methods: once one stage has executed, processing can move on to the next stage, and stages can run asynchronously.
The default thread pool is ForkJoinPool.commonPool(). However, so that workloads do not affect each other and problems are easier to locate, **a custom thread pool is highly recommended**.
CompletableFuture selects its default thread pool as follows:
// Use the common pool only if its parallelism is greater than 1; the parallelism is computed in ForkJoinPool's static initializer
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool initializes the commonPool parameters in its static initializer:
static {
// initialize field offsets for CAS etc
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinPool.class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
RUNSTATE = U.objectFieldOffset
(k.getDeclaredField("runState"));
STEALCOUNTER = U.objectFieldOffset
(k.getDeclaredField("stealCounter"));
Class<?> tk = Thread.class;
……
} catch (Exception e) {
throw new Error(e);
}
commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
// Call makeCommonPool() to create commonPool; by default the parallelism is the number of logical cores minus 1
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
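The parallelism chosen by this initializer can be inspected at runtime. The following is a minimal sketch; the class name CommonPoolInfo is hypothetical and used only for illustration, and the printed numbers depend on the machine.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    // Returns the parallelism that the common pool reports on this JVM.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonParallelism());
    }
}
```

Starting the JVM with -Djava.util.concurrent.ForkJoinPool.common.parallelism=N should change the second value.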
Features
Common methods
Dependency relationships
- thenApply(): passes the result of the previous task to the given Function
- thenCompose(): connects two dependent tasks; the result is returned by the second task
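The difference between the two dependency methods can be sketched as follows. DependencyDemo and loadGreeting are hypothetical names introduced for this example: thenApply maps a value with a plain function, while thenCompose chains a second task that itself returns a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;

class DependencyDemo {
    // Hypothetical second task: it depends on the first result and is itself asynchronous.
    static CompletableFuture<String> loadGreeting(String name) {
        return CompletableFuture.supplyAsync(() -> "hello " + name);
    }

    // thenApply: transform the previous result with a synchronous Function.
    static String applied() {
        return CompletableFuture.supplyAsync(() -> "world")
                .thenApply(String::toUpperCase)
                .join();
    }

    // thenCompose: feed the previous result into another future and flatten the result.
    static String composed() {
        return CompletableFuture.supplyAsync(() -> "world")
                .thenCompose(DependencyDemo::loadGreeting)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(applied());
        System.out.println(composed());
    }
}
```

Using thenApply with loadGreeting instead would produce a nested CompletableFuture<CompletableFuture<String>>, which is why thenCompose is the usual choice for dependent asynchronous tasks.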
AND aggregation relationships
- thenCombine(): combines the results of two tasks; has a return value
- thenAcceptBoth(): when both tasks have completed, passes both results to the given action; no return value
- runAfterBoth(): when both tasks have completed, runs the next step
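A small sketch of the three AND-style methods, under the assumption that two independent tasks each produce an Integer. The class name BothDemo and the values are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class BothDemo {
    // thenCombine: merge both results into a new value.
    static int combined() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> shipping = CompletableFuture.supplyAsync(() -> 2);
        return price.thenCombine(shipping, Integer::sum).join();
    }

    // thenAcceptBoth: consume both results; the returned future carries no value.
    static int accepted() {
        AtomicInteger sink = new AtomicInteger();
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 10);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 5);
        a.thenAcceptBoth(b, (x, y) -> sink.set(x * y)).join();
        return sink.get();
    }

    // runAfterBoth: run a Runnable that receives neither result.
    static int ranAfterBoth() {
        AtomicInteger counter = new AtomicInteger();
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 10);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 5);
        a.runAfterBoth(b, counter::incrementAndGet).join();
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(combined());
        System.out.println(accepted());
        System.out.println(ranAfterBoth());
    }
}
```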
OR aggregation relationships
- applyToEither(): uses the result of whichever of the two tasks finishes first; has a return value
- acceptEither(): consumes the result of whichever of the two tasks finishes first; no return value
- runAfterEither(): as soon as either task completes, proceeds to the next step (a Runnable task)
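As an illustration of the OR-style methods, the sketch below races a slow task against a fast one with applyToEither. EitherDemo and the delayed helper are hypothetical, and the sleep durations are arbitrary values chosen so that the winner is clear.

```java
import java.util.concurrent.CompletableFuture;

class EitherDemo {
    // Hypothetical helper: returns the value after sleeping for the given time.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    // applyToEither: the function receives whichever result arrives first.
    static String fastest() {
        CompletableFuture<String> slow = delayed("slow", 500);
        CompletableFuture<String> fast = delayed("fast", 10);
        return slow.applyToEither(fast, winner -> "winner: " + winner).join();
    }

    public static void main(String[] args) {
        System.out.println(fastest());
    }
}
```

The losing task is not cancelled; it keeps running in the background and its result is simply ignored.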
Parallel execution
- allOf(): returns a new CompletableFuture that completes when all of the given CompletableFutures have completed
- anyOf(): returns a new CompletableFuture that completes when any one of the given CompletableFutures has completed
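Because allOf() returns a CompletableFuture<Void>, the individual results still have to be read from the original futures. One common way to do that is sketched here; AllAnyDemo is a hypothetical class name, and the anyOf part uses a future that never completes purely to make the outcome predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllAnyDemo {
    // allOf: wait for every future, then collect the individual results.
    static List<Integer> gatherAll() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> 3);
        List<CompletableFuture<Integer>> futures = Arrays.asList(a, b, c);

        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // join() on each future does not block here, because all of them are already done.
        CompletableFuture<List<Integer>> results = all.thenApply(ignored ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
        return results.join();
    }

    // anyOf: completes with the first available result, typed as Object.
    static Object firstOf() {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        CompletableFuture<String> done = CompletableFuture.completedFuture("ready");
        return CompletableFuture.anyOf(never, done).join();
    }

    public static void main(String[] args) {
        System.out.println(gatherAll());
        System.out.println(firstOf());
    }
}
```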
Result handling
- whenComplete(): when the task completes, performs the given action with this stage's result (or null if none) and exception (or null if none)
- exceptionally(): returns a new CompletableFuture. If the preceding CompletableFuture completes normally, the new one completes with the same value; if it completes exceptionally, the exception is passed to the given function, and that function's result completes the new CompletableFuture
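A minimal sketch of exceptionally() as a fallback, assuming a task that may fail. RecoveryDemo, the fail flag, and the "fallback" value are hypothetical and exist only for this example.

```java
import java.util.concurrent.CompletableFuture;

class RecoveryDemo {
    // Returns "ok" on success, or the fallback value when the task throws.
    static String recover(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "ok";
        }).exceptionally(ex -> "fallback") // only invoked on exceptional completion
          .join();
    }

    public static void main(String[] args) {
        System.out.println(recover(false));
        System.out.println(recover(true));
    }
}
```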
Asynchronous operations
CompletableFuture Four static methods are provided to create an asynchronous operation
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Differences between the four methods
- runAsync() takes a Runnable and returns no result. supplyAsync() takes a Supplier functional interface and returns a result of type U, because Supplier's get() has a return value
- The overloads without an Executor argument run the asynchronous code on ForkJoinPool.commonPool(). If a thread pool is specified, the task runs on that pool
- By default, CompletableFuture uses the shared ForkJoinPool common pool, whose default parallelism is the number of CPU cores minus one (it can also be set with the JVM option -Djava.util.concurrent.ForkJoinPool.common.parallelism). If all CompletableFutures share one thread pool, then once some tasks perform very slow I/O operations, every thread in the pool can end up blocked on I/O. This causes thread starvation and degrades the performance of the whole system. It is therefore strongly recommended to create separate thread pools for different business types so that they do not interfere with each other
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Task without a return value
    Runnable runnable = () -> System.out.println("Task with no return value");
    CompletableFuture.runAsync(runnable);
    // Task with a return value
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Asynchronous task with a return value");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    });
    // get() blocks until the result is available
    String result = future.get();
    System.out.println(result);
}
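The recommendation to use a dedicated thread pool can be sketched like this. CustomPoolDemo, the pool size of 2, and the thread name "io-pool-worker" are assumptions made for illustration, not values taken from the article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomPoolDemo {
    // Runs a task on a dedicated pool and reports which thread executed it.
    static String runOnCustomPool() {
        // Hypothetical pool reserved for one kind of work (for example, slow I/O).
        ExecutorService ioPool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "io-pool-worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), ioPool)
                    .join();
        } finally {
            ioPool.shutdown(); // release the pool once it is no longer needed
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool());
    }
}
```

In a real service the pool would normally be created once and reused, rather than created and shut down per call as in this short sketch.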
Getting results (join & get)
Both join() and get() retrieve the value of a CompletableFuture once the asynchronous computation has finished. join() throws unchecked exceptions, so callers are not forced to handle them. get() throws the checked exceptions ExecutionException and InterruptedException, which the caller must handle explicitly (rethrow or try/catch).
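The difference in exception handling can be seen in a short sketch. JoinVsGetDemo and the failing helper are hypothetical; the task deliberately throws so that both paths are exercised.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinVsGetDemo {
    // Hypothetical task that always fails.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalArgumentException("bad input");
        });
    }

    // get(): the compiler forces handling of both checked exceptions.
    static String viaGet() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return "ExecutionException: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join(): the failure surfaces as an unchecked CompletionException.
    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "CompletionException: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
    }
}
```

In both cases the original exception is available through getCause().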
Result handling
When a CompletableFuture finishes its computation, or throws an exception, we can run a specific Action. The main methods are:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- The Action has type BiConsumer<? super T,? super Throwable>, so it can handle both a normal result and an exception.
- Methods whose names do not end in Async run the Action on the thread that completes the previous stage (or on the calling thread if the stage is already complete). The Async variants may run it on another thread (with the same thread pool, the same thread may still be picked).
- These methods return a new CompletableFuture. Once the Action has run, it completes with the original CompletableFuture's result, or with its exception.
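The pass-through behaviour described in the last point can be sketched as follows. WhenCompleteDemo is a hypothetical class; the callback only records what it saw and cannot change the outcome.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteDemo {
    // The callback observes (value, error); the original value still flows through.
    static String passThrough() {
        StringBuilder seen = new StringBuilder();
        String result = CompletableFuture.supplyAsync(() -> "value")
                .whenComplete((value, error) -> seen.append(value).append('/').append(error))
                .join();
        return result + " [" + seen + "]";
    }

    // A failure is observed by the callback and then propagated unchanged.
    static boolean failurePropagates() {
        CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        }).whenComplete((value, error) -> { /* e.g. log the error here */ });
        try {
            future.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(passThrough());
        System.out.println(failurePropagates());
    }
}
```

To replace a failed result with a fallback value, exceptionally() or handle() is the usual tool instead of whenComplete().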
Test
package ThreadPoolExecultorTest;
import java.util.*;
import java.util.concurrent.*;
/**
* @Author lijiaxin
* @Description TODO
* @Date 2022/07/05/9:23
* @Version 1.0
*/
public class CompletableFuture1Test {
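    // One shared pool for all stages; building a new pool on every call would leave idle threads behind
    private static final ThreadPoolExecutor EXECUTOR = threadPoolExecutor();

    public static void main(String[] args) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            int finalI = i;
            // Stage 1: build a list asynchronously (i == 0 falls through to test4)
            CompletableFuture<List<String>> listmates = CompletableFuture.supplyAsync(() -> {
                List<String> list;
                if (finalI == 1) {
                    list = test1();
                } else if (finalI == 2) {
                    list = test2();
                } else if (finalI == 3) {
                    list = test3();
                } else {
                    list = test4();
                }
                return list;
            }, EXECUTOR);
            // Stage 2: consume the list once stage 1 has completed
            pending.add(listmates.thenAcceptAsync(list -> {
                linkedLists((LinkedList<String>) list);
            }, EXECUTOR));
        }
        // Wait for every pipeline to finish, then shut the pool down so the JVM can exit
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        EXECUTOR.shutdown();
    }

    public static ThreadPoolExecutor threadPoolExecutor() {
        return new ThreadPoolExecutor(5, 10, 1,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(20),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // Never discard tasks: a rejected task runs synchronously on the caller's thread
    }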
public static List<String> test1(){
System.out.println("1");
List<String> list = new LinkedList<>();
list.add("1");
return list;
}
public static List<String> test2(){
System.out.println("2");
List<String> list = new LinkedList<>();
list.add("2");
return list;
}
public static List<String> test4(){
System.out.println("4");
List<String> list = new LinkedList<>();
list.add("4");
return list;
}
public static List<String> test3(){
System.out.println("3");
List<String> list = new LinkedList<>();
list.add("3");
return list;
}
    public static LinkedList<LinkedList<String>> linkedLists(LinkedList<String> linkedList) {
        System.out.println(linkedList);
        // The outer list must be initialized before elements are added to it
        LinkedList<LinkedList<String>> linkedList1 = new LinkedList<>();
        linkedList1.add(linkedList);
        for (LinkedList<String> list : linkedList1) {
            System.out.println(list);
        }
        return linkedList1;
    }
}