当前位置:网站首页>Completabilefuture asynchronous task choreography usage and explanation

Completabilefuture asynchronous task choreography usage and explanation

2022-06-11 05:53:00 Endwas

At work , Multiple services or methods are often called to obtain different data , If the traditional method is to acquire one by one in series , Then encapsulate and return to . We can try it CompletableFuture, Delegate multiple operations to asynchronous threads for execution , Then the main thread waits for the longest task to complete , Return all results together .

Future limitations

When we get a result containing Future when , We can use get Method waits for the thread to complete and gets the return value , But we all know future.get() yes Blocking Methods , It will wait until the thread finishes executing to get the return value . We can see FutureTask Medium get Method , It is to loop the code until the thread executes and returns .

  /** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
    
        	// loop   Omit code 
        	...
        }

Let's consider a scenario , If we finish the asynchronous task 1 You need to get the return value , Then use the return value to make other asynchronous calls 2, Then we need to wait for asynchronous tasks when the main thread is blocked 1 complete , Then the asynchronous task is executed by 2, Then continue to block the waiting task 2 Return . This is not only Block main thread , and Poor performance .

CompletableFuture Introduce

What is? CompletableFuture:CompletableFuture Combined with the Future The advantages of , Provides a very powerful Future Extension of , Can help us simplify the complexity of asynchronous programming , Provides Functional programming The ability of , The calculation result can be processed by callback , And provides transformations and combinations CompletableFuture Methods .

Functional programming : Use Functional Interface As a parameter , You can use lambda The expression suggests implementing , Convenient programming , I've explained it before .

Common auxiliary methods

  1. isCompletedExceptionally: The CompletableFuture Whether it ends abnormally .
//  Including canceling cancel、 Display call completeExceptionally、 interrupt .
public boolean isCompletedExceptionally() {
    
        Object r;
        return ((r = result) instanceof AltResult) && r != NIL;
    }
  1. isCancelled: The CompletableFuture Whether it is cancelled before the normal execution is completed .
 public boolean isCancelled() {
    
        Object r;
        //  Determine whether the exception is CancellationException
        return ((r = result) instanceof AltResult) &&
            (((AltResult)r).ex instanceof CancellationException);
 }
  1. isDone: The CompletableFuture Whether the execution has ended, including exception generation and cancellation .
 public boolean isDone() {
    
       return result != null;
 }
  1. get: Blocking access CompletableFuture result
 public T get() throws InterruptedException, ExecutionException {
    
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
 }
  1. join: Blocking access CompletableFuture result
 public T join() {
    
        Object r;
        return reportJoin((r = result) == null ? waitingGet(false) : r);
 }

join() And get() The difference lies in join() Returns the result of the calculation or throws a unchecked abnormal (CompletionException), and get() Returns a specific exception .

CompletableFuture structure

CompletableFuture Construction method with or without parameters , What is created at this time is incomplete CompletableFuture. Use get Will always block the main thread .

So we usually use static methods to create instances .

//  No input parameter has a return value 
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
//  No input parameter, no return value , Simple execution 
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

We noticed that each method has a refactoring method .Executor Parameters can be manually specified thread pool , Otherwise the default ForkJoinPool.commonPool() System level common thread pool .


【 Be careful 】: default commonPool These threads are daemon threads . We need to use guard threads carefully when programming , If we set our ordinary user thread as a daemon thread , When the main thread of our program ends ,JVM There are no other user threads in the , that CompletableFuture The daemon thread will exit directly , Problems that make the task impossible !!

CompletableFuture Commonly used Api

We will explain later api There are generally three similar approaches . I'll just demonstrate the third .

  1. xxx(method);
  2. xxxAsync(method);
  3. xxxAsync(method, executor)
    The difference between the three is that the first synchronous execution is performed by the main thread , The second type of asynchronous handover is the default thread pool , The third type of asynchronous handover creates a thread pool .

1. structure CompletableFuture example

Now we will CompletableFuture To try to use . Start with the basic build .

/** * 1.run + runAsync + supply + supplyAsync */
/** 1. Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter, no return value run * 2. Asynchronous running is left to the creation of thread pools (threadPoolExecutor) No input parameter, no return value run */
CompletableFuture<Void> run = CompletableFuture.runAsync(() ->
        System.out.println("completablefuture runs asynchronously"));

CompletableFuture<Void> runCustomize = CompletableFuture.runAsync(() ->
        System.out.println("completablefuture runs asynchronously in customize threadPool"
        ), threadPoolExecutor);


//================ Here is supply===================
/** 1. Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply * 2. Asynchronously run the thread pool created by yourself   No input parameter has a return value supply */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
    System.out.println("completablefuture supplys asynchronously");
    return "success";
});

CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
    System.out.println("completablefuture supplys asynchronously in customize threadPool");
    return "success";
}, threadPoolExecutor);

result

completablefuture runs asynchronously
completablefuture runs asynchronously in customize threadPool
completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool

The first two CompletableFuture It's not worth it , So when the subsequent chain calls want to use, the input parameters are null Of .

2.whenComplete

Consider when we are CompletableFuture At the end of the execution , Hope to get the implementation results 、 Or abnormal , Then do further processing for the result or exception . Then we need to use whenComplete.

  /** *  Participation is BiConsumer, The first parameter is the result of the previous step 、 The second is the exception executed in the previous step  */
 CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
          int a = 1/0;
          System.out.println("completablefuture supplys asynchronously");
          return "success";
 });
 CompletableFuture<String> complete = supply.whenCompleteAsync((result, throwable) -> {
    
     System.out.println("whenComplete: " + result + " throws " + throwable);
 });

result:

whenComplete: null throws java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

3.handle

handle and whenComplete The input parameters are the same , But it can return after execution Execution results , and whenComplete Can only process and cannot return .

public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

example:

/** *  Participation is BiFunction  The first parameter is the result of the previous step 、 The second is the exception executed in the previous step  *  The return value can be of any type  */
CompletableFuture<String> handleAsyncCustomize = supply.handleAsync((a, throwable) -> {
    
            System.out.println("handleAsyncCustomize: " + a + " throws " + throwable);
            return "handleAsync success";
}, threadPoolExecutor);

System.out.println(handleAsyncCustomize.get());

result:

handleAsyncCustomize: success throws null
handleAsync success

4.thenApply

thenApply And again handle Is very similar , There are both input parameters and return values , But he has only one participant , Unable to handle the exception of the previous asynchronous task . If something unusual happens get I will make a mistake .

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
/** * Function  The input parameter is the result of the first asynchronous task execution 、 The output parameter is the return value  *  If asynchronous task 1 abnormal   be 2 Unable to execute  get Will report a mistake  */
 CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
     System.out.println("completablefuture supplys asynchronously in customize threadPool");
     int a = 1/0;
     return "success";
 }, threadPoolExecutor);

 CompletableFuture<String> applyAsyncCustomize = supplyCustomize.thenApplyAsync(a -> {
    
     System.out.println("applyAsyncCustomize " + a);
     return "success";
 }, threadPoolExecutor);
 System.out.println(applyAsyncCustomize.get());

result:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at CompletableFutureTask.main.main(main.java:48)
Caused by: java.lang.ArithmeticException: / by zero
	at CompletableFutureTask.main.lambda$main$3(main.java:40)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

5.thenAccept

thenAccept Yes, there is an input parameter but no return value , If you continue the chain call, the next asynchronous task will get null value .

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

example:

/** * Consumer The first input parameter is the result returned in the previous step , If no result is returned, it will be null *  No return value  */
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
   System.out.println("completablefuture supplys asynchronously in customize threadPool");
    return "success";
}, threadPoolExecutor);

CompletableFuture<Void> applyAsyncCustomize = supplyCustomize.thenAcceptAsync(a ->{
    
    System.out.println("accept " + a);
}, threadPoolExecutor);
System.out.println(applyAsyncCustomize.get());

result:

completablefuture supplys asynchronously in customize threadPool
accept success
null

6.thenCompose

and thenCombine Somewhat different ,thenCombine It's a combination of two CompletableFuture The returned results are processed asynchronously , and thenCompose Is based on the first returned result , Encapsulated in new CompletableFuture return

example:

CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
     System.out.println("completablefuture supplys asynchronously");
     return "success";
 });
 CompletableFuture<String> future = supply.thenComposeAsync(a -> {
    
     String name = a + " or fail";
     return CompletableFuture.supplyAsync(() -> {
    
         return name;
     });
 });
 System.out.println(future.get());

result:

completablefuture supplys asynchronously in customize threadPool
success or fail

7.exceptionally

Handling when an exception occurs , Note that the return value type after the exception needs to be the same as that of the exception CF The return value type is consistent , It is equivalent to the idea of service degradation .

example:

/** *  When an exception occurs , return 0 */
CompletableFuture<Integer> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
     int a = 1/0;
     System.out.println("completablefuture supplys asynchronously in customize threadPool");
     return 2;
 }, threadPoolExecutor).exceptionally(a->{
    
     System.out.println(a);
     return 0;
 });

 System.out.println(supplyCustomize.get());

result:

completablefuture runs asynchronously in customize threadPool
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
0

CompletableFuture combined Api- The task is complete

1.thenCombine

thenCombine It's a combined task , above CompletableFuture The use is done in a chain , When you finish the first , Make the next asynchronous call according to the first execution result , Combined asynchronism , Two asynchronous tasks can be completely independent , Only when they are all finished will they continue to execute .

example:

/** * BiFunction Enter the reference , Two CF As input parameter , There is a return value  */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
	System.out.println("completablefuture supplys asynchronously");
    return "success";
});
//  Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
    System.out.println("completablefuture supplys asynchronously in customize threadPool");
    return "success";
}, threadPoolExecutor);

CompletableFuture<String> future = supply.thenCombine(supplyCustomize, (a, b) -> {
    
    return "thenCombine result: " + a + "-" + b;
});
System.out.println(future.get());

result:

completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
thenCombine result: success-success

2.thenAcceptBoth

thenAcceptBoth and thenCombine The difference is that there is no return value , Put two CF Return value for processing , no return value

example:

/** *  Accept CF、BiConsumer, No return value  */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
    System.out.println("completablefuture supplys asynchronously");
    return "success";
});
//  Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
    System.out.println("completablefuture supplys asynchronously in customize threadPool");
    return "success";
}, threadPoolExecutor);

CompletableFuture<Void> bothAsync = supplyCustomize.thenAcceptBothAsync(supply, (a, b) -> {
    
    System.out.println("thenAcceptBoth result " + a + "-" + b);
}, threadPoolExecutor);

result:

completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
thenAcceptBoth result success-success
null

3.runAfterBoth

No input parameter, no return value , It will only be executed after the previous two runs are completed runAfterBoth Methods , We can be anywhere CF Simulate long-time running to test .

example:

CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
     System.out.println("completablefuture supplys asynchronously");
     return "success";
 });
 //  Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
 CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
     System.out.println("completablefuture supplys asynchronously in customize threadPool");
     return "success";
 }, threadPoolExecutor);
 CompletableFuture<Void> bothAsync = supply.runAfterBothAsync(supplyCustomize, () -> {
    
     System.out.println("run After Both Async");
 });

result:

completablefuture supplys asynchronously
completablefuture supplys asynchronously in customize threadPool
run After Both Async

CompletableFuture combined Api- Complete any task

1.acceptEither

and thenAcceptBoth Corresponding , Two CompletableFuture Any execution is completed , Will continue to the next asynchronous task

example:

/** *  Task one takes a long time , So when task two is finished, it will be executed acceptEitherAsync *  Parameters  Consumer  The input parameter returns the value for executing the completed task  */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
    try {
    
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
    System.out.println("completablefuture supplys asynchronously");
    return "success1";
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
    System.out.println("completablefuture supplys asynchronously in customize threadPool");
    return "success2";
}, threadPoolExecutor);
CompletableFuture<Void> future = supply.acceptEitherAsync(supplyCustomize, a -> {
    
    System.out.println("acceptEither result " + a);
});

result:

completablefuture supplys asynchronously in customize threadPool
acceptEither result success2
completablefuture supplys asynchronously

2.applyToEither

Corresponding thenCombine, Two CompletableFuture Any execution is completed , Will continue to the next asynchronous task

example:

/** *  Parameter is Function, There are both input parameters and return values  */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
    try {
    
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
    System.out.println("completablefuture supplys asynchronously");
    return "success1";
});
//  Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
    System.out.println("completablefuture supplys asynchronously in customize threadPool");
    return "success2";
}, threadPoolExecutor);
CompletableFuture<String> future = supply.applyToEither(supplyCustomize, a -> {
    
    System.out.println("acceptEither result " + a);
    return "appleTo " + a;
});

3.runAfterEither

Corresponding runAfterBoth, Two CompletableFuture Any execution is completed , Will continue to the next asynchronous task

/** * runnable  No input parameter, no return value  */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
    try {
    
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
    System.out.println("completablefuture supplys asynchronously");
    return "success1";
});
//  Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
    System.out.println("completablefuture supplys asynchronously in customize threadPool");
    return "success2";
}, threadPoolExecutor);
CompletableFuture<Void> future = supply.runAfterEitherAsync(supplyCustomize, () -> {
    
    System.out.println("runAfterEither result ");
});

CompletableFuture And allOf|anyOf

1.allOf

All completed get Method to get the return value , The return value is null, Each task return value needs to call each CompletableFuture.

example:

/** *  When all the tasks are completed , return null, Return value of each task , You need to call a specific  *  Asynchronous task acquisition  */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
    try {
    
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
    
        e.printStackTrace();
    }
    System.out.println("completablefuture supplys asynchronously");
    return "success1";
});
CompletableFuture<String> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
    System.out.println("completablefuture supplys asynchronously in customize threadPool");
    return "success2";
}, threadPoolExecutor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(supply, supplyCustomize);
allOf.get();
System.out.println(supplyCustomize.get() + "=>" + supply.get());

result:

completablefuture supplys asynchronously in customize threadPool
completablefuture supplys asynchronously
success2==success1

2.anyOf

/** *  The result of any first execution will be returned first  */
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    
try {
    
    TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
    
    e.printStackTrace();
}
	System.out.println("completablefuture supplys asynchronously");
	return "success1";
});
//  Asynchronous running is left to the default thread pool (forkjoinpool) No input parameter has a return value supply
CompletableFuture<Integer> supplyCustomize = CompletableFuture.supplyAsync(() -> {
    
	System.out.println("completablefuture supplys asynchronously in customize threadPool");
	return 2;
}, threadPoolExecutor);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(supply, supplyCustomize);
System.out.println(anyOf.get());

result: Execute first, so return first 2

completablefuture supplys asynchronously in customize threadPool
2
completablefuture supplys asynchronously

summary

CompletableFuture Rich asynchronous invocation methods , It can help us avoid using mainthreads to link multiple asynchronous tasks , Improve program performance , Faster response . At the same time, excellent exception handling mechanism , It can also be solved perfectly in the process of asynchronous call errors .

原网站

版权声明
本文为[Endwas]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/03/202203020533117256.html