I have been swimming in RocketMQ In the source code , Found in RocketMQ It is used in many places CompletableFuture, So let's talk with you today JDK1.8 Asynchronous artifact provided CompletableFuture, And will finally combine RocketMQ Source code analysis CompletableFuture Use .
Future Interface and its limitations
We all know ,Java There are two main ways to create threads in , Inherit Thread Or realize Runnable Interface . But both of them have a common shortcoming , That is, the result of thread execution cannot be obtained , That is, there is no return value . So in JDK1.5 In the future, in order to solve the problem of no return value , Provides Callable and Future Interface and Future The corresponding implementation class FutureTask, adopt FutureTask You can get the result of asynchronous execution .
So , We want to start asynchronous threads , Perform tasks , To get the results , This can be achieved .
FutureTask<String> futureTask = new FutureTask<>(() -> " Three friends "); new Thread(futureTask).start(); System.out.println(futureTask.get());
Or use thread pool
ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> future = executorService.submit(() -> " Three friends "); System.out.println(future.get()); executorService.shutdown();
The thread pool bottom layer will also be committed Callable The implementation of is encapsulated into FutureTask, And then through execute Method to submit a task , Execute asynchronous logic .
Future Work, limitations
Though through Future Interface get Method can obtain the result of asynchronous execution of a task , however get Method will block the main thread , That is, the asynchronous task is not completed , The main thread will always block , Until the end of the mission .
Future Also provided isDone Method to check whether the asynchronous thread task execution is completed , If completed , You can get the execution result of the task , The code is as follows .
ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> future = executorService.submit(() -> " Three friends "); while (!future.isDone()) { // Has the task been completed , If not, continue to judge circularly } System.out.println(future.get()); executorService.shutdown();
But this kind of polling checks the execution status of asynchronous thread tasks , It's also very consuming cpu resources .
At the same time, it deals with some complex asynchronous operation tasks , Various synchronization components may be required to complete it together .
therefore , As can be seen from the above introduction ,Future There are still strong limitations in the process of use , So in order to solve this limitation , stay JDK1.8 When ,Doug Lea The great God provides us with a more powerful class CompletableFuture.
What is? CompletableFuture?
CompletableFuture stay JDK1.8 Provides a more powerful asynchronous programming api. It has achieved Future Interface , That is to say Future Functional characteristics of CompletableFuture Also have ; besides , It also achieved CompletionStage Interface ,CompletionStage Interface defines the method of task arrangement , Perform a certain stage , You can go down to the next stage .
CompletableFuture Compared with Future The biggest improvement is to provide a callback monitoring function similar to the observer mode , That is, when the task of the previous stage is completed , You can call back the tasks of the next stage specified by you , There is no need to block the results obtained before processing the results .
CompletableFuture common api Detailed explanation
CompletableFuture Methods api many , But it can be mainly divided into the following categories .
1、 Instantiation CompletableFuture
Constructors create
CompletableFuture<String> completableFuture = new CompletableFuture<>(); System.out.println(completableFuture.get());
At this time, if there are other threads executing the following code , You can print out Three friends
completableFuture.complete(" Three friends ")
Static method creation
In addition to using construction methods to construct ,CompletableFuture Static methods are also provided to create
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); public static CompletableFuture<Void> runAsync(Runnable runnable); public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
supply and run The main difference is supply You can have a return value ,run no return value . As for another parameter Executor Is the thread pool used to execute asynchronous tasks , If you don't pass Executor Words , The default is ForkJoinPool The implementation of this thread pool .
Once constructed by static methods , Will immediately start asynchronous thread execution Supplier perhaps Runnable Submitted tasks .
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> " Three friends "); System.out.println(completableFuture.get());
Once the task is completed , You can print the return value , The usage here is the same as Future It's the same .
So compare two instantiation methods , The main difference between using static methods and using construction methods is , Using construction methods requires other threads to actively call complete To indicate the completion of task execution , Because it's very simple , Because asynchronous tasks are not executed during construction , Therefore, other threads are required to actively call complete To indicate the completion of task execution .
2、 Gets the result of the task execution
public T get(); public T get(long timeout, TimeUnit unit); public T getNow(T valueIfAbsent); public T join();
get() and get(long timeout, TimeUnit unit) Is to implement the Future Function of interface , The main difference between the two is get() It will block until the result is obtained ,get(long timeout, TimeUnit unit) Value can specify the timeout , When it's time to get the task , Will throw TimeoutException abnormal .
getNow(T valueIfAbsent): Is to get the execution result of the task , But there will be no blockage . If the task has not been completed , Then it will return your incoming valueIfAbsent Parameter values , If the execution is completed , The result of task execution will be returned .
join(): Follow get() The main difference is ,get() Will throw an exception during the check ,join() Can't .
3、 Actively trigger task completion
public boolean complete(T value); public boolean completeExceptionally(Throwable ex);
complete: Actively trigger the completion of the current asynchronous task . When calling this method, if your task has been completed , Then the method will return false; If the task is not completed , It will return true, And the result of the task obtained by other threads is complete Parameter values for .
completeExceptionally: Follow complete It's about the same ,complete Is the normal end of the task , Return results , and completeExceptionally Is the exception that triggers task execution .
4、 Carry out the next step of processing the task execution results
You can only receive the callback after the normal execution of the task
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public CompletableFuture<Void> thenRun(Runnable action); public CompletionStage<Void> thenAccept(Consumer<? super T> action);
The characteristic of this kind of callback is , When the task is completed normally , When there is no exception, it will call back .
thenApply: You can get the result of the task execution in the previous step for processing , And return the processing result thenRun: You can't get the result of the previous task , But it will carry out Runnable Interface implementation thenAccept: You can get the result of the task execution in the previous step for processing , But there is no need to return the result of processing
thenApply Example :
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 10) .thenApply(v -> (" The result of the previous step is :" + v)); System.out.println(completableFuture.join());
Execution results :
The result of the previous step is :10
thenRun Example :
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10) .thenRun(() -> System.out.println(" The previous step is completed "));
Execution results :
The previous step is completed
thenAccept Example :
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10) .thenAccept(v -> System.out.println(" The previous step is completed , The result is :" + v));
Execution results :
The previous step is completed , The result is :10
thenApply There are abnormal examples :
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { // Simulate anomalies int i = 1 / 0; return 10; }).thenApply(v -> (" The result of the previous step is :" + v)); System.out.println(completableFuture.join());
Execution results :
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
When there is an exception, it will not be recalled
You can only receive callbacks after the task handles exceptions
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
When an exception occurs during the execution of the above task , Callbacks exceptionally Method , But if there's no exception , There will be no callback .
exceptionally Can swallow the exception , also fn The return value of will be returned .
Actually this exceptionally The method is a bit like degradation . When something goes wrong , Come to this callback , You can return a default value .
No exceptions :
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { return 100; }).exceptionally(e -> { System.out.println(" Something is wrong , Return default "); return 110; }); System.out.println(completableFuture.join());
Execution results :
100
Under abnormal circumstances :
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { int i = 1 / 0; return 100; }).exceptionally(e -> { System.out.println(" Something is wrong , Return default "); return 110; }); System.out.println(completableFuture.join());
Execution results :
Something is wrong , Return default 110
It can receive tasks and execute normal and abnormal callbacks at the same time
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> actin);
The callback method specified by this kind of method will be called back no matter whether the previous task is executed successfully or fails .
handle : Follow exceptionally It's kind of like , however exceptionally It is only when an exception occurs that the callback , Both have return values , Can swallow abnormal , however handle Under normal circumstances, it can also callback .
whenComplete: Can accept normal or abnormal callback , And it does not affect the return value of the previous stage , That is, the main thread can get the return value of the previous stage ; When an exception occurs ,whenComplete You can't swallow this exception , That is to say, when the main thread obtains the result of executing the abnormal task , It throws an exception .
Here's a demonstration whenComplete Handle exception example situations ,handle Follow exceptionally The handling of exceptions is almost .
whenComplete Examples of handling exceptions :
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { int i = 1 / 0; return 10; }).whenComplete((r, e) -> { System.out.println("whenComplete Is called the "); }); System.out.println(completableFuture.join());
Execution results :
whenComplete Is called the Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
5、 Merge the task results
public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
This method means , Current tasks and other After the tasks are completed , Get the results of these two tasks , Callback BiFunction , Then return the new result .
thenCombine Please continue to look at the example of .
6、 With Async The way to end
Some methods mentioned above , for instance thenAccept Method , He has two corresponding Async The way to end , as follows :
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
thenAcceptAsync Follow thenAccept The main difference is thenAcceptAsync A thread will be restarted to perform the next stage of the task , and thenAccept Or use the thread of the task execution in the previous stage .
Two thenAcceptAsync The main difference is that a default thread pool is used to execute tasks , That is to say ForkJoinPool, One is to use the thread pool passed in by method parameters to execute tasks .
Except, of course, thenAccept Out of the way , There are many other methods mentioned above with Async The corresponding method of the end , The main difference between them is whether to start asynchronous threads to execute tasks .
Of course , There are others api, You can check it by yourself
CompletableFuture stay RocketMQ The use of
CompletableFuture stay RocketMQ There are many usage scenarios in , Here I give a scenario of message storage .
stay RocketMQ in ,Broker When receiving the message generated by the producer , The message will be persisted to disk and synchronized to the slave node . Persistence to disk and message synchronization to slave nodes are two independent tasks , Mutual interference , Can be executed independently of each other . When the task of message persistence to disk and synchronization to slave node is completed , It is necessary to count the time consumed by the whole message storage , Therefore, counting the time consumed by the whole storage message depends on the completion of the first two tasks .
The implementation code is as follows
Message storage disk brushing task and master-slave copy task :
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Submit the request for disk brushing CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); // Submit the request for master-slave replication CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg); // Brush set and Master slave copy Two asynchronous tasks pass thenCombine union return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { // When both disk brushing and master-slave copy tasks are completed , It's going to come back // If the disk brushing fails , Then set the status of the message store to failed if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(flushStatus); } // If master-slave replication fails , Then set the status of the message store to failed if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } // Finally, the result of message storage is returned return putMessageResult; });
The execution results of the above two combined tasks pass thenAccept Method to monitor , Count the time consumption of message storage :
// Start time of message storage long beginTime = this.getSystemClock().now(); // Store messages , Then return CompletableFuture, That is, the return value of the above code CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg); // Listen for the result of message storage putResultFuture.thenAccept((result) -> { // After the message is stored, it will call back long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().add(1); } });
CompletableFuture The advantages of
1、 Asynchronous functional programming , Achieve elegance , Easy to maintain ;
2、 It provides a mechanism for exception management , Give you a chance to throw 、 Manage exceptions during asynchronous task execution , Listen for these exceptions ;
3、 Have the ability to arrange tasks . With this ability , You can easily organize the running sequence of different tasks 、 Rules and methods .
Reference resources :
[1]https://zhuanlan.zhihu.com/p/344431341
If you find this article helpful , Please help me to praise 、 Looking at 、 Forward to more people , Thank you very much !
Previous popular articles recommend
Zookeeper Distributed lock implementation Curator Eleven questions
7000 word +24 This picture shows you a thorough understanding of thread pools
- Face slag counter attack :Spring Thirty five questions , 40000 words + Fifty pictures in detail ! Recommended collection !
Scan the code or search for official account Sanyou's java Diary , Dry the goods in time and don't miss it , Official account is dedicated to explaining technology through drawing pictures and easy to understand language , Make learning technology easier .