Using CompletableFuture
2022-07-07 06:59:00 【Ghost punishment OLO】
Future has a limitation: it cannot directly chain or combine multiple tasks. Doing so requires concurrency utility classes and complex implementation logic.
CompletableFuture extends and enhances Future. It implements the Future interface and builds on it extensively to make up for Future's limitations, and it also adds the ability to orchestrate tasks. With this ability you can easily organize the order, rules, and manner in which different tasks run; in a sense, this is its core capability. Previously, utility classes such as CountDownLatch could also orchestrate tasks, but they required complex logic.
The class structure of CompletableFuture is as follows: it implements both the Future and CompletionStage interfaces.

The CompletionStage interface defines the task-orchestration methods: when one stage finishes, execution can continue to the next stage. Stages can run asynchronously.
The default thread pool is **ForkJoinPool.commonPool()**, but to keep tasks from affecting each other and to make problems easier to locate, **a custom thread pool is strongly recommended**.
The default thread pool in CompletableFuture is defined as follows:
// Choose the executor based on commonPool's parallelism; the parallelism itself is computed in ForkJoinPool's static initializer
private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool initializes the commonPool parameters in its static initializer:
static {
    // initialize field offsets for CAS etc
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ForkJoinPool.class;
        CTL = U.objectFieldOffset
            (k.getDeclaredField("ctl"));
        RUNSTATE = U.objectFieldOffset
            (k.getDeclaredField("runState"));
        STEALCOUNTER = U.objectFieldOffset
            (k.getDeclaredField("stealCounter"));
        Class<?> tk = Thread.class;
        ……
    } catch (Exception e) {
        throw new Error(e);
    }
    commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");
    // Call makeCommonPool to create commonPool; by default the parallelism is the number of logical cores - 1
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() { return makeCommonPool(); }});
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}
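As a quick check of the parallelism computed above, the following sketch prints the number of logical cores next to the common pool's parallelism. The class name CommonPoolSketch and its helper method are made up for illustration. On a default JVM the second number is usually one less than the first, but a container CPU limit or the parallelism system property can change it.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSketch {
    // Hypothetical helper: exposes the parallelism of the shared common pool
    static int parallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("logical cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("commonPool parallelism: " + parallelism());
    }
}
```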
Features
Common methods
Dependency relationships
- thenApply(): passes the result of the preceding task to the given Function
- thenCompose(): chains two dependent tasks; the result is the one returned by the second task
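The difference between thenApply() and thenCompose() can be sketched as follows. This is only an illustration: the class DependencySketch and the fetchName helper are hypothetical names, not part of the original article.

```java
import java.util.concurrent.CompletableFuture;

public class DependencySketch {
    // Hypothetical second task: it depends on an id and is itself asynchronous
    static CompletableFuture<String> fetchName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static String run() {
        // thenApply: transform the previous result with a plain Function
        CompletableFuture<Integer> id =
                CompletableFuture.supplyAsync(() -> 20).thenApply(n -> n + 1);
        // thenCompose: the function returns another CompletableFuture,
        // and the chain yields that second task's result without nesting
        CompletableFuture<String> name = id.thenCompose(DependencySketch::fetchName);
        return name.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Using thenApply() with fetchName would instead produce a nested CompletableFuture<CompletableFuture<String>>, which is why thenCompose() is the usual choice when the second step is asynchronous too.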
AND aggregation
- thenCombine(): merges two tasks; has a return value
- thenAcceptBoth(): after both tasks finish, passes their results to thenAcceptBoth for processing; no return value
- runAfterBoth(): after both tasks finish, runs the next step
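A minimal sketch of the three AND-style methods, assuming two independent tasks that each produce an Integer. The class name BothSketch and the values are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class BothSketch {
    static CompletableFuture<Integer> price() { return CompletableFuture.supplyAsync(() -> 40); }
    static CompletableFuture<Integer> tax()   { return CompletableFuture.supplyAsync(() -> 2); }

    // thenCombine: both results go into a BiFunction, and its value is returned
    static int combine() {
        return price().thenCombine(tax(), Integer::sum).join();
    }

    // thenAcceptBoth: both results are consumed, nothing is returned
    static int acceptBoth() {
        AtomicInteger seen = new AtomicInteger();
        price().thenAcceptBoth(tax(), (p, t) -> seen.set(p + t)).join();
        return seen.get();
    }

    // runAfterBoth: a Runnable runs after both finish; it sees neither result
    static boolean afterBoth() {
        AtomicBoolean ran = new AtomicBoolean();
        price().runAfterBoth(tax(), () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(combine() + " " + acceptBoth() + " " + afterBoth());
    }
}
```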
OR aggregation
- applyToEither(): uses the result of whichever of the two tasks finishes first; has a return value
- acceptEither(): consumes the result of whichever of the two tasks finishes first; no return value
- runAfterEither(): runs the next step (a Runnable task) as soon as either task completes
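The OR-style methods can be sketched like this. To keep the outcome deterministic, the example assumes one future that is already complete and one that is never completed; in real code both would be running tasks and the winner would depend on timing. EitherSketch is a hypothetical class name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class EitherSketch {
    // applyToEither: transform whichever result arrives first
    static String applyFirst() {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed in this sketch
        return slow.applyToEither(fast, s -> s + " wins").join();
    }

    // acceptEither: consume whichever result arrives first, no return value
    static String acceptFirst() {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> slow = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();
        slow.acceptEither(fast, seen::set).join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(applyFirst());
        System.out.println(acceptFirst());
    }
}
```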
Parallel execution
- allOf(): returns a new CompletableFuture that completes when all of the given CompletableFutures complete
- anyOf(): returns a new CompletableFuture that completes when any of the given CompletableFutures completes
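Because allOf() returns CompletableFuture<Void>, the individual results have to be read back from the original futures. One common way to do that is sketched below; the class AllAnySketch and its method names are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllAnySketch {
    // allOf: wait for every future, then collect the individual results
    static List<Integer> collectAll() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // join() inside the callback does not block: every future is already done here
        return all.thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())).join();
    }

    // anyOf: completes with the first available result, typed as Object
    static Object firstDone() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("ready");
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        return CompletableFuture.anyOf(pending, done).join();
    }

    public static void main(String[] args) {
        System.out.println(collectAll());
        System.out.println(firstDone());
    }
}
```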
Result handling
- whenComplete: when the task completes, performs the given action with this stage's result (or null if there is none) and exception (or null if there is none)
- exceptionally: returns a new CompletableFuture that completes with the same value when the preceding CompletableFuture completes normally; when it completes exceptionally, the exception is passed to the given function, whose return value completes the new CompletableFuture
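The two result-handling methods can be illustrated with a short sketch. The failing supplier, the fallback value, and the class name ResultHandlingSketch are invented for the example.

```java
import java.util.concurrent.CompletableFuture;

public class ResultHandlingSketch {
    // exceptionally: supply a replacement value when the previous stage failed
    static String recover() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(ex -> "fallback")
                .join();
    }

    // whenComplete: observe the result and the exception; one of them is null
    static String observe() {
        StringBuilder log = new StringBuilder();
        CompletableFuture.completedFuture("ok")
                .whenComplete((result, ex) -> log.append(result).append('/').append(ex))
                .join();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(recover());
        System.out.println(observe());
    }
}
```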
Asynchronous operations
CompletableFuture provides four static methods for creating an asynchronous operation:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Differences between the four methods
- runAsync() takes a Runnable and returns no result; supplyAsync() takes a Supplier, and its result type is U: the value returned by the Supplier's get() becomes the result of the future
- The methods without an Executor parameter run the asynchronous code on ForkJoinPool.commonPool(); if a thread pool is specified, the task runs on that pool
- By default, CompletableFuture uses the common ForkJoinPool, whose default parallelism is the number of CPU cores minus 1 (it can also be set with the JVM option -Djava.util.concurrent.ForkJoinPool.common.parallelism). If all CompletableFutures share one thread pool, tasks that perform very slow I/O can leave every thread in the pool blocked on I/O, which starves other tasks of threads and degrades the performance of the whole system. It is therefore strongly recommended to create separate thread pools for different kinds of business work so that they do not interfere with each other
public static void main(String[] args) throws ExecutionException, InterruptedException {
    Runnable runnable = () -> System.out.println("Task with no return value");
    CompletableFuture.runAsync(runnable);
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Asynchronous task with a return value");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    });
    // get() blocks until the supplier has finished
    String result = future.get();
    System.out.println(result);
}
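The recommendation to use a dedicated thread pool can be sketched as follows. The pool size, the thread name io-pool-worker, and the class name CustomPoolSketch are assumptions chosen for illustration; a real application would size and name its pools per workload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CustomPoolSketch {
    static String runOnCustomPool() {
        // Dedicated pool for slow I/O work, so it cannot starve the common pool
        ExecutorService ioPool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "io-pool-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            // Passing the executor as the second argument bypasses ForkJoinPool.commonPool()
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), ioPool)
                    .join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool());
    }
}
```

The returned thread name shows that the task ran on the custom pool rather than on a common-pool worker.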
Getting the result (join & get)
Both join() and get() return the result of a CompletableFuture once the asynchronous work has finished. join() throws an unchecked exception (CompletionException), so callers are not forced to handle it. get() throws the checked exceptions ExecutionException and InterruptedException, which the caller must handle explicitly (rethrow or try/catch).
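The difference in exception handling can be seen in a small sketch, assuming a future that was completed exceptionally by hand. The class name JoinVsGetSketch and the messages are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class JoinVsGetSketch {
    static CompletableFuture<String> failed() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    // join(): no throws clause needed; failures surface as unchecked CompletionException
    static String viaJoin() {
        try {
            return failed().join();
        } catch (CompletionException e) {
            return "join: " + e.getCause().getMessage();
        }
    }

    // get(): the compiler forces handling of both checked exceptions
    static String viaGet() {
        try {
            return failed().get();
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin());
        System.out.println(viaGet());
    }
}
```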
Result handling
When a CompletableFuture finishes its computation or throws an exception, we can run a specific Action. The main methods are:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- The Action has type BiConsumer<? super T,? super Throwable>, so it can handle both the normal result and the exception.
- Methods whose names do not end in Async run the Action on the same thread that completed the task, whereas the Async variants may run it on another thread (if the same thread pool is used, the same thread may still be chosen).
- These methods return a CompletableFuture that, once the Action has run, completes with the original CompletableFuture's result or with its exception
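The last point, that whenComplete passes the original outcome through unchanged, can be sketched as below. WhenCompleteSketch and the sample values are hypothetical; the callback here only observes and does not alter the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class WhenCompleteSketch {
    // Normal completion: the returned future carries the original value
    static String passValue() {
        return CompletableFuture.completedFuture("value")
                .whenComplete((r, ex) -> System.out.println("saw result: " + r))
                .join();
    }

    // Exceptional completion: the returned future fails with the original exception
    static String passException() {
        CompletableFuture<String> f = CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalArgumentException("bad input");
                })
                .whenComplete((r, ex) -> System.out.println("saw exception: " + (ex != null)));
        try {
            return f.join();
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(passValue());
        System.out.println(passException());
    }
}
```

Unlike exceptionally, whenComplete does not recover from the failure, so the caller still has to handle the exception.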
Test
package ThreadPoolExecultorTest;
import java.util.*;
import java.util.concurrent.*;
/**
 * @Author lijiaxin
 * @Description Runs four tasks on a custom thread pool and consumes each result asynchronously
 * @Date 2022/07/05/9:23
 * @Version 1.0
 */
public class CompletableFuture1Test {
    // One shared pool: creating a new pool on every call would leak threads and keep the JVM alive
    private static final ThreadPoolExecutor EXECUTOR = threadPoolExecutor();

    public static void main(String[] args) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            int finalI = i;
            /* Produce a list asynchronously on the custom pool */
            CompletableFuture<LinkedList<String>> listmates = CompletableFuture.supplyAsync(() -> {
                if (finalI == 1) {
                    return test1();
                } else if (finalI == 2) {
                    return test2();
                } else if (finalI == 3) {
                    return test3();
                } else {
                    return test4();
                }
            }, EXECUTOR);
            /* Consume the list once it is ready */
            pending.add(listmates.thenAcceptAsync(list -> linkedLists(list), EXECUTOR));
        }
        // Wait for every task, then release the pool so the program can exit
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        EXECUTOR.shutdown();
    }

    public static ThreadPoolExecutor threadPoolExecutor() {
        return new ThreadPoolExecutor(5, 10, 1,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(20),
                Executors.defaultThreadFactory(),
                // Tasks must not be discarded: when the queue is full, the caller runs them synchronously
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static LinkedList<String> test1() {
        System.out.println("1");
        LinkedList<String> list = new LinkedList<>();
        list.add("1");
        return list;
    }

    public static LinkedList<String> test2() {
        System.out.println("2");
        LinkedList<String> list = new LinkedList<>();
        list.add("2");
        return list;
    }

    public static LinkedList<String> test3() {
        System.out.println("3");
        LinkedList<String> list = new LinkedList<>();
        list.add("3");
        return list;
    }

    public static LinkedList<String> test4() {
        System.out.println("4");
        LinkedList<String> list = new LinkedList<>();
        list.add("4");
        return list;
    }

    public static LinkedList<LinkedList<String>> linkedLists(LinkedList<String> linkedList) {
        System.out.println(linkedList);
        // The outer list must be instantiated; calling add() on a null reference throws NullPointerException
        LinkedList<LinkedList<String>> linkedList1 = new LinkedList<>();
        linkedList1.add(linkedList);
        for (LinkedList<String> list : linkedList1) {
            System.out.println(list.toString());
        }
        return linkedList1;
    }
}