Using CompletableFuture
2022-07-07 06:59:00 【Ghost punishment OLO】
Future has limitations: it cannot directly chain or combine multiple tasks. Doing so requires help from concurrency utility classes, and the implementation logic becomes complex.
CompletableFuture extends and enhances Future. It implements the Future interface and builds a rich set of extensions on top of it, which makes up for the limitations of Future. CompletableFuture also adds the ability to orchestrate tasks. With this ability, you can easily organize the order, rules, and manner in which different tasks run. In a sense, this is its core capability. In the past, task orchestration was also possible with utility classes such as CountDownLatch, but it required complex logic.
The basic structure of CompletableFuture is as follows: it implements both the Future interface and the CompletionStage interface.
The CompletionStage interface defines the task-orchestration methods: when one stage finishes, execution can continue to the next stage. Stages run asynchronously, and the default thread pool is ForkJoinPool.commonPool(). However, so that tasks do not affect each other and problems are easier to locate, a custom thread pool is strongly recommended.
CompletableFuture selects its default thread pool as follows:
// Whether the common pool is used depends on its parallelism; the parallelism is computed in the static initializer of ForkJoinPool
private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool initializes the commonPool parameters in its static initializer:
static {
    // initialize field offsets for CAS etc
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ForkJoinPool.class;
        CTL = U.objectFieldOffset
            (k.getDeclaredField("ctl"));
        RUNSTATE = U.objectFieldOffset
            (k.getDeclaredField("runState"));
        STEALCOUNTER = U.objectFieldOffset
            (k.getDeclaredField("stealCounter"));
        Class<?> tk = Thread.class;
        ……
    } catch (Exception e) {
        throw new Error(e);
    }
    commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");
    // Call makeCommonPool to create the commonPool; the default parallelism is the number of logical cores minus 1
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() { return makeCommonPool(); }});
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}
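To see which executor applies on a given machine, the check from the snippet above can be reproduced with public API only. This is a minimal sketch: the class name DefaultPoolDemo and the helper usesCommonPool are illustrative names, not part of the JDK, and the printed values depend on the machine.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class DefaultPoolDemo {
    // Mirrors the useCommonPool check quoted above: the common pool is
    // chosen only when its parallelism is greater than 1.
    static boolean usesCommonPool() {
        return ForkJoinPool.getCommonPoolParallelism() > 1;
    }

    public static void main(String[] args) {
        System.out.println("Available processors: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("Uses common pool: " + usesCommonPool());
        // Ask an async task which thread it actually ran on
        String threadName = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
        System.out.println("supplyAsync ran on: " + threadName);
    }
}
```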
Features
Common methods
Dependency relationships
- thenApply(): passes the result of the previous task to the Function that follows
- thenCompose(): connects two dependent tasks; the result is returned by the second task
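The difference between the two is easiest to see side by side. The following is a small sketch; findUserName is a hypothetical asynchronous lookup invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class DependencyDemo {
    // Hypothetical asynchronous lookup, used only for illustration
    static CompletableFuture<String> findUserName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // thenApply: transform the previous result with a plain Function
    static CompletableFuture<Integer> lengthOfName(int id) {
        return findUserName(id).thenApply(String::length);
    }

    // thenCompose: the function itself returns a CompletableFuture, and the
    // overall result is the result of that second task
    static CompletableFuture<String> greetUser(int id) {
        return findUserName(id)
                .thenCompose(name -> CompletableFuture.supplyAsync(() -> "hello " + name));
    }

    public static void main(String[] args) {
        System.out.println(lengthOfName(42).join()); // length of "user-42"
        System.out.println(greetUser(42).join());
    }
}
```

Using thenApply with a function that returns a CompletableFuture would produce a nested CompletableFuture<CompletableFuture<String>>; thenCompose flattens it.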
AND aggregation relationships
- thenCombine(): merges two tasks once both have completed; has a return value
- thenAcceptBoth(): after both tasks have completed, passes both results to the given action for processing; no return value
- runAfterBoth(): after both tasks have completed, runs the next step (a Runnable)
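The three AND-style methods could be exercised as follows. This is a sketch with made-up values (a price of 100 and a shipping cost of 15); the class and method names are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class BothDemo {
    static CompletableFuture<Integer> price() {
        return CompletableFuture.supplyAsync(() -> 100);
    }

    static CompletableFuture<Integer> shipping() {
        return CompletableFuture.supplyAsync(() -> 15);
    }

    // thenCombine: waits for both tasks and produces a new value
    static int combinedTotal() {
        return price().thenCombine(shipping(), Integer::sum).join();
    }

    // thenAcceptBoth: consumes both results and yields CompletableFuture<Void>
    static int acceptedTotal() {
        AtomicInteger sink = new AtomicInteger();
        price().thenAcceptBoth(shipping(), (p, s) -> sink.set(p + s)).join();
        return sink.get();
    }

    // runAfterBoth: runs a Runnable that sees neither result
    static boolean ranAfterBoth() {
        AtomicBoolean ran = new AtomicBoolean(false);
        price().runAfterBoth(shipping(), () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(combinedTotal());
        System.out.println(acceptedTotal());
        System.out.println(ranAfterBoth());
    }
}
```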
OR aggregation relationships
- applyToEither(): uses the result of whichever of the two tasks finishes first; has a return value
- acceptEither(): consumes the result of whichever of the two tasks finishes first; no return value
- runAfterEither(): as soon as either task completes, runs the next step (a Runnable task)
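A sketch of the OR-style methods, assuming two tasks whose speed is simulated with Thread.sleep (10 ms versus 1000 ms); the helper delayed and the class name are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class EitherDemo {
    // Completes with the given value after roughly the given delay
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    static String race() {
        CompletableFuture<String> fast = delayed("fast", 10);
        CompletableFuture<String> slow = delayed("slow", 1000);

        // applyToEither: transform whichever result arrives first
        String applied = slow.applyToEither(fast, winner -> "winner: " + winner).join();

        // acceptEither: consume whichever result arrives first
        AtomicReference<String> consumed = new AtomicReference<>();
        slow.acceptEither(fast, consumed::set).join();

        // runAfterEither: run a Runnable once either task has completed
        AtomicBoolean ran = new AtomicBoolean(false);
        slow.runAfterEither(fast, () -> ran.set(true)).join();

        return applied + ", consumed: " + consumed.get() + ", ran: " + ran.get();
    }

    public static void main(String[] args) {
        System.out.println(race());
    }
}
```

Note that the slower task is not cancelled; it keeps running in the background and its result is simply ignored.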
Parallel execution
- allOf(): returns a new CompletableFuture that completes when all of the given CompletableFutures have completed
- anyOf(): returns a new CompletableFuture that completes when any one of the given CompletableFutures has completed
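Because allOf() returns CompletableFuture<Void>, the individual results have to be read back from the original futures. One common way to do that is sketched below; the class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllAnyDemo {
    // Squares every input in parallel and collects the results in input order
    static List<Integer> squaresOf(List<Integer> inputs) {
        List<CompletableFuture<Integer>> futures = inputs.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> n * n))
                .collect(Collectors.toList());
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // Once "all" is done, join() on each original future returns immediately
        return all.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())).join();
    }

    // anyOf returns CompletableFuture<Object>, so the caller has to cast
    static Object firstOf() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> done = CompletableFuture.completedFuture("cache");
        return CompletableFuture.anyOf(pending, done).join();
    }

    public static void main(String[] args) {
        System.out.println(squaresOf(Arrays.asList(1, 2, 3)));
        System.out.println(firstOf());
    }
}
```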
Result handling
- whenComplete(): when the task completes, runs the given action with the result of this stage (or null if there is none) and the exception (or null if there is none)
- exceptionally(): returns a new CompletableFuture that completes when the previous CompletableFuture completes. If the previous one completed exceptionally, the given function is applied to the exception and its return value completes the new CompletableFuture; otherwise the new one completes with the same value
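As an illustration of exceptionally(), the sketch below parses a number asynchronously and falls back to a default when parsing fails. The method parseOrDefault is a made-up helper, not a JDK API.

```java
import java.util.concurrent.CompletableFuture;

public class RecoverDemo {
    static int parseOrDefault(String text, int fallback) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text)) // may throw NumberFormatException
                .exceptionally(ex -> fallback)              // only invoked on failure
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("12", 0));
        System.out.println(parseOrDefault("abc", -1));
    }
}
```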
Asynchronous operations
CompletableFuture provides four static methods for creating an asynchronous operation:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Differences between the four methods:
- runAsync() takes a Runnable functional interface as its parameter and returns no result. supplyAsync() takes a Supplier functional interface as its parameter and returns a result of type U; the get() method of the Supplier interface has a return value.
- The overloads without an Executor parameter use ForkJoinPool.commonPool() internally as the thread pool for the asynchronous code. If a thread pool is specified, the task runs on that pool.
- By default, CompletableFuture uses the shared ForkJoinPool.commonPool(). Its default parallelism is the number of CPU cores minus one (it can also be set with the JVM option -Djava.util.concurrent.ForkJoinPool.common.parallelism). If all CompletableFutures share one thread pool, then once some tasks perform very slow I/O operations, all threads in the pool may end up blocked on I/O. This causes thread starvation and in turn hurts the performance of the whole system. It is therefore strongly recommended to create separate thread pools for different business types so that they do not interfere with each other.
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Task without a return value
    Runnable runnable = () -> System.out.println("Task with no return result");
    CompletableFuture.runAsync(runnable);
    // Task with a return value
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Asynchronous task with a return value");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return "hello world";
    });
    String result = future.get(); // blocks until the task has finished
    System.out.println(result);
}
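The recommendation to use a dedicated thread pool per business type might look like the sketch below. The pool size of 4, the "io-pool" name prefix, and the helpers namedFactory and runOn are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPoolDemo {
    // Names threads after their business purpose so they are easy to spot in thread dumps
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
    }

    // Runs a task on the given pool and reports the thread that executed it
    static String runOn(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        // Dedicated pool for slow I/O work, kept apart from the common pool
        ExecutorService ioPool = Executors.newFixedThreadPool(4, namedFactory("io-pool"));
        try {
            System.out.println(runOn(ioPool));
        } finally {
            ioPool.shutdown(); // custom pools must be shut down explicitly
        }
    }
}
```

With separate pools, blocked I/O tasks can exhaust only their own threads, and the thread names make it easier to locate which kind of work is stuck.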
Getting the result (join & get)
Both join() and get() retrieve the value of a CompletableFuture after the asynchronous task has finished. join() throws unchecked exceptions, so callers are not forced to handle or declare them. get() throws the checked exceptions ExecutionException and InterruptedException, which the caller must handle explicitly (declare them or use try/catch).
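The difference shows up when the task fails. In the sketch below a task throws an IllegalStateException: join() surfaces it wrapped in an unchecked CompletionException, while get() wraps it in a checked ExecutionException. The class and method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class JoinGetDemo {
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // join(): no checked exceptions, failures arrive as CompletionException
    static String describeJoinFailure() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "join: " + e.getCause().getMessage();
        }
    }

    // get(): the compiler forces handling of both checked exceptions
    static String describeGetFailure() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeJoinFailure());
        System.out.println(describeGetFailure());
    }
}
```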
Result handling
When a CompletableFuture finishes its computation, or throws an exception, we can run a specific Action. The main methods are:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- The Action has type BiConsumer<? super T,? super Throwable>, so it can handle both the normal result and the exception.
- Methods whose names do not end in Async run the Action on the thread that completes the task (or on the calling thread if the task has already completed), while the Async variants may run it on another thread (if the same thread pool is used, the same thread may still be picked).
- These methods return a CompletableFuture. After the Action has run, the returned future completes with the result of the original CompletableFuture, or with its exception.
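The pass-through behaviour described in the last point can be checked with futures that are already completed, which keeps the sketch deterministic. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WhenCompleteDemo {
    // The action observes the value, and the returned future still carries it
    static String passThrough() {
        List<String> log = new ArrayList<>();
        String result = CompletableFuture.completedFuture("data")
                .whenComplete((value, error) -> log.add("saw " + value))
                .join();
        return result + " / " + log;
    }

    // The action observes the exception, and the returned future still fails
    static boolean failureIsPropagated() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<String> observed = failed.whenComplete((value, error) -> {
            if (error != null) {
                System.out.println("observed failure: " + error.getMessage());
            }
        });
        return observed.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(passThrough());
        System.out.println(failureIsPropagated());
    }
}
```

Unlike exceptionally(), whenComplete() cannot replace the outcome; it is suited to logging or cleanup.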
Test
package ThreadPoolExecultorTest;

import java.util.*;
import java.util.concurrent.*;

/**
 * @Author lijiaxin
 * @Description TODO
 * @Date 2022/07/05/9:23
 * @Version 1.0
 */
public class CompletableFuture1Test {
    // A single shared pool; building a new pool on every call would leak threads
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(5, 10, 1,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(20),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()); // Tasks are never discarded: the caller runs them synchronously

    public static void main(String[] args) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            int finalI = i;
            /* Produce a list asynchronously on the custom pool */
            CompletableFuture<List<String>> listmates = CompletableFuture.supplyAsync(() -> {
                if (finalI == 1) {
                    return test1();
                } else if (finalI == 2) {
                    return test2();
                } else if (finalI == 3) {
                    return test3();
                } else {
                    return test4();
                }
            }, threadPoolExecutor());
            /* Consume the list asynchronously once it is available */
            pending.add(listmates.thenAcceptAsync(list -> linkedLists(list), threadPoolExecutor()));
        }
        // Wait for every pipeline, then shut the pool down so the JVM can exit
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        threadPoolExecutor().shutdown();
    }

    public static ThreadPoolExecutor threadPoolExecutor() {
        return EXECUTOR;
    }

    public static List<String> test1() {
        System.out.println("1");
        List<String> list = new LinkedList<>();
        list.add("1");
        return list;
    }

    public static List<String> test2() {
        System.out.println("2");
        List<String> list = new LinkedList<>();
        list.add("2");
        return list;
    }

    public static List<String> test3() {
        System.out.println("3");
        List<String> list = new LinkedList<>();
        list.add("3");
        return list;
    }

    public static List<String> test4() {
        System.out.println("4");
        List<String> list = new LinkedList<>();
        list.add("4");
        return list;
    }

    public static LinkedList<List<String>> linkedLists(List<String> list) {
        System.out.println(list);
        // Wrap the incoming list in an outer list
        LinkedList<List<String>> wrapper = new LinkedList<>();
        wrapper.add(list);
        for (List<String> inner : wrapper) {
            System.out.println(inner.toString());
        }
        return wrapper;
    }
}