当前位置:网站首页>Completabilefuture asynchronous task choreography usage and explanation
Completabilefuture asynchronous task choreography usage and explanation
2022-06-11 05:53:00 【Endwas】
At work , Multiple services or methods are often called to obtain different data , If the traditional method is to acquire one by one in series , Then encapsulate and return to . We can try it CompletableFuture, Delegate multiple operations to asynchronous threads for execution , Then the main thread waits for the longest task to complete , Return all results together .
Future limitations
When we get a result containing Future when , We can use get Method waits for the thread to complete and gets the return value , But we all know future.get() yes Blocking Methods , It will wait until the thread finishes executing to get the return value . We can see FutureTask Medium get Method , It is to loop the code until the thread executes and returns .
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// loop Omit code
...
}
}
Let's consider a scenario , If we finish the asynchronous task 1 You need to get the return value , Then use the return value to make other asynchronous calls 2, Then we need to wait for asynchronous tasks when the main thread is blocked 1 complete , Then the asynchronous task is executed by 2, Then continue to block the waiting task 2 Return . This is not only Block main thread , and Poor performance .
CompletableFuture Introduce
What is? CompletableFuture:CompletableFuture Combined with the Future The advantages of , Provides a very powerful Future Extension of , Can help us simplify the complexity of asynchronous programming , Provides Functional programming The ability of , The calculation result can be processed by callback , And provides transformations and combinations CompletableFuture Methods .
Functional programming : Use Functional Interface As a parameter , You can use lambda The expression suggests implementing , Convenient programming , I've explained it before .
Common auxiliary methods
- isCompletedExceptionally: The CompletableFuture Whether it ends abnormally .
// Including canceling cancel、 Display call completeExceptionally、 interrupt .
public boolean isCompletedExceptionally() {
Object r;
return ((r = result) instanceof AltResult) && r != NIL;
}
- isCancelled: The CompletableFuture Whether it is cancelled before the normal execution is completed .
public boolean isCancelled() {
Object r;
// Determine whether the exception is CancellationException
return ((r = result) instanceof AltResult) &&
(((AltResult)r).ex instanceof CancellationException);
}
- isDone: The CompletableFuture Whether the execution has ended, including exception generation and cancellation .
public boolean isDone() {
return result != null;
}
- get: Blocking access CompletableFuture result
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
- join: Blocking access CompletableFuture result
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
join() And get() The difference lies in join() Returns the result of the calculation or throws a unchecked abnormal (CompletionException), and get() Returns a specific exception .
CompletableFuture structure
CompletableFuture Construction method with or without parameters , What is created at this time is incomplete CompletableFuture. Use get Will always block the main thread .
So we usually use static methods to create instances .
// No input parameter has a return value
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
// No input parameter, no return value , Simple execution
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
We noticed that each method has a refactoring method .Executor Parameters can be manually specified thread pool , Otherwise the default ForkJoinPool.commonPool() System level common thread pool .
【 Be careful 】: default commonPool These threads are daemon threads . We need to use guard threads carefully when programming , If we set our ordinary user thread as a daemon thread , When the main thread of our program ends ,JVM There are no other user threads in the , that CompletableFuture The daemon thread will exit directly , Problems that make the task impossible !!
CompletableFuture Commonly used Api
We will explain later api There are generally three similar approaches . I'll just demonstrate the third .
- xxx(method);
- xxxAsync(method);
- xxxAsync(method, executor)
The difference between the three is that the first synchronous execution is performed by the main thread , The second type of asynchronous handover is the default thread pool , The third type of asynchronous handover creates a thread pool .
1. structure CompletableFuture example
Now we will CompletableFuture To try to use . Start with the basic build .
/** * 1.run + runAsync + supply + supplyAsync */
/** 1. Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter, no return value run * 2. Asynchronous running is left to the creation of thread pools (threadPoolExecutor) No input parameter, no return value run */
CompletableFuture<Void> run = CompletableFuture.runAsync(() ->
System.out.println("completablefuture runs asynchronously"));
CompletableFuture<Void> runCustomize = CompletableFuture.runAsync(() ->
System.out.println("completablefuture runs asynchronously in customize threadPool"
), threadPoolExecutor);
//================ Here is supply===================
/** 1. Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply * 2. Asynchronously run the thread pool created by yourself No input parameter has a return value supply */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
result
completablefuture runs asynchronously
completablefuture runs asynchronously in customize threadPool
completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
The first two CompletableFuture It's not worth it , So when the subsequent chain calls want to use, the input parameters are null Of .
2.whenComplete
Consider when we are CompletableFuture At the end of the execution , Hope to get the implementation results 、 Or abnormal , Then do further processing for the result or exception . Then we need to use whenComplete.
/** * Participation is BiConsumer, The first parameter is the result of the previous step 、 The second is the exception executed in the previous step */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
int a = 1/0;
System.out.println("completablefuture supplys asynchronously");
return "success";
});
CompletableFuture<String> complete = supply.whenCompleteAsync((result, throwable) -> {
System.out.println("whenComplete: " + result + " throws " + throwable);
});
result:
whenComplete: null throws java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
3.handle
handle and whenComplete The input parameters are the same , But it can return after execution Execution results , and whenComplete Can only process and cannot return .
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
example:
/** * Participation is BiFunction The first parameter is the result of the previous step 、 The second is the exception executed in the previous step * The return value can be of any type */
CompletableFuture<String> handleAsyncCustomize = supply.handleAsync((a, throwable) -> {
System.out.println("handleAsyncCustomize: " + a + " throws " + throwable);
return "handleAsync success";
}, threadPoolExecutor);
System.out.println(handleAsyncCustomize.get());
result:
handleAsyncCustomize: success throws null
handleAsync success
4.thenApply
thenApply And again handle Is very similar , There are both input parameters and return values , But he has only one participant , Unable to handle the exception of the previous asynchronous task . If something unusual happens get I will make a mistake .
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
/** * Function The input parameter is the result of the first asynchronous task execution 、 The output parameter is the return value * If asynchronous task 1 abnormal be 2 Unable to execute get Will report a mistake */
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
int a = 1/0;
return "success";
}, threadPoolExecutor);
CompletableFuture<String> applyAsyncCustomize = supplyCustomize.thenApplyAsync(a -> {
System.out.println("applyAsyncCustomize " + a);
return "success";
}, threadPoolExecutor);
System.out.println(applyAsyncCustomize.get());
result:
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at CompletableFutureTask.main.main(main.java:48)
Caused by: java.lang.ArithmeticException: / by zero
at CompletableFutureTask.main.lambda$main$3(main.java:40)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
5.thenAccept
thenAccept Yes, there is an input parameter but no return value , If you continue the chain call, the next asynchronous task will get null value .
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
example:
/** * Consumer The first input parameter is the result returned in the previous step , If no result is returned, it will be null * No return value */
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
CompletableFuture<Void> applyAsyncCustomize = supplyCustomize.thenAcceptAsync(a ->{
System.out.println("accept " + a);
}, threadPoolExecutor);
System.out.println(applyAsyncCustomize.get());
result:
completablefuture supplys asynchronously in customize threadPool
accept success
null
6.thenCompose
and thenCombine Somewhat different ,thenCombine It's a combination of two CompletableFuture The returned results are processed asynchronously , and thenCompose Is based on the first returned result , Encapsulated in new CompletableFuture return
example:
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
CompletableFuture<String> future = supply.thenComposeAsync(a -> {
String name = a + " or fail";
return CompletableFuture.supplyAsync(() -> {
return name;
});
});
System.out.println(future.get());
result:
completablefuture supplys asynchronously in customize threadPool
success or fail
7.exceptionally
Handling when an exception occurs , Note that the return value type after the exception needs to be the same as that of the exception CF The return value type is consistent , It is equivalent to the idea of service degradation .
example:
/** * When an exception occurs , return 0 */
CompletableFuture<Integer> supplyCustomize = CompletableFuture.supplyAsync(() -> {
int a = 1/0;
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return 2;
}, threadPoolExecutor).exceptionally(a->{
System.out.println(a);
return 0;
});
System.out.println(supplyCustomize.get());
result:
completablefuture runs asynchronously in customize threadPool
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
0
CompletableFuture combined Api- The task is complete
1.thenCombine
thenCombine It's a combined task , above CompletableFuture The use is done in a chain , When you finish the first , Make the next asynchronous call according to the first execution result , Combined asynchronism , Two asynchronous tasks can be completely independent , Only when they are all finished will they continue to execute .
example:
/** * BiFunction Enter the reference , Two CF As input parameter , There is a return value */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
// Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
CompletableFuture<String> future = supply.thenCombine(supplyCustomize, (a, b) -> {
return "thenCombine result: " + a + "-" + b;
});
System.out.println(future.get());
result:
completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
thenCombine result: success-success
2.thenAcceptBoth
thenAcceptBoth and thenCombine The difference is that there is no return value , Put two CF Return value for processing , no return value
example:
/** * Accept CF、BiConsumer, No return value */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
// Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
CompletableFuture<Void> bothAsync = supplyCustomize.thenAcceptBothAsync(supply, (a, b) -> {
System.out.println("thenAcceptBoth result " + a + "-" + b);
}, threadPoolExecutor);
result:
completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
thenAcceptBoth result success-success
null
3.runAfterBoth
No input parameter, no return value , It will only be executed after the previous two runs are completed runAfterBoth Methods , We can be anywhere CF Simulate long-time running to test .
example:
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously");
return "success";
});
// Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success";
}, threadPoolExecutor);
CompletableFuture<Void> bothAsync = supply.runAfterBothAsync(supplyCustomize, () -> {
System.out.println("run After Both Async");
});
result:
completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
run After Both Async
CompletableFuture combined Api- Complete any task
1.acceptEither
and thenAcceptBoth Corresponding , Two CompletableFuture Any execution is completed , Will continue to the next asynchronous task
example:
/** * Task one takes a long time , So when task two is finished, it will be executed acceptEitherAsync * Parameters Consumer The input parameter returns the value for executing the completed task */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success2";
}, threadPoolExecutor);
CompletableFuture<Void> future = supply.acceptEitherAsync(supplyCustomize, a -> {
System.out.println("acceptEither result " + a);
});
result:
completablefuture supplys asynchronously in customize threadPool
acceptEither result success2
completablefuture supplys asynchronously
2.applyToEither
Corresponding thenCombine, Two CompletableFuture Any execution is completed , Will continue to the next asynchronous task
example:
/** * Parameter is Function, There are both input parameters and return values */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
});
// Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success2";
}, threadPoolExecutor);
CompletableFuture<String> future = supply.applyToEither(supplyCustomize, a -> {
System.out.println("acceptEither result " + a);
return "appleTo " + a;
});
3.runAfterEither
Corresponding runAfterBoth, Two CompletableFuture Any execution is completed , Will continue to the next asynchronous task
/** * runnable No input parameter, no return value */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
});
// Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success2";
}, threadPoolExecutor);
CompletableFuture<Void> future = supply.runAfterEitherAsync(supplyCustomize, () -> {
System.out.println("runAfterEither result ");
});
CompletableFuture And allOf|anyOf
1.allOf
All completed get Method to get the return value , The return value is null, Each task return value needs to call each CompletableFuture.
example:
/** * When all the tasks are completed , return null, Return value of each task , You need to call a specific * Asynchronous task acquisition */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
});
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return "success2";
}, threadPoolExecutor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(supply, supplyCustomize);
allOf.get();
System.out.println(supplyCustomize.get() + "=>" + supply.get());
result:
completablefuture supplys asynchronously in customize threadPool
completablefuture supplys asynchronously
success2==success1
2.anyOf
/** * The result of any first execution will be returned first */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completablefuture supplys asynchronously");
return "success1";
});
// Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<Integer> supplyCustomize = CompletableFuture.supplyAsync(() -> {
System.out.println("completablefuture supplys asynchronously in customize threadPool");
return 2;
}, threadPoolExecutor);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(supply, supplyCustomize);
System.out.println(anyOf.get());
result: Execute first, so return first 2
completablefuture supplys asynchronously in customize threadPool
2
completablefuture supplys asynchronously
summary
CompletableFuture Rich asynchronous invocation methods , It can help us avoid using mainthreads to link multiple asynchronous tasks , Improve program performance , Faster response . At the same time, excellent exception handling mechanism , It can also be solved perfectly in the process of asynchronous call errors .
边栏推荐
- Further efficient identification of memory leakage based on memory optimization tool leakcanary and bytecode instrumentation technology
- 使用Batch管理VHD
- NFC Development -- utility tools and development documents (IV)
- Sword finger offer 50: the first character that appears only once
- Clojure installation of metabase source code secondary development
- 数组部分方法
- [go deep into kotlin] - flow advanced
- Altiumdesigner2020 import 3D body SolidWorks 3D model
- 20多种云协作功能,3分钟聊透企业的数据安全经
- Sword finger offer 32: print binary tree from top to bottom
猜你喜欢

做亚马逊测评要了解的知识点有哪些?

NDK R21 compiles ffmpeg 4.2.2 (x86, x86_64, armv7, armv8)

Pycharm usage experience

NDK learning notes (14) create an avi video player using avilib+window

NFC Development -- difference between ID card and IC card (M1 card and CPU card) (III)

使用Genymotion Scrapy控制手机

Preliminary understanding of multi task learning
![Experimental report on information management and information system [information security and confidentiality] of Huazhong Agricultural University](/img/f6/e58196aeac85178f6603cea1962a6e.jpg)
Experimental report on information management and information system [information security and confidentiality] of Huazhong Agricultural University

NDK learning notes (VI) Basics: memory management, standard file i/o

Cocoatouch framework and building application interface
随机推荐
11. Gesture recognition
Using batch enumeration files
Super (subclass)__ init__ And parent class__ init__ ()
Elk log system practice (VI): comparison between vector and filebeat for technology selection
Recursively process data accumulation
Managing VHDS using batch
Méthode de la partie du tableau
NDK learning notes (14) create an avi video player using avilib+window
SwiftUI: Navigation all know
NDK R21 compiles ffmpeg 4.2.2+x264 and converts video files using ffmpeg
NDK R21 compiles ffmpeg 4.2.2 (x86, x86_64, armv7, armv8)
使用Genymotion Scrapy控制手机
DISM命令使用小结
Multithreading tutorial (XXVI) field updater and atomic accumulator
NDK learning notes (13) create an avi video player using avilib+opengl es 2.0
亚马逊、速卖通、Lazada、虾皮平台在用911+VM的环境可以进行产号、养号、补单等操作吗?
All questions and answers of database SQL practice niuke.com
Dichotomy find template
PageHelper page 2 collections in the same interface
创建酷炫的 CollectionViewCell 转换动画