Using CompletableFuture

2022-07-07 06:59:00 Ghost punishment OLO

How to use CompletableFuture.

Limitations of Future: it cannot directly chain multiple tasks together. Combining tasks requires help from the concurrency utility classes, which makes the implementation logic complex.

CompletableFuture extends and enhances Future. It implements the Future interface and builds a rich set of extensions on top of it, which makes up for the limitations of Future. CompletableFuture also provides the ability to orchestrate tasks. With this ability, the order, rules, and manner in which different tasks run can be organized easily. In a sense, this is its core capability. In the past, task orchestration could also be achieved with utility classes such as CountDownLatch, but that required complex logic.

The class structure of CompletableFuture is as follows: it implements both the Future and the CompletionStage interfaces.

[Figure: CompletableFuture class structure]

The CompletionStage interface defines the task-orchestration methods: when one stage finishes, execution can move on to the next stage. Stages can be executed asynchronously.

The default thread pool is ForkJoinPool.commonPool(). However, to keep tasks from affecting each other and to make problems easier to locate, a custom thread pool is strongly recommended.

CompletableFuture selects its default thread pool as follows:

// Whether the common pool is used depends on its parallelism, which is computed in ForkJoinPool's static initializer block
private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ForkJoinPool initializes the commonPool parameters in its static initializer block:

static {
    // initialize field offsets for CAS etc
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ForkJoinPool.class;
        CTL = U.objectFieldOffset
            (k.getDeclaredField("ctl"));
        RUNSTATE = U.objectFieldOffset
            (k.getDeclaredField("runState"));
        STEALCOUNTER = U.objectFieldOffset
            (k.getDeclaredField("stealCounter"));
        Class<?> tk = Thread.class;
        ……
    } catch (Exception e) {
        throw new Error(e);
    }

    commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
    defaultForkJoinWorkerThreadFactory =
        new DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");

    // Call makeCommonPool to create the commonPool; by default its parallelism is the number of logical cores minus 1
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() { return makeCommonPool(); }});
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}
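To see which values apply on a particular machine, the parallelism can be printed at runtime. The following is a minimal sketch; the class name CommonPoolInfo and its helper methods are made up for illustration, and only ForkJoinPool.getCommonPoolParallelism() and Runtime.availableProcessors() are real JDK calls.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    // Parallelism the common pool was configured with (reported as at least 1).
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // Mirrors the useCommonPool check shown above: the common pool is only
    // used for async tasks when its parallelism is greater than 1.
    static boolean wouldUseCommonPool() {
        return commonParallelism() > 1;
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonParallelism());
        System.out.println("async tasks use the common pool: " + wouldUseCommonPool());
    }
}
```

On a machine where the parallelism is 1, the check is false and CompletableFuture falls back to ThreadPerTaskExecutor, as the source snippet above shows.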

Features

Common methods
Dependency relationships
  • thenApply(): passes the result of the previous task to the Function that follows
  • thenCompose(): connects two dependent tasks; the result is returned by the second task
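The difference between the two dependency methods can be sketched as follows. The class DependencyDemo and the helper loadGreeting are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class DependencyDemo {
    // thenApply: transform the previous result with a plain function.
    static CompletableFuture<Integer> lengthOf(String text) {
        return CompletableFuture.supplyAsync(() -> text)
                .thenApply(String::length);
    }

    // Hypothetical second task that itself runs asynchronously.
    static CompletableFuture<String> loadGreeting(String name) {
        return CompletableFuture.supplyAsync(() -> "hello " + name);
    }

    // thenCompose: the function returns another CompletableFuture, and the
    // result of that second task becomes the result of the whole chain.
    static CompletableFuture<String> greetUser() {
        return CompletableFuture.supplyAsync(() -> "world")
                .thenCompose(DependencyDemo::loadGreeting);
    }

    public static void main(String[] args) {
        System.out.println(lengthOf("hello").join());
        System.out.println(greetUser().join());
    }
}
```

Using thenApply with loadGreeting would instead yield a nested CompletableFuture<CompletableFuture<String>>, which is why thenCompose is the better fit when the next step is itself asynchronous.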
AND aggregation
  • thenCombine(): merges two tasks; has a return value
  • thenAcceptBoth(): after both tasks complete, passes both results to thenAcceptBoth for processing; no return value
  • runAfterBoth(): runs the next step once both tasks have completed
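A small sketch of the three AND-style methods, assuming two independent tasks. The names BothDemo, price, and shipping are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class BothDemo {
    // Two hypothetical independent tasks.
    static CompletableFuture<Integer> price() {
        return CompletableFuture.supplyAsync(() -> 40);
    }

    static CompletableFuture<Integer> shipping() {
        return CompletableFuture.supplyAsync(() -> 2);
    }

    // thenCombine: merge both results into a new value.
    static int total() {
        return price().thenCombine(shipping(), Integer::sum).join();
    }

    // thenAcceptBoth: consume both results; the stage itself returns nothing.
    static int totalViaAcceptBoth() {
        AtomicInteger sink = new AtomicInteger();
        price().thenAcceptBoth(shipping(), (p, s) -> sink.set(p + s)).join();
        return sink.get();
    }

    // runAfterBoth: run an action that sees neither result.
    static boolean ranAfterBoth() {
        AtomicBoolean ran = new AtomicBoolean(false);
        price().runAfterBoth(shipping(), () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(total());
        System.out.println(totalViaAcceptBoth());
        System.out.println(ranAfterBoth());
    }
}
```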
OR aggregation
  • applyToEither(): uses the result of whichever of the two tasks finishes first; has a return value
  • acceptEither(): consumes the result of whichever of the two tasks finishes first; no return value
  • runAfterEither(): runs the next step (a Runnable) as soon as either task completes
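The OR-style methods can be sketched like this. To keep the outcome deterministic, the example assumes one future that is already complete and one that is still pending, rather than racing two real tasks; EitherDemo and its method names are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class EitherDemo {
    // applyToEither: transform whichever result is available first.
    static String applyFirst() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // still pending
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> winner = slow.applyToEither(fast, s -> s + " wins");
        slow.complete("slow"); // finishes later and no longer affects the outcome
        return winner.join();
    }

    // acceptEither: consume whichever result is available first; nothing is returned.
    static String acceptFirst() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        slow.acceptEither(fast, seen::set).join();
        return seen.get();
    }

    // runAfterEither: run a Runnable once either task is done; it sees no result.
    static boolean runAfterFirst() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        slow.runAfterEither(fast, () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(applyFirst());
        System.out.println(acceptFirst());
        System.out.println(runAfterFirst());
    }
}
```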
Parallel execution
  • allOf(): returns a new CompletableFuture that completes when all of the given CompletableFutures have completed
  • anyOf(): returns a new CompletableFuture that completes when any one of the given CompletableFutures has completed
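One point worth illustrating is that allOf() yields a CompletableFuture<Void> and anyOf() yields a CompletableFuture<Object>, so the individual results have to be read back from the original futures. A minimal sketch, with AllAnyDemo as a made-up class name:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllAnyDemo {
    // allOf carries no value, so the results are read from the original
    // futures once the combined future has completed.
    static List<String> collectAll() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"),
                CompletableFuture.supplyAsync(() -> "c"));
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block: all are done
                        .collect(Collectors.toList()))
                .join();
    }

    // anyOf completes with the value of whichever future finishes first,
    // typed as Object. Here one future is pending and one is already done.
    static Object firstOf() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("done");
        return CompletableFuture.anyOf(pending, done).join();
    }

    public static void main(String[] args) {
        System.out.println(collectAll());
        System.out.println(firstOf());
    }
}
```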
Result handling
  • whenComplete(): when the task completes, runs the given action with this stage's result (or null if there is none) and exception (or null if there is none)
  • exceptionally(): returns a new CompletableFuture. If the previous CompletableFuture completes normally, the new one completes with the same value. If it completes exceptionally, the exception is passed to the given function, and that function's result completes the new CompletableFuture
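As an illustration of exceptionally(), the sketch below falls back to a default value when a task fails. The class RecoveryDemo and the parse helper are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class RecoveryDemo {
    // Hypothetical task: fails with NumberFormatException on bad input.
    static CompletableFuture<Integer> parse(String text) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text));
    }

    // exceptionally: the fallback function only runs when the previous stage
    // failed; a normal result passes through unchanged.
    static int parseOrDefault(String text, int fallback) {
        return parse(text)
                .exceptionally(ex -> fallback)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42", -1));
        System.out.println(parseOrDefault("not a number", -1));
    }
}
```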

Asynchronous operations

CompletableFuture provides four static methods for creating an asynchronous operation:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Differences between the four methods:

  • runAsync() takes a Runnable functional interface as its parameter and returns no result. supplyAsync() takes a Supplier functional interface as its parameter and returns a result of type U; the Supplier's get() method provides the return value
  • When a method is called without an Executor, ForkJoinPool.commonPool() is used internally as the thread pool that runs the asynchronous code. If a thread pool is specified, the task runs on the specified thread pool
  • By default, CompletableFuture uses the shared ForkJoinPool thread pool. The default parallelism of this pool is the number of CPU cores minus one (it can also be set with the JVM option -Djava.util.concurrent.ForkJoinPool.common.parallelism). If all CompletableFutures share one thread pool, a single task that performs very slow I/O can leave every thread in the pool blocked on I/O. This starves other tasks of threads and in turn hurts the performance of the whole system. It is therefore strongly recommended to create separate thread pools for different kinds of business work so that they do not interfere with each other
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Runnable runnable = () -> System.out.println("Task with no return value");
        CompletableFuture.runAsync(runnable);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Asynchronous task with a return value");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            return "hello world";
        });
        // get() blocks until the task finishes (about 5 seconds here)
        String result = future.get();
        System.out.println(result);
    }
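The recommendation to use a dedicated thread pool could look roughly like the following. This is a sketch under assumptions: the pool size of 4, the thread name "io-pool-worker", and the class name CustomPoolDemo are all illustrative choices, not values from the original article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomPoolDemo {
    // Runs a supplier on a dedicated pool and reports which thread executed it.
    static String runOnDedicatedPool() {
        // Hypothetical pool reserved for slow I/O work; size and name are illustrative.
        ExecutorService ioPool = Executors.newFixedThreadPool(4, r -> {
            Thread t = new Thread(r, "io-pool-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            // Passing the executor keeps this task off ForkJoinPool.commonPool().
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), ioPool)
                    .join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDedicatedPool());
    }
}
```

In a real application the pool would normally be created once and shared by all tasks of the same business type, rather than created per call as in this short demonstration.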

Getting the result (join & get)

Both join() and get() are used to obtain the value of a CompletableFuture once the asynchronous task has finished. join() throws unchecked exceptions, so developers are not forced to handle them. get() throws the checked exceptions ExecutionException and InterruptedException, which the caller must handle explicitly (rethrow or try/catch).
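The difference in exception handling can be sketched with a task that always fails. JoinGetDemo and its methods are hypothetical names for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinGetDemo {
    // A task that always fails, used to compare the two methods.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // get() declares checked exceptions, so the caller must catch or rethrow them.
    static String viaGet() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join() throws the unchecked CompletionException; catching it is optional.
    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "join: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
    }
}
```

In both cases the original exception is available through getCause().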

Result handling

When a CompletableFuture finishes its computation, or throws an exception, a specific Action can be executed. The main methods are:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

  • The Action has type BiConsumer<? super T,? super Throwable>, so it can handle both a normal result and an exception.
  • Methods whose names do not end in Async run the Action on the same thread that completed the task, while the Async variants may run it on a different thread (if the same thread pool is used, the same thread may still be chosen).
  • These methods return a new CompletableFuture. Once the Action has run, the new CompletableFuture completes with the original result, or with the original exception
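The points above can be sketched as follows: the Action observes the outcome, and the returned future still carries the original result or exception. WhenCompleteDemo is a made-up class name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteDemo {
    // On success the action sees (result, null), and the returned future
    // keeps the original result.
    static String onSuccess() {
        AtomicReference<String> seen = new AtomicReference<>();
        String result = CompletableFuture.supplyAsync(() -> "ok")
                .whenComplete((value, ex) -> seen.set(value + "/" + ex))
                .join();
        return result + " (action saw " + seen.get() + ")";
    }

    // On failure the action sees (null, exception), and the returned future
    // still completes exceptionally: whenComplete does not recover.
    static boolean failurePropagates() {
        CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        }).whenComplete((value, ex) ->
                System.out.println("value=" + value + ", failed=" + (ex != null)));
        try {
            future.join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(onSuccess());
        System.out.println(failurePropagates());
    }
}
```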

Test

package ThreadPoolExecultorTest;
import java.util.*;
import java.util.concurrent.*;

/**
 * @Author lijiaxin
 * @Description TODO
 * @Date 2022/07/05/9:23
 * @Version 1.0
 */
public class CompletableFuture1Test {
    // One shared pool for all tasks; creating a new pool per task would leak
    // threads and keep the JVM from exiting.
    private static final ThreadPoolExecutor EXECUTOR = threadPoolExecutor();

    public static void main(String[] args) {
        List<CompletableFuture<Void>> pipelines = new ArrayList<>();
        for (int i = 0; i < 4 ; i++) {
            int finalI = i;
            /* Produce a list asynchronously on the custom pool */
            CompletableFuture<List<String>> listmates = CompletableFuture.supplyAsync(()->{
                List<String> list;
                if (finalI == 1){
                    list = test1();
                }else if(finalI == 2){
                    list = test2();
                }else if(finalI == 3){
                    list = test3();
                }else {
                    list = test4(); // reached when finalI == 0
                }
                return list;
            }, EXECUTOR);
            /* Consume the list once it is ready */
            pipelines.add(listmates.thenAcceptAsync(list->{
                linkedLists((LinkedList<String>) list);
            }, EXECUTOR));
        }
        // Wait for every pipeline to finish, then release the pool threads
        CompletableFuture.allOf(pipelines.toArray(new CompletableFuture[0])).join();
        EXECUTOR.shutdown();
    }
    public static ThreadPoolExecutor threadPoolExecutor() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(20),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // Never discard tasks: when the pool is saturated, the caller runs them synchronously
        return executor;
    }
    public static List<String> test1(){
        System.out.println("1");
        List<String> list = new LinkedList<>();
        list.add("1");
        return list;
    }
    public static List<String> test2(){
        System.out.println("2");
        List<String> list = new LinkedList<>();
        list.add("2");
        return list;
    }
    public static List<String> test4(){
        System.out.println("4");
        List<String> list = new LinkedList<>();
        list.add("4");
        return list;
    }
    public static List<String> test3(){
        System.out.println("3");
        List<String> list = new LinkedList<>();
        list.add("3");
        return list;
    }
    public static LinkedList<LinkedList<String>> linkedLists(LinkedList<String> linkedList){
        System.out.println(linkedList);
        // Must be initialized; calling add() on a null reference throws NullPointerException
        LinkedList<LinkedList<String>> linkedList1 = new LinkedList<>();
        linkedList1.add(linkedList);
        for (LinkedList<String> list : linkedList1) {
            System.out.println(list.toString());
        }
        return linkedList1;
    }

}

Original site

Copyright notice
This article was written by [Ghost punishment OLO]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/188/202207070259461655.html