CompletableFuture asynchronous task orchestration: usage and explanation
2022-06-11 05:53:00 【Endwas】
At work we often call several services or methods to fetch different pieces of data. The traditional approach fetches them one by one in series, then assembles and returns the result. With CompletableFuture we can instead hand the operations to asynchronous threads, let the main thread wait only as long as the slowest task, and return all the results together.
Limitations of Future
When we hold a Future, we can call get to wait for the task to finish and obtain its return value. But future.get() is a blocking method: it does not return until the task has finished executing. Looking at get in FutureTask, we can see that it relies on awaitDone, which loops until the task completes and its result is available.
/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// loop body omitted
...
}
}
Consider a scenario: asynchronous task 1 produces a return value, and that value is needed to start asynchronous task 2. The main thread has to block until task 1 completes, then submit task 2, then block again until task 2 returns. This both blocks the main thread and performs poorly.
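The scenario above can be sketched with plain Future as follows. This is only an illustration: the class name BlockingFutureChain, the method runChain and the toy tasks are assumptions of mine, not code from a real project. Note the two separate get calls, each of which parks the calling thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example: task 2 needs the result of task 1.
class BlockingFutureChain {

    static String runChain() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Task 1: produce a value asynchronously.
            Future<Integer> task1 = pool.submit(() -> 21);
            // The caller must block here before task 2 can even be submitted.
            Integer first = task1.get();

            // Task 2: depends on the result of task 1.
            Future<String> task2 = pool.submit(() -> "result=" + (first * 2));
            // Second blocking wait on the calling thread.
            return task2.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChain());
    }
}
```

With CompletableFuture the same dependency can be declared up front, so the caller blocks at most once, at the very end.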
Introduction to CompletableFuture
What is CompletableFuture? CompletableFuture keeps the advantages of Future and extends it substantially. It simplifies asynchronous programming, supports a functional programming style, lets us process results through callbacks, and provides methods for transforming and combining CompletableFuture instances.
Functional programming: the methods take functional interfaces as parameters, so they can be implemented with lambda expressions, which keeps the code concise. I have explained this in an earlier post.
Common auxiliary methods
- isCompletedExceptionally: whether this CompletableFuture completed exceptionally.
// This includes cancellation via cancel, an explicit call to completeExceptionally, and abrupt termination of a stage's action (for example, a thrown exception).
public boolean isCompletedExceptionally() {
Object r;
return ((r = result) instanceof AltResult) && r != NIL;
}
- isCancelled: whether this CompletableFuture was cancelled before it completed normally.
public boolean isCancelled() {
Object r;
// Determine whether the exception is CancellationException
return ((r = result) instanceof AltResult) &&
(((AltResult)r).ex instanceof CancellationException);
}
- isDone: whether this CompletableFuture has completed in any way, including exceptionally or by cancellation.
public boolean isDone() {
return result != null;
}
- get: blocks until the CompletableFuture completes, then returns its result.
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
- join: blocks until the CompletableFuture completes, then returns its result.
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
The difference between join() and get() lies in the exceptions they throw: join() returns the result or throws an unchecked exception (CompletionException), while get() throws the checked exceptions InterruptedException and ExecutionException.
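A small sketch can make the difference visible. The class JoinVersusGet and its helper methods are names I made up for illustration; the failing task simply throws an IllegalStateException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical helper class comparing the exceptions thrown by get() and join().
class JoinVersusGet {

    // A future whose task always fails.
    static CompletableFuture<Integer> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // get() forces the caller to handle two checked exceptions.
    static String thrownByGet() {
        try {
            failing().get();
            return "none";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "InterruptedException";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // join() throws an unchecked CompletionException; the try/catch is optional.
    static String thrownByJoin() {
        try {
            failing().join();
            return "none";
        } catch (CompletionException e) {
            return "CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(thrownByGet());
        System.out.println(thrownByJoin());
    }
}
```

Because join() has no checked exceptions, it is the more convenient choice inside lambdas and stream pipelines.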
Creating a CompletableFuture
CompletableFuture has a no-argument constructor. It creates an incomplete CompletableFuture, so calling get on it blocks the calling thread until some other code completes the future.
So we usually use the static factory methods to create instances.
// No input parameter, has a return value
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
// No input parameter, no return value; simply runs the task
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
Notice that each method has an overload. The Executor parameter lets us specify a thread pool explicitly; otherwise the default is ForkJoinPool.commonPool(), the JVM-wide common thread pool.
[Note]: the threads of the default commonPool are daemon threads. Daemon threads need care: the JVM exits once no user (non-daemon) threads remain, so if the main thread finishes first, the daemon threads running our CompletableFuture tasks are terminated immediately and the tasks never complete!
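One way to sidestep the daemon-thread pitfall is to pass our own executor and shut it down explicitly. The sketch below assumes a fixed pool from Executors.newFixedThreadPool, whose default thread factory creates non-daemon threads; the class name ExplicitExecutorExample is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example: run the task on a pool of user (non-daemon) threads.
class ExplicitExecutorExample {

    // Reports whether the task ran on a daemon thread.
    static boolean ranOnDaemonThread() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().isDaemon(), pool)
                    .join(); // wait for the result before shutting the pool down
        } finally {
            // Non-daemon threads keep the JVM alive, so the pool must be shut down.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("daemon thread? " + ranOnDaemonThread());
    }
}
```

If you stay with the default pool, the other option is to make sure the main thread calls join or get on the final future before it returns.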
Commonly used CompletableFuture APIs
The APIs explained below generally come in three similar variants. I will mostly demonstrate the third.
- xxx(method);
- xxxAsync(method);
- xxxAsync(method, executor)
The three differ in where the callback runs. The first (non-async) variant runs it on the thread that completes the previous stage, or on the calling thread if that stage has already completed. The second hands it to the default thread pool. The third hands it to the thread pool passed as executor.
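To see where each variant actually runs, we can return the name of the executing thread. This is a minimal sketch using thenApply as the representative method; the class VariantThreads and the thread name "my-pool-thread" are made up for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example: report which thread runs each variant of thenApply.
class VariantThreads {

    // xxx(method): the stage is already complete, so the function runs
    // right away on the thread that calls thenApply.
    static String threadOfPlainVariant() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("x");
        return done.thenApply(v -> Thread.currentThread().getName()).join();
    }

    // xxxAsync(method): the function is submitted to the default pool.
    static String threadOfAsyncVariant() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("x");
        return done.thenApplyAsync(v -> Thread.currentThread().getName()).join();
    }

    // xxxAsync(method, executor): the function is submitted to our own pool.
    static String threadOfExecutorVariant() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "my-pool-thread"));
        try {
            CompletableFuture<String> done = CompletableFuture.completedFuture("x");
            return done.thenApplyAsync(v -> Thread.currentThread().getName(), pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("plain:    " + threadOfPlainVariant());
        System.out.println("async:    " + threadOfAsyncVariant());
        System.out.println("executor: " + threadOfExecutorVariant());
    }
}
```

The name printed for the async variant depends on the JVM and the machine, so treat it as informational only.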
1. Creating a CompletableFuture instance
Now let's try CompletableFuture in practice, starting with basic creation.
/** 1. runAsync + supplyAsync */
/**
 * 1. Run asynchronously on the default thread pool (ForkJoinPool): no input parameter, no return value (run)
 * 2. Run asynchronously on a thread pool we created (threadPoolExecutor): no input parameter, no return value (run)
 */
CompletableFuture<Void> run = CompletableFuture.runAsync(() ->
System.out.println("completablefuture runs asynchronously"));
CompletableFuture<Void> runCustomize = CompletableFuture.runAsync(() ->
System.out.println("completablefuture runs asynchronously in customize threadPool"
), threadPoolExecutor);
//================ supply examples ===================
/**
 * 1. Run asynchronously on the default thread pool (ForkJoinPool): no input parameter, has a return value (supply)
 * 2. Run asynchronously on a thread pool we created: no input parameter, has a return value (supply)
 */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
result:
completablefuture runs asynchronously
completablefuture runs asynchronously in customize threadPool
completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
The first two CompletableFutures have no return value, so any later stage chained onto them receives null as its input.
2.whenComplete
Suppose that when a CompletableFuture finishes, we want to obtain its result or its exception and then do further processing on it. That is what whenComplete is for.
/** The parameter is a BiConsumer: the first argument is the result of the previous step, the second is the exception thrown by the previous step */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
int a = 1/0;
System.out.println("completablefuture supplys asynchronously");
return "success";
});
CompletableFuture<String> complete = supply.whenCompleteAsync((result, throwable) -> {
System.out.println("whenComplete: " + result + " throws " + throwable);
});
result:
whenComplete: null throws java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
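One point worth checking for yourself is that whenComplete only observes the outcome; it does not recover from the failure. In the sketch below (the class name WhenCompletePropagates is my own), the returned future is still completed exceptionally after the callback has run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical example: whenComplete observes a failure but does not swallow it.
class WhenCompletePropagates {

    static boolean stillFailed() {
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        CompletableFuture<String> observed = failing.whenComplete((result, error) ->
                System.out.println("observed result=" + result + ", error=" + error));
        try {
            observed.join();
        } catch (CompletionException expected) {
            // The original exception is passed on to the caller.
        }
        return observed.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("still failed? " + stillFailed());
    }
}
```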
3.handle
handle takes the same inputs as whenComplete, but it can return a new result after it runs, whereas whenComplete can only observe the outcome and cannot return a value of its own.
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
example:
/** The parameter is a BiFunction: the first argument is the result of the previous step, the second is the exception thrown by the previous step. The return value can be of any type. */
CompletableFuture<String> handleAsyncCustomize = supply.handleAsync((a, throwable) -> {
System.out.println("handleAsyncCustomize: " + a + " throws " + throwable);
return "handleAsync success";
}, threadPoolExecutor);
System.out.println(handleAsyncCustomize.get());
result:
handleAsyncCustomize: success throws null
handleAsync success
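Because handle returns a value, it can also turn a failure into a fallback result. The following is a minimal sketch under that assumption; HandleFallback, loadOrDefault and the strings "fresh" and "fallback" are hypothetical names for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example: use handle to map both success and failure to a value.
class HandleFallback {

    static String loadOrDefault(boolean fail) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("boom");
                    }
                    return "fresh";
                })
                // error is null on success; value is null on failure.
                .handle((value, error) -> error == null ? value : "fallback")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(loadOrDefault(false));
        System.out.println(loadOrDefault(true));
    }
}
```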
4.thenApply
thenApply is very similar to handle: it has both an input and a return value. But it takes only one argument, so it cannot handle an exception from the previous asynchronous task. If that task fails, calling get throws.
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
/** The Function's input is the result of the first asynchronous task; its output is the return value. If asynchronous task 1 fails, task 2 is not executed and get throws. */
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
int a = 1/0;
return "success";
}, threadPoolExecutor);
CompletableFuture<String> applyAsyncCustomize = supplyCustomize.thenApplyAsync(a -> {
System.out.println("applyAsyncCustomize " + a);
return "success";
}, threadPoolExecutor);
System.out.println(applyAsyncCustomize.get());
result:
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at CompletableFutureTask.main.main(main.java:48)
Caused by: java.lang.ArithmeticException: / by zero
at CompletableFutureTask.main.lambda$main$3(main.java:40)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
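The stack trace shows the exception reaching get, but it is also worth confirming that the thenApply function itself never runs. A small sketch, with a flag that records whether the mapper was invoked (ThenApplySkipped and mapperRan are names I chose for the demo):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical example: a thenApply function is skipped when the upstream task fails.
class ThenApplySkipped {

    static boolean mapperRan() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Integer> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        CompletableFuture<Integer> mapped = failing.thenApply(v -> {
            ran.set(true); // never reached because the upstream stage failed
            return v + 1;
        });
        try {
            mapped.join();
        } catch (CompletionException expected) {
            // The upstream exception propagates through the skipped stage.
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("mapper ran? " + mapperRan());
    }
}
```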
5.thenAccept
thenAccept takes an input but has no return value. If you continue the chain, the next asynchronous task receives null.
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
example:
/** The Consumer's input is the result returned by the previous step, or null if that step returned nothing. No return value. */
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
CompletableFuture<Void> applyAsyncCustomize = supplyCustomize.thenAcceptAsync(a ->{
System.out.println("accept " + a);
}, threadPoolExecutor);
System.out.println(applyAsyncCustomize.get());
result:
completablefuture supplys asynchronously in customize threadPool
accept success
null
6.thenCompose
thenCompose differs somewhat from thenCombine (covered below): thenCombine takes the results of two CompletableFutures and processes them together, whereas thenCompose takes the result of the first CompletableFuture and uses it to create a new CompletableFuture, which it returns.
example:
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
CompletableFuture<String> future = supply.thenComposeAsync(a -> {
String name = a + " or fail";
return CompletableFuture.supplyAsync(() -> {
return name;
});
});
System.out.println(future.get());
result:
completablefuture supplys asynchronously
success or fail
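It may help to compare thenCompose with thenApply when the function itself returns a CompletableFuture. In this sketch, lookupName is a hypothetical asynchronous lookup; with thenApply the result is nested one level deeper, while thenCompose flattens it.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example: thenCompose flattens, thenApply nests.
class ComposeVersusApply {

    // Stand-in for an asynchronous call that depends on a previous result.
    static CompletableFuture<String> lookupName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static String viaCompose() {
        CompletableFuture<Integer> id = CompletableFuture.supplyAsync(() -> 7);
        // The result type is a flat CompletableFuture<String>.
        CompletableFuture<String> flat = id.thenCompose(ComposeVersusApply::lookupName);
        return flat.join();
    }

    static String viaApply() {
        CompletableFuture<Integer> id = CompletableFuture.supplyAsync(() -> 7);
        // The same function with thenApply yields a future of a future.
        CompletableFuture<CompletableFuture<String>> nested =
                id.thenApply(ComposeVersusApply::lookupName);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(viaCompose());
        System.out.println(viaApply());
    }
}
```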
7.exceptionally
Handles the case where an exception occurs. Note that the fallback value must have the same type as the result of the CompletableFuture (CF) that failed. The idea is similar to service degradation.
example:
/** When an exception occurs, return 0 */
CompletableFuture<Integer> supplyCustomize = CompletableFuture.supplyAsync(() -> {
int a = 1/0;
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return 2;
}, threadPoolExecutor).exceptionally(a->{
System.out.println(a);
return 0;
});
System.out.println(supplyCustomize.get());
result:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
0
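As the output shows, the throwable passed to exceptionally can be a CompletionException wrapping the real cause. If the fallback logic needs the original exception, a small unwrapping helper is one option. The helper name unwrap and the class UnwrapCause are my own; this is a sketch, not part of the CompletableFuture API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical example: look through CompletionException inside exceptionally.
class UnwrapCause {

    // Returns the wrapped cause if there is one, otherwise the throwable itself.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    static String describeFailure() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(t -> "recovered from " + unwrap(t).getClass().getSimpleName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```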
CompletableFuture combination APIs: both tasks complete
1.thenCombine
thenCombine combines tasks. The CompletableFuture usages above are all chained: when the first task finishes, the next asynchronous call is made based on its result. With combination, the two asynchronous tasks can be completely independent, and execution continues only when both have finished.
example:
/** The parameter is a BiFunction that receives the results of the two CFs as input and has a return value */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
// Run asynchronously on a custom thread pool (threadPoolExecutor): no input parameter, has a return value (supply)
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
CompletableFuture<String> future = supply.thenCombine(supplyCustomize, (a, b) -> {
return "thenCombine result: " + a + "-" + b;
});
System.out.println(future.get());
result:
completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
thenCombine result: success-success
2.thenAcceptBoth
thenAcceptBoth differs from thenCombine in that it has no return value: it processes the results of the two CFs and returns nothing.
example:
/** Accepts another CF and a BiConsumer; no return value */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
// Run asynchronously on a custom thread pool (threadPoolExecutor): no input parameter, has a return value (supply)
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
CompletableFuture<Void> bothAsync = supplyCustomize.thenAcceptBothAsync(supply, (a, b) -> {
System.out.println("thenAcceptBoth result " + a + "-" + b);
}, threadPoolExecutor);
result:
completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
thenAcceptBoth result success-success
3.runAfterBoth
runAfterBoth has no input and no return value. Its action runs only after both preceding tasks have completed. To test this, we can simulate a long-running task in either CF.
example:
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
// Run asynchronously on a custom thread pool (threadPoolExecutor): no input parameter, has a return value (supply)
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
CompletableFuture<Void> bothAsync = supply.runAfterBothAsync(supplyCustomize, () -> {
System.out.println("run After Both Async");
});
result:
completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
run After Both Async
CompletableFuture combination APIs: either task completes
1.acceptEither
The counterpart of thenAcceptBoth: as soon as either of the two CompletableFutures completes, the next asynchronous task runs.
example:
/** Task 1 takes a long time, so acceptEitherAsync runs as soon as task 2 finishes. The Consumer's input is the return value of whichever task completed first. */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
});
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success2";
}, threadPoolExecutor);
CompletableFuture<Void> future = supply.acceptEitherAsync(supplyCustomize, a -> {
System.out.println("acceptEither result " + a);
});
result:
completablefuture supplys asynchronously in customize threadPool
acceptEither result success2
completablefuture supplys asynchronously
2.applyToEither
The counterpart of thenCombine: as soon as either of the two CompletableFutures completes, the next asynchronous task runs.
example:
/** The parameter is a Function, so there is both an input and a return value */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
});
// Run asynchronously on a custom thread pool (threadPoolExecutor): no input parameter, has a return value (supply)
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success2";
}, threadPoolExecutor);
CompletableFuture<String> future = supply.applyToEither(supplyCustomize, a -> {
System.out.println("applyToEither result " + a);
return "applyTo " + a;
});
3.runAfterEither
The counterpart of runAfterBoth: as soon as either of the two CompletableFutures completes, the next asynchronous task runs.
/** The parameter is a Runnable: no input and no return value */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
});
// Run asynchronously on a custom thread pool (threadPoolExecutor): no input parameter, has a return value (supply)
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success2";
}, threadPoolExecutor);
CompletableFuture<Void> future = supply.runAfterEitherAsync(supplyCustomize, () -> {
System.out.println("runAfterEither result ");
});
CompletableFuture allOf and anyOf
1.allOf
allOf completes when all the given tasks have completed. Calling get on it returns null; to obtain each task's return value, call get on the individual CompletableFuture.
example:
/** Completes when all tasks are done and yields null. To get each task's return value, query the specific asynchronous task. */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
});
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success2";
}, threadPoolExecutor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(supply, supplyCustomize);
allOf.get();
System.out.println(supplyCustomize.get() + "=>" + supply.get());
result:
completablefuture supplys asynchronously in customize threadPool
completablefuture supplys asynchronously
success2=>success1
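Since allOf itself yields null, a common pattern is a small helper that waits for every future and then collects the individual results into a list. The sketch below is one way to write it; AllOfCollector, allAsList and demo are names I made up, not part of the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper: turn a list of futures into a future of a list.
class AllOfCollector {

    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // By the time this callback runs every future is complete,
        // so join() returns immediately without blocking.
        return all.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static List<String> demo() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"),
                CompletableFuture.supplyAsync(() -> "c"));
        return allAsList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The results come back in the order of the input list, regardless of which task finished first.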
2.anyOf
/** Returns the result of whichever task completes first */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
});
// Run asynchronously on a custom thread pool (threadPoolExecutor): no input parameter, has a return value (supply)
CompletableFuture<Integer> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return 2;
}, threadPoolExecutor);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(supply, supplyCustomize);
System.out.println(anyOf.get());
result: task 2 finishes first, so 2 is returned first
completablefuture supplys asynchronously in customize threadPool
2
completablefuture supplys asynchronously
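Note that anyOf returns CompletableFuture<Object>, because the input futures may have different result types, so the caller has to check the type of the winner. A deterministic sketch, in which one future is deliberately never completed (AnyOfFirst and firstResult are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example: anyOf yields an Object that must be type-checked.
class AnyOfFirst {

    static Object firstResult() {
        // Never completed in this example, so it can never win.
        CompletableFuture<String> never = new CompletableFuture<>();
        CompletableFuture<Integer> fast = CompletableFuture.supplyAsync(() -> 2);
        return CompletableFuture.anyOf(never, fast).join();
    }

    public static void main(String[] args) {
        Object winner = firstResult();
        if (winner instanceof Integer) {
            System.out.println("Integer result: " + winner);
        } else {
            System.out.println("String result: " + winner);
        }
    }
}
```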
Summary
CompletableFuture offers a rich set of asynchronous methods. It helps us avoid blocking the main thread to chain multiple asynchronous tasks together, which improves program performance and response time. Its exception-handling mechanism also makes errors in asynchronous calls straightforward to deal with.