While digging through the RocketMQ source code recently, I noticed that CompletableFuture is used in many places. So today let's talk about CompletableFuture, the asynchronous programming tool introduced in JDK 1.8, and at the end look at how RocketMQ uses it in its source code.
The Future interface and its limitations
As we all know, there are two main ways to create a thread in Java: extend Thread or implement the Runnable interface. Both share a shortcoming: there is no return value, so the result of the thread's execution cannot be obtained. To solve this, JDK 1.5 introduced the Callable and Future interfaces, along with FutureTask, an implementation class of Future. Through FutureTask you can obtain the result of an asynchronous execution.
So if we want to start an asynchronous thread, run a task, and get its result, we can do it like this:
FutureTask<String> futureTask = new FutureTask<>(() -> " Three friends ");
new Thread(futureTask).start();
Or use a thread pool:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> " Three friends ");
Under the hood, the thread pool also wraps the submitted Callable in a FutureTask and then submits it through the execute method to run the asynchronous logic.
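The two snippets above stop before the result is actually read. As a minimal sketch of the full round trip, the class below runs the same task both ways and fetches the value with get(). The class and method names (FutureBasicsDemo, getQuietly, and so on) are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class FutureBasicsDemo {
    // Runs a Callable on a plain thread through FutureTask and waits for its result.
    static String viaFutureTask() {
        FutureTask<String> futureTask = new FutureTask<>(() -> "Three friends");
        new Thread(futureTask).start();
        return getQuietly(futureTask);
    }

    // Submits the same Callable to a thread pool and waits for its result.
    static String viaThreadPool() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executorService.submit(() -> "Three friends");
            return getQuietly(future);
        } finally {
            executorService.shutdown(); // let the JVM exit once the task is done
        }
    }

    // get() blocks the caller and declares checked exceptions, so wrap them here.
    private static String getQuietly(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaFutureTask());
        System.out.println(viaThreadPool());
    }
}
```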
Limitations of Future
Although the get method of the Future interface can obtain the result of an asynchronous task, get blocks the calling thread: if the asynchronous task has not finished, the caller stays blocked until it does.
Future also provides the isDone method to check whether the asynchronous task has finished. Once it has, you can fetch the result. The code is as follows:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> " Three friends ");
while (!future.isDone()) {
    // The task has not finished yet, so keep polling
}
But polling the status of the asynchronous task like this burns CPU.
Also, more complex asynchronous workflows may need various synchronization components working together with Future to get the job done.
So as the above shows, Future is quite limited in practice. To address these limitations, JDK 1.8 brought us a more powerful class written by Doug Lea: CompletableFuture.
What is CompletableFuture?
CompletableFuture, introduced in JDK 1.8, provides a more powerful asynchronous programming API. It implements the Future interface, so it has everything Future offers. It also implements the CompletionStage interface, which defines methods for orchestrating tasks: when one stage finishes, execution can move on to the next stage.
The biggest improvement of CompletableFuture over Future is a callback mechanism similar to the observer pattern: when the previous stage finishes, the next stage you specified is called back, so there is no need to block waiting for a result before processing it.
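To make the callback idea concrete, here is a minimal sketch in which the second stage is registered up front and runs when the first stage finishes, instead of the caller blocking on get() and then processing the value. CallbackDemo and greet are illustrative names, not part of any library.

```java
import java.util.concurrent.CompletableFuture;

class CallbackDemo {
    // Builds the greeting in a callback instead of blocking on get() first.
    static CompletableFuture<String> greet(String name) {
        return CompletableFuture
                .supplyAsync(() -> name)          // stage 1: produce a value asynchronously
                .thenApply(n -> "Hello, " + n);   // stage 2: called back when stage 1 completes
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = greet("Three friends").thenAccept(System.out::println);
        // join() is here only so the JVM does not exit before the pool thread finishes
        done.join();
    }
}
```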
Common CompletableFuture APIs in detail
CompletableFuture has many methods, but they fall mainly into the following categories.
1. Instantiating a CompletableFuture
Creating with the constructor
CompletableFuture<String> completableFuture = new CompletableFuture<>();
At this point, once another thread executes the following line, a call to completableFuture.get() returns the value and "Three friends" can be printed:
completableFuture.complete(" Three friends ");
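Put together, the constructor-based approach might look like the sketch below: one thread waits on the future while another completes it by hand. The names are illustrative, and join() is used instead of get() only to avoid checked exceptions in the example.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompleteDemo {
    static String waitForManualResult() {
        // No task is attached; the future stays incomplete until someone calls complete().
        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        // Another thread supplies the result explicitly.
        Thread producer = new Thread(() -> completableFuture.complete("Three friends"));
        producer.start();

        // Blocks until the producer thread has called complete().
        return completableFuture.join();
    }

    public static void main(String[] args) {
        System.out.println(waitForManualResult());
    }
}
```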
Creating with static methods
Besides the constructor, CompletableFuture also provides static factory methods:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
The main difference between supplyAsync and runAsync is that supplyAsync produces a return value and runAsync does not. The optional Executor parameter is the thread pool that runs the asynchronous task; if you do not pass one, the common ForkJoinPool (ForkJoinPool.commonPool()) is used by default.
As soon as a CompletableFuture is created through one of these static methods, the task supplied as a Supplier or Runnable is submitted for asynchronous execution.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> " Three friends ");
Once the task finishes, you can read and print the return value, the same way you would with a Future.
Comparing the two ways of instantiating: a CompletableFuture created with the constructor has no asynchronous task attached, so some other thread must call complete explicitly to mark it as finished. One created with a static method runs its task right away and completes by itself when the task ends.
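As a rough illustration of the static factories, the sketch below runs supplyAsync on a caller-supplied pool and runAsync on the default pool. StaticFactoryDemo and its method names are assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class StaticFactoryDemo {
    // supplyAsync with an explicit Executor: the Supplier runs on our own pool.
    static String supplyOnCustomPool() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture.supplyAsync(() -> "Three friends", pool).join();
        } finally {
            pool.shutdown();
        }
    }

    // runAsync without an Executor: the Runnable runs on the default pool and yields no value.
    static int runOnDefaultPool() {
        AtomicInteger counter = new AtomicInteger();
        CompletableFuture<Void> future = CompletableFuture.runAsync(counter::incrementAndGet);
        future.join(); // wait for the side effect to happen
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(supplyOnCustomPool());
        System.out.println(runOnDefaultPool());
    }
}
```

Passing your own Executor is usually worth considering for blocking or long-running tasks, since the default pool is shared across the whole JVM.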
2. Getting the result of a task
public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();
get() and get(long timeout, TimeUnit unit) come from the Future interface. The difference is that get() blocks until the result is available, while get(long timeout, TimeUnit unit) takes a timeout and throws TimeoutException if the result is still not available when the timeout expires.
getNow(T valueIfAbsent): gets the result without blocking. If the task has not finished, it returns the valueIfAbsent you passed in; if it has finished, it returns the task's result.
join(): the main difference from get() is that get() declares checked exceptions (InterruptedException and ExecutionException) that the caller must handle, while join() throws only unchecked exceptions (CompletionException when the task failed).
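The difference in exception behavior is easiest to see side by side. The following sketch, with made-up names, fails a future on purpose and reports which exception type each accessor surfaces, then shows getNow on a future that is still pending.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class ResultAccessDemo {
    // Returns the exception types seen through get() and join() for the same failed future.
    static String describeFailure() {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        String viaGet;
        try {
            failed.get(); // checked exceptions: the compiler forces this try/catch
            viaGet = "no exception";
        } catch (InterruptedException | ExecutionException e) {
            viaGet = e.getClass().getSimpleName();
        }

        String viaJoin;
        try {
            failed.join(); // unchecked: the try/catch is optional
            viaJoin = "no exception";
        } catch (CompletionException e) {
            viaJoin = e.getClass().getSimpleName();
        }
        return viaGet + "/" + viaJoin;
    }

    // getNow never blocks: a pending future yields the fallback value.
    static int getNowOnPending() {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        return pending.getNow(-1);
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints ExecutionException/CompletionException
        System.out.println(getNowOnPending()); // prints -1
    }
}
```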
3. Actively completing a task
public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);
complete: actively completes the current asynchronous task. If the task has already completed when you call it, the method returns false. Otherwise it returns true, and the result other threads obtain is the value passed to complete.
completeExceptionally: similar to complete, except that complete ends the task normally with a result, while completeExceptionally ends it with the given exception.
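A small sketch of these return values, using illustrative names: the first complete call wins, the second is rejected, and completeExceptionally marks a future as failed.

```java
import java.util.concurrent.CompletableFuture;

class CompleteDemo {
    // Calls complete twice and reports both return values plus the value that stuck.
    static String completeTwice() {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean first = future.complete("first");    // true: this call completed the future
        boolean second = future.complete("second");  // false: already completed, value unchanged
        return first + "," + second + "," + future.join();
    }

    // Completes a future with an exception and checks its state.
    static boolean failsExceptionally() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("failed"));
        return future.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(completeTwice());      // prints true,false,first
        System.out.println(failsExceptionally()); // prints true
    }
}
```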
4. Processing the result of a task in a next step
Callbacks that only run after the task completes normally
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
These callbacks run only when the task completes normally, with no exception.
thenApply: receives the result of the previous step, processes it, and returns a new result.
thenRun: does not receive the result of the previous step; it just runs the given Runnable.
thenAccept: receives the result of the previous step and processes it, but returns nothing.
thenApply example:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 10)
.thenApply(v -> (" The result of the previous step is :" + v));
Execution results :
The result of the previous step is :10
thenRun Example :
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10)
.thenRun(() -> System.out.println(" The previous step is completed "));
Execution results :
The previous step is completed
thenAccept Example :
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10)
.thenAccept(v -> System.out.println(" The previous step is completed , The result is :" + v));
Execution results :
The previous step is completed , The result is :10
thenApply example with an exception:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// Simulate anomalies
int i = 1 / 0;
return 10;
}).thenApply(v -> (" The result of the previous step is :" + v));
Execution results :
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
When an exception occurs, the callback is not invoked.
Callbacks that only run when the task fails with an exception
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
When the task above throws an exception during execution, the function passed to exceptionally is called back; if there is no exception, it is not called.
exceptionally swallows the exception, and the return value of fn becomes the result.
In practice, exceptionally works a bit like a fallback: when something goes wrong, this callback runs and you can return a default value.
Without an exception:
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
return 100;
}).exceptionally(e -> {
System.out.println(" Something is wrong , Return default ");
    return 110;
});
Execution results: nothing is printed, because the callback is not invoked; the future's value is 100.
With an exception:
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
}).exceptionally(e -> {
System.out.println(" Something is wrong , Return default ");
    return 110;
});
Execution results :
Something is wrong , Return default
Callbacks that run on both normal and exceptional completion
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
Callbacks registered with these methods run whether the previous task succeeded or failed.
handle: similar to exceptionally. Both return a value and can swallow the exception, but exceptionally is called back only when an exception occurs, while handle is also called back on normal completion.
whenComplete: is called back on both normal and exceptional completion and does not change the result of the previous stage, so the main thread still gets the previous stage's return value. When an exception occurs, whenComplete cannot swallow it: when the main thread fetches the result of the failed task, the exception is thrown.
The following demonstrates whenComplete with an exception; handle deals with exceptions in much the same way as exceptionally.
whenComplete example with an exception:
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 10;
}).whenComplete((r, e) -> {
    System.out.println("whenComplete was called");
});
Execution results :
whenComplete was called
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
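For comparison, here is a minimal sketch of handle covering both outcomes in one callback. Unlike whenComplete, its return value replaces the result, so the exception does not reach the caller. HandleDemo and divideOrDefault are illustrative names, and -1 is an arbitrary fallback chosen for the example.

```java
import java.util.concurrent.CompletableFuture;

class HandleDemo {
    // Divides asynchronously; handle sees either the result or the exception, never both.
    static int divideOrDefault(int a, int b) {
        return CompletableFuture
                .supplyAsync(() -> a / b)
                // ex is null on success; on failure result is null and ex is set
                .handle((result, ex) -> ex == null ? result : -1)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(divideOrDefault(10, 2)); // prints 5
        System.out.println(divideOrDefault(1, 0));  // prints -1
    }
}
```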
5. Combining task results
public <U,V> CompletionStage<V> thenCombine
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
This method means: after both the current task and the other task have completed, take the results of the two tasks, pass them to the BiFunction callback, and return the new result.
A thenCombine example appears in the RocketMQ section below.
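Before the real-world case, a tiny standalone sketch may help: two independent tasks each produce a number, and thenCombine adds them once both are done. The class name and the values are arbitrary.

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {
    static int sumOfTwoTasks() {
        // Two independent asynchronous tasks
        CompletableFuture<Integer> left = CompletableFuture.supplyAsync(() -> 10);
        CompletableFuture<Integer> right = CompletableFuture.supplyAsync(() -> 32);

        // The BiFunction runs only after both tasks have completed
        return left.thenCombine(right, (a, b) -> a + b).join();
    }

    public static void main(String[] args) {
        System.out.println(sumOfTwoTasks()); // prints 42
    }
}
```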
6. Methods ending in Async
Some of the methods above, thenAccept for example, have two corresponding variants ending in Async:
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
The main difference between thenAcceptAsync and thenAccept is that thenAcceptAsync submits the next stage to a thread pool as a new asynchronous task, while thenAccept runs it on the thread that completed the previous stage (or on the calling thread, if the previous stage had already completed when the callback was registered).
The two thenAcceptAsync overloads differ in which pool runs the task: one uses the default pool, that is, the common ForkJoinPool, and the other uses the thread pool passed in as a parameter.
Besides thenAccept, many of the other methods mentioned above also have corresponding methods ending in Async. The main difference is likewise whether the task is handed off for asynchronous execution.
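One way to observe this is to record which thread runs the callback. The sketch below passes a single-thread executor with a recognizable thread name to thenAcceptAsync; the name "callback-pool" and the class name are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class AsyncVariantDemo {
    // Returns the name of the thread that ran the thenAcceptAsync callback.
    static String callbackThreadName() {
        // A dedicated pool whose only thread carries a recognizable name
        ExecutorService callbackPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            CompletableFuture.supplyAsync(() -> 10)
                    // The callback is submitted to callbackPool, not run on the supplier's thread
                    .thenAcceptAsync(v -> seen.set(Thread.currentThread().getName()), callbackPool)
                    .join();
            return seen.get();
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // prints callback-pool
    }
}
```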
Of course there are other APIs as well, which you can look up yourself.
Using CompletableFuture in RocketMQ
CompletableFuture is used in many places in RocketMQ. Here is one scenario from message storage.
In RocketMQ, when the Broker receives a message from a producer, it persists the message to disk and replicates it to the slave node. Persisting to disk and replicating to the slave are two independent tasks that do not interfere with each other and can run independently. Once both have completed, the total time spent storing the message must be recorded, so that measurement depends on the completion of the first two tasks.
The implementation is as follows.
The flush-to-disk task and the master-slave replication task for message storage:
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Submit the flush-to-disk request
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// Submit the master-slave replication request
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
// Combine the two asynchronous tasks, flush and replication, with thenCombine
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    // Called back once both the flush and replication tasks have completed
    // If the flush failed, set the message store status to the failure status
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    // If master-slave replication failed, set the message store status to the failure status
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
    }
    // Finally return the result of message storage
    return putMessageResult;
});
The result of the two combined tasks is then observed with thenAccept to record how long message storage took:
// Start time of message storage
long beginTime = this.getSystemClock().now();
// Store the message; the returned CompletableFuture is the return value of the code above
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
// Listen for the result of message storage
putResultFuture.thenAccept((result) -> {
    // Called back once the message has been stored
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    if (null == result || !result.isOk()) {
        // The store failed or returned no result
    }
});
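The same pattern can be tried outside RocketMQ. The sketch below is not RocketMQ code: Status, flush, replicate, and store are hypothetical stand-ins that only mimic the shape of the excerpts above, with two independent tasks merged by thenCombine and the elapsed time measured in a thenAccept callback.

```java
import java.util.concurrent.CompletableFuture;

class StorePatternSketch {
    // Hypothetical status values standing in for the broker's real status enum
    enum Status { OK, FLUSH_TIMEOUT, REPLICA_TIMEOUT }

    // Hypothetical stand-ins for the flush and replication requests
    static CompletableFuture<Status> flush(Status outcome) {
        return CompletableFuture.supplyAsync(() -> outcome);
    }

    static CompletableFuture<Status> replicate(Status outcome) {
        return CompletableFuture.supplyAsync(() -> outcome);
    }

    // Merges both outcomes: any non-OK status marks the whole store as failed.
    static CompletableFuture<Status> store(Status flushOutcome, Status replicaOutcome) {
        return flush(flushOutcome).thenCombine(replicate(replicaOutcome), (flushStatus, replicaStatus) -> {
            Status result = Status.OK;
            if (flushStatus != Status.OK) {
                result = flushStatus;
            }
            if (replicaStatus != Status.OK) {
                result = replicaStatus;
            }
            return result;
        });
    }

    public static void main(String[] args) {
        long beginTime = System.currentTimeMillis();
        store(Status.OK, Status.OK)
                // Runs after both tasks are done, so the elapsed time covers the whole store
                .thenAccept(status -> System.out.println(
                        status + " after " + (System.currentTimeMillis() - beginTime) + " ms"))
                .join();
    }
}
```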
Advantages of CompletableFuture
1. Asynchronous, functional-style programming that is elegant to write and easy to maintain.
2. It provides an exception-handling mechanism, giving you the chance to throw, handle, and listen for exceptions raised while asynchronous tasks run.
3. It can orchestrate tasks. With this ability you can easily organize the order, rules, and manner in which different tasks run.