Use of CompletableFuture
2022-07-07 06:59:00 【Ghost punishment OLO】
Using CompletableFuture
Future has a limitation: it cannot directly combine multiple tasks into a chain. Doing so requires concurrency utility classes and complex implementation logic.
CompletableFuture extends and enhances Future. It implements the Future interface and builds on it extensively, which makes up for Future's limitations. CompletableFuture also provides the ability to orchestrate tasks. With this ability, you can easily organize the order, rules, and manner in which different tasks run. In a sense, this is its core capability. In the past, task orchestration could also be achieved with utility classes such as CountDownLatch, but that required complex logic.
The basic structure of CompletableFuture is as follows.
The CompletionStage interface defines the task orchestration methods: once a stage has finished, execution can move on to the next stage.
For asynchronous execution, the default thread pool is ForkJoinPool.commonPool(). However, to keep tasks from affecting each other and to make problems easier to locate, a custom thread pool is highly recommended.
CompletableFuture selects its default thread pool as follows:
// Whether commonPool is used depends on its parallelism; the parallelism is computed in ForkJoinPool's static initializer
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool initializes the commonPool parameters in its static initializer:
static {
// initialize field offsets for CAS etc
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinPool.class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
RUNSTATE = U.objectFieldOffset
(k.getDeclaredField("runState"));
STEALCOUNTER = U.objectFieldOffset
(k.getDeclaredField("stealCounter"));
Class<?> tk = Thread.class;
……
} catch (Exception e) {
throw new Error(e);
}
commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
// Call makeCommonPool to create commonPool; by default the parallelism is the number of logical cores - 1
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
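To check which values these initializers produce on a particular machine, a small sketch like the following can print them. The class name CommonPoolInfo and its two helper methods are made up for illustration; only ForkJoinPool.getCommonPoolParallelism() and CompletableFuture.supplyAsync() come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    // Reports the parallelism the common pool was configured with.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // Returns the name of the thread that ran a supplyAsync task
    // submitted without an explicit Executor.
    static String defaultAsyncThreadName() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonParallelism());
        System.out.println("default async thread:    " + defaultAsyncThreadName());
    }
}
```

When the reported parallelism is 1, the asyncPool field shown earlier falls back to ThreadPerTaskExecutor, so the thread name printed on the last line would not be a common pool worker.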
Features
Common methods
Dependency relationships
- thenApply(): passes the result of the previous task to the given Function
- thenCompose(): connects two dependent tasks; the result is returned by the second task
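The difference between the two methods is easier to see side by side. The following is a minimal sketch; ChainDemo, findUserName, doubled, and greeting are hypothetical names chosen for the example, not part of any API.

```java
import java.util.concurrent.CompletableFuture;

class ChainDemo {
    // Hypothetical asynchronous lookup that itself returns a future.
    static CompletableFuture<String> findUserName(int userId) {
        return CompletableFuture.supplyAsync(() -> "user-" + userId);
    }

    // thenApply: the function maps the previous result to a plain value.
    static CompletableFuture<Integer> doubled(int n) {
        return CompletableFuture.supplyAsync(() -> n)
                .thenApply(x -> x * 2);
    }

    // thenCompose: the function returns another future, and the two
    // stages are flattened into a single CompletableFuture<String>.
    static CompletableFuture<String> greeting(int userId) {
        return CompletableFuture.supplyAsync(() -> userId)
                .thenCompose(ChainDemo::findUserName)
                .thenApply(name -> "hello " + name);
    }

    public static void main(String[] args) {
        System.out.println(doubled(21).join());  // prints 42
        System.out.println(greeting(7).join());  // prints hello user-7
    }
}
```

Using thenApply with findUserName would instead produce a nested CompletableFuture<CompletableFuture<String>>, which is why thenCompose is the usual choice when the next step is itself asynchronous.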
AND aggregation relationships
- thenCombine(): combines the results of two tasks; has a return value
- thenAcceptBoth(): when both tasks have completed, passes both results to the thenAcceptBoth action; no return value
- runAfterBoth(): when both tasks have completed, runs the next step
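A short sketch of the three AND-style methods follows. The class BothDemo, its method names, and the numbers used are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class BothDemo {
    // thenCombine: merges two results and returns a new value.
    static int combinedTotal() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> shipping = CompletableFuture.supplyAsync(() -> 20);
        return price.thenCombine(shipping, Integer::sum).join();
    }

    // thenAcceptBoth: consumes both results and returns nothing.
    static int consumedTotal() {
        AtomicInteger sink = new AtomicInteger();
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 4);
        a.thenAcceptBoth(b, (x, y) -> sink.set(x + y)).join();
        return sink.get();
    }

    // runAfterBoth: runs a Runnable once both are done, ignoring the results.
    static boolean ranAfterBoth() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        a.runAfterBoth(b, () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(combinedTotal());  // prints 120
        System.out.println(consumedTotal());  // prints 7
        System.out.println(ranAfterBoth());   // prints true
    }
}
```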
OR aggregation relationships
- applyToEither(): uses the result of whichever of the two tasks finishes first; has a return value
- acceptEither(): consumes the result of whichever of the two tasks finishes first; no return value
- runAfterEither(): runs the next step (a Runnable task) as soon as either task completes
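The OR-style methods can be sketched as below. To keep the outcome independent of timing, the slow task is modelled by a future that is completed by hand only after the result has been read; EitherDemo and its method names are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class EitherDemo {
    // applyToEither: transforms the result of whichever future finishes first.
    static String firstResult() {
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");
        // Stands in for a slow task: nothing completes it until later.
        CompletableFuture<String> slow = new CompletableFuture<>();
        String winner = fast.applyToEither(slow, s -> "winner: " + s).join();
        slow.complete("slow"); // finishes afterwards; its result is ignored
        return winner;
    }

    // runAfterEither: runs a Runnable as soon as one of the two is done.
    static boolean ranAfterEither() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<String> done = CompletableFuture.completedFuture("done");
        CompletableFuture<String> pending = new CompletableFuture<>();
        done.runAfterEither(pending, () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(firstResult());     // prints winner: fast
        System.out.println(ranAfterEither());  // prints true
    }
}
```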
Parallel execution
- allOf(): returns a new CompletableFuture that completes when all of the given CompletableFutures have completed
- anyOf(): returns a new CompletableFuture that completes when any of the given CompletableFutures has completed
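Because allOf() returns a CompletableFuture<Void>, the individual results still have to be read from the original futures. One common way to do that is sketched below; AllAnyDemo, waitForAll, and waitForAny are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllAnyDemo {
    // allOf: wait for every future, then collect the individual results.
    static List<Integer> waitForAll() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // join() inside the stream does not block: all futures are done here.
        return all.thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    // anyOf: completes with the result of the first future to finish.
    static Object waitForAny() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> done = CompletableFuture.completedFuture("ready");
        return CompletableFuture.anyOf(pending, done).join();
    }

    public static void main(String[] args) {
        System.out.println(waitForAll());  // prints [1, 2, 3]
        System.out.println(waitForAny());  // prints ready
    }
}
```

Note that anyOf() returns CompletableFuture<Object>, so the caller has to cast the result when the element type matters.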
Result handling
- whenComplete: when the task completes, performs the given action with the result of this stage (or null if none) and the exception (or null if none)
- exceptionally: returns a new CompletableFuture. If the previous CompletableFuture completes normally, the new one completes with the same value; if it completes exceptionally, the given function is applied to the exception and its return value completes the new CompletableFuture
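As an illustration of exceptionally, the sketch below falls back to a default value when parsing fails. RecoverDemo and parseOrDefault are names invented for the example.

```java
import java.util.concurrent.CompletableFuture;

class RecoverDemo {
    // Parses text asynchronously; if parsing throws, exceptionally
    // replaces the failure with the fallback value.
    static int parseOrDefault(String text, int fallback) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text))
                .exceptionally(ex -> fallback)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42", -1));    // prints 42
        System.out.println(parseOrDefault("oops", -1));  // prints -1
    }
}
```

When the upstream stage succeeds, the function passed to exceptionally is never called and the value passes through unchanged.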
Asynchronous operations
CompletableFuture provides four static methods for creating an asynchronous operation:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
The differences between the four methods
- runAsync() takes a Runnable functional interface as its parameter and returns no result. supplyAsync() takes a Supplier functional interface as its parameter and returns a result of type U; the Supplier interface's get() method has a return value
- When the overloads without an Executor are used, ForkJoinPool.commonPool() is used internally as the thread pool that runs the asynchronous code. If a thread pool is specified, the task runs on the specified pool
- By default, CompletableFuture uses the common ForkJoinPool thread pool, whose default parallelism is the number of CPU cores minus 1 (it can also be set with the JVM option -Djava.util.concurrent.ForkJoinPool.common.parallelism). If all CompletableFutures share one thread pool, then as soon as some tasks perform very slow I/O operations, all threads in the pool can end up blocked on I/O. This causes thread starvation and degrades the performance of the whole system. It is therefore strongly recommended to create separate thread pools for different business types so that they do not interfere with each other
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable runnable = ()-> System.out.println(" No return result task ");
CompletableFuture.runAsync(runnable);
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
System.out.println(" Asynchronous tasks with return values ");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
});
String result = future.get();
System.out.println(result);
}
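The recommendation to use a dedicated pool can be sketched with the Executor overload of supplyAsync. The pool size, the thread name io-pool-worker, and the class CustomPoolDemo are arbitrary choices made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomPoolDemo {
    // Runs one task on a dedicated pool and returns the worker thread's name.
    static String runOnCustomPool() {
        // A small pool reserved for one kind of work (for example, slow I/O).
        ExecutorService ioPool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "io-pool-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), ioPool)
                    .join();
        } finally {
            ioPool.shutdown(); // release the threads once the work is done
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool()); // prints io-pool-worker
    }
}
```

In a real application the pool would normally be created once and shared by all tasks of the same business type, rather than created and shut down per call as in this sketch.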
Getting the result (join & get)
Both join() and get() are used to obtain the return value of a CompletableFuture after the asynchronous computation finishes. join() throws an unchecked exception (CompletionException), so developers are not forced to handle it. get() throws the checked exceptions ExecutionException and InterruptedException, which the caller must handle manually (rethrow or try/catch)
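The difference in exception handling can be sketched as follows; JoinGetDemo and the message "boom" are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinGetDemo {
    // A task that always fails.
    static CompletableFuture<String> failing() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // join(): no throws clause needed; failures arrive as CompletionException.
    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "join: " + e.getCause().getMessage();
        }
    }

    // get(): the compiler forces handling of both checked exceptions.
    static String viaGet() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin()); // prints join: boom
        System.out.println(viaGet());  // prints get: boom
    }
}
```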
Result handling
When a CompletableFuture finishes its computation or throws an exception, we can run a specific action. The main methods are the following:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- The action has type BiConsumer<? super T,? super Throwable>, so it can handle both the normal result and the exception.
- A method whose name does not end in Async runs the action on the same thread that completed the task, while the Async variants may use another thread (if the same thread pool is used, the action may still be run by the same thread).
- These methods return a new CompletableFuture. After the action has run, it completes with the result of the original CompletableFuture, or with its exception
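The sketch below shows that whenComplete only observes the outcome and passes it on unchanged. WhenCompleteDemo, observe, and failed are hypothetical helper names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteDemo {
    // Attaches a whenComplete action and reports both what the action saw
    // and what the returned future finally delivered.
    static String observe(CompletableFuture<String> source) {
        AtomicReference<String> seen = new AtomicReference<>("nothing");
        CompletableFuture<String> observed = source.whenComplete((value, error) ->
                seen.set(error == null ? "value=" + value : "error"));
        String outcome;
        try {
            outcome = "result=" + observed.join(); // same value as the source
        } catch (CompletionException e) {
            outcome = "failed";                    // the failure is passed on too
        }
        return seen.get() + ", " + outcome;
    }

    // Helper that builds an already failed future.
    static CompletableFuture<String> failed() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    public static void main(String[] args) {
        System.out.println(observe(CompletableFuture.completedFuture("hi")));
        // prints value=hi, result=hi
        System.out.println(observe(failed()));
        // prints error, failed
    }
}
```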
Test
package ThreadPoolExecultorTest;
import java.util.*;
import java.util.concurrent.*;
/**
 * @Author lijiaxin
 * @Description TODO
 * @Date 2022/07/05/9:23
 * @Version 1.0
 */
public class CompletableFuture1Test {
    /* A single shared pool; building a new pool on every call would leak threads */
    private static final ThreadPoolExecutor EXECUTOR = threadPoolExecutor();
    public static void main(String[] args) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 4 ; i++) {
            int finalI = i;
            /* Produce a list asynchronously on the custom pool */
            final CompletableFuture<List<String>> listmates = CompletableFuture.supplyAsync(()->{
                List<String> list;
                if (finalI == 1){
                    list = test1();
                }else if(finalI == 2){
                    list = test2();
                }else if(finalI == 3){
                    list = test3();
                }else {
                    list = test4();
                }
                return list;
            },EXECUTOR);
            /* Consume the list asynchronously once it is available */
            futures.add(listmates.thenAcceptAsync(list->{
                linkedLists((LinkedList<String>) list);
            },EXECUTOR));
        }
        /* Wait for every task, then shut the pool down so the JVM can exit */
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        EXECUTOR.shutdown();
    }
    public static ThreadPoolExecutor threadPoolExecutor() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(20),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // Rejected tasks are not discarded; they run synchronously on the calling thread
        return executor;
    }
    public static List<String> test1(){
        System.out.println("1");
        List<String> list = new LinkedList<>();
        list.add("1");
        return list;
    }
    public static List<String> test2(){
        System.out.println("2");
        List<String> list = new LinkedList<>();
        list.add("2");
        return list;
    }
    public static List<String> test3(){
        System.out.println("3");
        List<String> list = new LinkedList<>();
        list.add("3");
        return list;
    }
    public static List<String> test4(){
        System.out.println("4");
        List<String> list = new LinkedList<>();
        list.add("4");
        return list;
    }
    public static LinkedList<LinkedList<String>> linkedLists(LinkedList<String> linkedList){
        System.out.println(linkedList);
        // The list must be instantiated; calling add() on null throws NullPointerException
        LinkedList<LinkedList<String>> linkedList1 = new LinkedList<>();
        linkedList1.add(linkedList);
        for (LinkedList<String> list:linkedList1) {
            System.out.println(list.toString());
        }
        return linkedList1;
    }
}