1. summary
CompletableFuture yes jdk1.8 The introduced implementation class . Expanded Future and CompletionStage, It is an operation that can trigger some operations in the task completion stage Future. In short, asynchronous callback can be implemented .
2. Why introduce CompletableFuture
about jdk1.5 Of Future, Although it provides the ability to process tasks asynchronously , But the way to get results is not elegant , It still needs to be blocked ( Or rotation training ) The way . How to avoid blocking ? In fact, it is to register callback .
The industry combines observer mode to realize asynchronous callback . That is, notify the observer when the task is completed . such as Netty Of ChannelFuture, The processing of asynchronous results can be realized by registering and listening .
Netty Of ChannelFuture
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
adopt addListener Method register to listen . If the task is completed , Would call notifyListeners notice .
CompletableFuture By extending the Future, Introduce functional programming , Process the result through callback .
3. function
CompletableFuture The function of is mainly reflected in his CompletionStage.
It can realize the following functions
- transformation (thenCompose)
- Combine (thenCombine)
- consumption (thenAccept)
- function (thenRun).
- Consumption with return (thenApply)
The difference between consumption and operation :
Consumption uses execution results . Running is just running a specific task . You can check other functions according to your needs .
CompletableFuture With the help of CompletionStage The method can realize chain call . And you can choose synchronous or asynchronous methods .
Here is a simple example to experience its functions .
public static void thenApply() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
try {
// Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("supplyAsync " + Thread.currentThread().getName());
return "hello";
}, executorService).thenApplyAsync(s -> {
System.out.println(s + "world");
return "hhh";
}, executorService);
cf.thenRunAsync(() -> {
System.out.println("ddddd");
});
cf.thenRun(() -> {
System.out.println("ddddsd");
});
cf.thenRun(() -> {
System.out.println(Thread.currentThread());
System.out.println("dddaewdd");
});
}
Execution results
supplyAsync pool-1-thread-1
helloworld
ddddd
ddddsd
Thread[main,5,main]
dddaewdd
According to the results, we can see that the corresponding tasks will be executed in an orderly manner .
Be careful :
If it's synchronous execution cf.thenRun. His execution thread may main Threads , It may also be the thread executing the source task . If the thread executing the source task is main Complete the task before calling . that cf.thenRun The method will be main Thread calls .
Here's an explanation , If there are multiple dependent tasks of the same task :
If these dependent tasks are executed synchronously . So if these tasks are called by the current thread (main) perform , Is orderly execution , If it is executed by the thread executing the source task , Then it will be executed in reverse order . Because the internal task data structure is LIFO.
If these dependent tasks are executed asynchronously , Then he will execute the task through the asynchronous thread pool . There is no guarantee of the order in which the tasks will be performed .
The above conclusion is obtained by reading the source code . Now let's go deep into the source code .
3. Source tracking
establish CompletableFuture
There are many ways to create , Even directly new One . Let's take a look supplyAsync Methods created asynchronously .
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
static Executor screenExecutor(Executor e) {
if (!useCommonPool && e == ForkJoinPool.commonPool())
return asyncPool;
if (e == null) throw new NullPointerException();
return e;
}
Enter the reference Supplier, Function with return value . If it's an asynchronous method , And passed the actuator , Then the incoming actuator will be used to execute the task . Otherwise, use public ForkJoin Parallel thread pool , If parallelism is not supported , Create a new thread to execute .
Here we need to pay attention to ForkJoin The task is executed through the daemon thread . Therefore, there must be a non daemon thread .
asyncSupplyStage Method
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
Here you will create a for returning CompletableFuture.
And then construct a AsyncSupply, And will create CompletableFuture Passed in as a construction parameter .
that , The execution of the task depends entirely on AsyncSupply.
AsyncSupply#run
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
1. This method is called Supplier Of get Method . And set the result to CompletableFuture in . We should be aware that these operations are invoked in asynchronous threads. .
2.d.postComplete The method is to notify the task execution to complete . Trigger the execution of subsequent dependent tasks , Which is implementation CompletionStage The key point of .
Looking at postComplete Before using the method, let's take a look at the logic of creating dependent tasks .
thenAcceptAsync Method
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.uniAccept(this, f, null)) {
# 1
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
As mentioned above .thenAcceptAsync It's for consumption CompletableFuture Of . This method calls uniAcceptStage.
uniAcceptStage Logic :
1. Construct a CompletableFuture, Mainly for chain call .
2. If it is an asynchronous task , Go straight back to . Because the asynchronous thread will be triggered to execute the corresponding logic after the end of the source task .
3. If it is a synchronization task (e==null), Would call d.uniAccept Method . This method is logical here : If the source task is completed , call f, return true. Otherwise enter if Code block (Mark 1).
4. If it is an asynchronous task, directly enter if(Mark 1).
Mark1 Logic :
1. Construct a UniAccept, Put it push Push . Through here CAS Realize optimistic lock .
2. call c.tryFire Method .
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniAccept(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
1. Would call d.uniAccept Method . In fact, this method determines whether the source task is completed , If completed, execute dependent tasks , Otherwise return to false.
2. If the dependent task has been executed , call d.postFire, Mainly Fire The subsequent processing of . The logic is different according to different modes .
Let's talk about it briefly , Actually mode There are synchronous and asynchronous , And iteration . Iteration in order to avoid infinite recursion .
Let's emphasize here d.uniAccept The third parameter of the method .
If it's an asynchronous call (mode>0), Pass in null. Otherwise incoming this.
Look at the following code .c Not for null Would call c.claim Method .
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
f.accept(s);
completeNull();
} catch (Throwable ex) {
completeThrowable(ex);
}
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
return true;
executor = null; // disable
e.execute(this);
}
return false;
}
claim The method is logic :
If the asynchronous thread is null. Description synchronization , Then go straight back true. Finally, the upper function will call f.accept(s) Synchronize tasks .
If the asynchronous thread is not null, Then use asynchronous threads to execute this.
this Of run The tasks are as follows . That is to say, it is called synchronously in asynchronous thread tryFire Method . Achieve its purpose of being executed by asynchronous threads .
public final void run() { tryFire(ASYNC); }
After reading the logic above , We basically understand the logic of dependent tasks .
In fact, first judge whether the source task is completed , If completed , Directly execute the task in the corresponding thread ( If it's synchronization , Then process... In the current thread , Otherwise, in asynchronous thread processing )
If the task is not completed , Go straight back to , Because when the task is completed, it will pass postComplete To trigger and invoke dependent tasks .
postComplete Method
final void postComplete() {
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
After the source task is completed, it will call .
In fact, the logic is very simple , Is the dependent task of the iteration stack . call h.tryFire Method .NESTED To avoid recursive loops . because FirePost Would call postComplete. If it is NESTED, Then... Is not called .
The contents of the stack are actually added when the dependent task is created . We have mentioned above .
4. summary
Basically, the above source code has analyzed the logic .
Because it involves operations such as asynchrony , We need to trim ( This is for fully asynchronous tasks ):
1. establish CompletableFuture After success, the corresponding task will be executed through asynchronous thread .
2. If CompletableFuture And dependent tasks ( asynchronous ), Will add tasks to CompletableFuture Save your stack . For the execution of dependent tasks after subsequent completion .
Of course , Creating dependent tasks is not just adding them to the stack . If the source task has been executed when the dependent task is created , Then the current thread will trigger the asynchronous thread of the dependent task to process the dependent task directly . And it will tell the stack that other dependent tasks and source tasks have been completed .
Mainly consider the reuse of code . So logic is relatively difficult to understand .
postComplete Method is called after the source task thread executes the source task. . It can also be invoked after relying on task threads. .
The method of performing dependent tasks mainly depends on tryFire Method . Because this method may be triggered by many different types of threads , So the logic goes around a little .( Other dependent task threads 、 Source task thread 、 The current dependent task thread )
If it is the current dependent task thread , Then it will perform dependent tasks , And will notify other dependent tasks .
If it is a source task thread , And other dependent task threads , Then the task is transferred to the dependent thread to execute . There is no need to notify other dependent tasks , Avoid dead recursion .
Have to say Doug Lea The coding , It's really art . The reusability of code is fully reflected in logic .
link :https://blog.csdn.net/weixin_...
Last , To share with you some of my collection of good github project , The content is still good , If it helps , By the way star.
- Recommended books that computer majors must chew : https://github.com/hello-go-maker/cs-books
- Java Actual combat project recommendation : https://github.com/hello-go-maker/Java-project
- Recommend some good computer learning tutorials , Include : data structure 、 Algorithm 、 computer network 、 operating system 、Java(spring、springmvc、springboot、springcloud), It also includes several enterprise level practical projects : https://github.com/hello-go-maker/cs-learn-source
- Java interview +Java Back end technology learning guide 】: An interview guide to the ideal Internet company , Include Java, Technical interview must have basic knowledge 、Leetcode、 Computer operating system 、 computer network 、 The system design 、 Distributed 、 database (MySQL、Redis)、Java Project practice, etc : https://github.com/hello-java-maker/JavaInterview