当前位置:网站首页>Take you to play completablefuture asynchronous programming

Take you to play completablefuture asynchronous programming

2022-06-10 16:35:00 Mr. demon king

Preface

The first one who has been busy in life recently OKR, Wait, let's talk about it later , Starting today, I will resume the original article every week , Thank you for your persistence . This article is also recently in Code Review When , See everyone's code , I want to push down everyone's idea of asynchronous programming as a whole , Written from this .

Why use CompletableFuture

In some business scenarios, we need to use multithreading to execute tasks asynchronously , Speed up task execution . JDK5 Added Future Interface , Used to describe the result of an asynchronous calculation . although Future And related usage methods provide the ability to perform tasks asynchronously , But it's not convenient to get the results , We have to use Future.get Blocking the calling thread , Or use polling to judge Future.isDone Is the mission over , And get the results . Neither of these treatments is very elegant , The relevant code is as follows :

    @Test
    public void testFuture() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<String> future = executorService.submit(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        });
        try {
            System.out.println(future.get());
            System.out.println("end");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

In addition, it cannot solve the scenario where multiple asynchronous tasks need to be interdependent , In short, it's , The main thread needs to wait for the sub thread to execute after the task is completed , At this time, you may think of CountDownLatch, Yes, it can solve , The code is as follows , however Java8 I no longer think this is an elegant solution , Now let's take a look at CompletableFuture Use .

    @Test
    public void testCountDownLatch() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CountDownLatch downLatch = new CountDownLatch(2);
        Future<String> orderFuture = executorService.submit(() -> {
            OrderService orderService = new OrderServiceImpl();
            String result = orderService.queryOrderInfo();
            downLatch.countDown();
            return result;
        });

        Future<String> trailFuture = executorService.submit(() -> {
            TrailService trailService = new TrailServiceImpl();
            String result = trailService.queryTrail();
            downLatch.countDown();
            return result;
        });

        try {
            downLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println(orderFuture.get() + trailFuture.get());
            System.out.println("end");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

Introduction

Create examples

img
img

About CompletableFuture The creation of provides 5 Ways of planting , The first is to create a with default results CompletableFuture, Infrequent use , What we often use is runAsync and supplyAsync, Focus on these two static methods .runAsync Has no return value ,supplyAsync Has a return value , Both methods provide one or two creation forms , One is to use public by default ForkJoinPool Thread pool execution , The default number of threads in this thread pool is CPU The number of nuclear .

img
img

Another use is to provide an active thread pool , This is compared to ForkJoinPool Are the benefits of , You can customize thread pools for different scenarios , To divide the business and find problems , Another advantage is for ForkJoinPool For this shared thread , Once blocked , It will be regarded as that other threads cannot obtain execution opportunities .

CompletionStage

img
img

About CompletableFuture Core competence is through inheritance Future and CompletionStage To achieve , About Future Is to provide some asynchronous capabilities , If the single deposit is like this CompletableFuture It won't be so powerful , So our core is to introduce CompletionStage Some internal methods are commonly used .

img
img

About CompletionStage Methods , There are two types of interfaces , The core methods provide asynchronous methods , The asynchronous method is not introduced here , Basically, the principle is similar , A new thread pool is provided to implement tasks .

Asynchronous callback

Asynchronous callbacks can be divided into two categories , One is the return with parameters , The other is the return without parameters , The returned parameters include thenApply and thenCompose, Returns without parameters include thenRun and thenAccept.

There are parameters to return

thenApply and thenCompose Indicates the action to be performed after a task is completed , Callback method , The execution result of the task, that is, the return value of the method, will be passed to the callback method as an input parameter , It can also be understood as serial , The only difference is thenCompose Need to return to a new CompletionStage, The overall use is as follows :

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "aaaaaaaaaaaaaaaaaaaaa";
        }).thenApply(x -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x + "bbbbbbbbbbbbbbbbbbbbbbbb";
        }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
        System.out.println(first.join());
        long end = System.currentTimeMillis();
        System.out.println(" Time consuming " + (end - start) / 1000 + "");
    }
No parameters return

thenAccep It is also the action of consuming the last task , Pass the execution result of the task, that is, the return value of the method, as an input parameter to the callback method , Just no return value ,thenAccep And thenRun The difference between methods is that there is no input parameter and no return value .

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture<Void> first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        }).thenAccept(x -> {
            System.out.println(x);
        });
        first.join();
        long end = System.currentTimeMillis();
        System.out.println(" Time consuming " + (end - start) / 1000 + " second ");
    }

abnormal

CompletableFuture Method execution if there is an exception , Only get perhaps join Method to get the exception , In this case CompletableFuture Provides three ways to handle exceptions .

exceptionally

exceptionally The use of is similar to try catch Medium catch Exception handling in code block .exceptionally Callback method executed when a task execution is abnormal , Pass the thrown exception as a parameter to the callback method , If the task is performed normally ,exceptionally Method CompletionStage Of result Is the result of the normal execution of the task .

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            throw new RuntimeException("test");
        });
        CompletableFuture<String> two = first.exceptionally((x) -> {
            x.printStackTrace();
            return "123";
        });
        two.join();
        long end = System.currentTimeMillis();
        System.out.println(" Time consuming " + (end - start) / 1000 + " second ");
    }
whenComplete

whenComplete The use of is similar to try..catch..finanlly in finally Code block , Whether or not there is an exception , Will be implemented .whenComplete Callback method executed after a task is completed , The execution result or the exception thrown during execution will be passed to the callback method , If it is executed normally, the exception is null, The callback method corresponds to CompletableFuture Of result Consistent with the task , If the task is performed normally , be get Method returns the execution result , If it is an execution exception , be get Method throws an exception .

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "aa";
        }).whenComplete((x, throwable) -> {
            //  If the exception exists , Printing exception , And return the default value
            if (throwable != null) {
                throwable.printStackTrace();
                System.out.println(" Failure ");
            } else {
                System.out.println(" success ");
            }
        });
        System.out.println(first.join());
        long end = System.currentTimeMillis();
        System.out.println(" Time consuming " + (end - start) / 1000 + " second ");
    }
handle

Follow whenComplete Almost the same , The difference lies in handle The callback method of has a return value , And handle Method CompletableFuture Of result Is the execution result of the callback method or the exception thrown during the execution of the callback method , With primordial CompletableFuture Of result irrelevant .

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            throw new RuntimeException("test");
        }).handle((x, throwable) -> {
            //  If the exception exists , Printing exception , And return the default value
            if (throwable != null) {
                throwable.printStackTrace();
                return " abnormal ";
            } else {
                
                return x + "aa";
            }
        });
        System.out.println(first.join());
        long end = System.currentTimeMillis();
        System.out.println(" Time consuming " + (end - start) / 1000 + " second ");
    }

synthetic relation

There are two types of combinatorial relationships , One is the relationship with , A yes or relationship , And relationship is to continue to perform when all tasks are completed , Be similar to CountDownLatch, Or relationship is that as long as a task is completed, it can be executed downward .

And relationship
thenCombine /thenAcceptBoth / runAfterBoth/allOf

These four methods can combine multiple CompletableFuture combined , Will be multiple CompletableFuture When it's all done , To perform the following operations , The difference lies in ,thenCombine The execution result of the task will be passed to the specified method as a method input parameter , And the method has a return value ;thenAcceptBoth Similarly, the execution result of the task is entered as a method , But no return value ;runAfterBoth No participation , There is no return value . Note that there is only one execution exception in multiple tasks , The exception information is taken as the execution result of the specified task .allOf Multiple tasks will be executed only after they are completed , As long as there is a task execution exception , The return of the CompletableFuture perform get Method will throw an exception , If it's all normal , be get return null.

    @Test
    public void testCompletableFuture() {
        CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
            OrderService orderService = new OrderServiceImpl();
            return orderService.queryOrderInfo();
        });
        CompletableFuture<String> trail = CompletableFuture.supplyAsync(() -> {
            TrailService trailService = new TrailServiceImpl();
            return trailService.queryTrail();
        });
        CompletableFuture<String> future = order.thenCombine(trail, (a, b) -> a + b);
        CompletableFuture<Void> afterBoth = future.runAfterBoth(trail, () -> {
            System.out.println(future.join());
        });
        CompletableFuture<Void> result = CompletableFuture.allOf(afterBoth);
        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
applyToEither / acceptEither / runAfterEither/anyOf

These four methods can combine multiple CompletableFuture combined , Just one of them CompletableFuture After execution , You can perform the following operations ,applyToEither The execution result of the completed task will be entered as a method , And it has a return value ;acceptEither Similarly, take the execution result of the completed task as the method parameter , But there is no return value ;runAfterEither There is no way to enter parameters , There is no return value , Note that there is only one execution exception in multiple tasks , The exception information is taken as the execution result of the specified task .anyOf Multiple tasks, as long as one task is completed , Follow up tasks can be performed .

    @Test
    public void testCompletableFuture() {
        CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
            OrderService orderService = new OrderServiceImpl();
            return orderService.queryOrderInfo();
        });
        CompletableFuture<String> trail = CompletableFuture.supplyAsync(() -> {
            TrailService trailService = new TrailServiceImpl();
            return trailService.queryTrail();
        });
        CompletableFuture<String> future = order.applyToEither(trail, (result) -> result);
        CompletableFuture<Void> afterBoth = future.runAfterEither(trail, () -> {
            System.out.println(future.join());
        });
        CompletableFuture<Object> result = CompletableFuture.anyOf(afterBoth,order);
        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

How to use in a project

CompletableFuture When customizing threads , The default thread pool is ForkJoinPool.commonPool(), For us to use java Often do IO Intensive task , The default thread pool is far from enough ; On dual core and below machines , The default thread pool will degenerate into creating one thread for each task , Equivalent to no thread pool . So for CompletableFuture The thread pool must be customized for use in the project , At the same time, pay attention to the custom thread pool , Thread pool has a full capacity reject policy , If the rejection policy of discard policy is adopted , also allOf Methods and get Method will wait indefinitely if no timeout is set , Next, we use... Through custom threads CompletableFuture.

  1. Custom thread pool , Here through inheritance ThreadPoolExecutor, Rewrote shutdown() 、shutdownNow() 、beforeExecute() and afterExecute() Method to count the execution of thread pool , Here can also be combined with Spring and appllo Implement custom extended thread pool , The next article can talk about the extension idea and Implementation Scheme , Different thread pools are used for different business scenarios , One is to facilitate the troubleshooting of problems , Another is similar to Hystrix Isolation plan ;
package com.zto.lbd;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 *  Thread pool monitoring class
 *
 * @author wangtongzhou 18635604249
 * @since 2022-02-23 07:27
 */

public class ThreadPoolMonitor extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    /**
     *  Save the start time of the task , When the task is over , Calculate the task execution time by subtracting the start time from the end time of the task
     */

    private ConcurrentHashMap<String, Date> startTimes;

    /**
     *  Thread pool name , Usually named after the business name , Easy to distinguish
     */

    private String poolName;

    /**
     *  Call the constructor of the parent class , And initialization HashMap And thread pool name
     *
     * @param corePoolSize     Number of core threads in thread pool
     * @param maximumPoolSize  Maximum number of threads in the thread pool
     * @param keepAliveTime    The maximum idle time of the thread
     * @param unit             A unit of free time
     * @param workQueue        Save the queue of submitted tasks
     * @param poolName         Thread pool name
     */

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName)
 
{
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), poolName);
    }


    /**
     *  Call the constructor of the parent class , And initialization HashMap And thread pool name
     *
     * @param corePoolSize     Number of core threads in thread pool
     * @param maximumPoolSize  Maximum number of threads in the thread pool
     * @param keepAliveTime    The maximum idle time of the thread
     * @param unit             A unit of free time
     * @param workQueue        Save the queue of submitted tasks
     * @param threadFactory    Thread factory
     * @param poolName         Thread pool name
     */

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory, String poolName)
 
{
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.startTimes = new ConcurrentHashMap<>();
        this.poolName = poolName;
    }


    /**
     *  When thread pool is delayed to close ( Wait for all tasks in the thread pool to be executed ), Statistics thread pool situation
     */

    @Override
    public void shutdown() {
        //  Statistics of executed tasks 、 Executing task 、 Number of tasks not performed
        LOGGER.info("{}  Close thread pool ,  Task executed : {},  Executing task : {},  Number of tasks not performed : {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }

    /**
     *  When the thread pool is shut down immediately , Statistics thread pool situation
     */

    @Override
    public List<Runnable> shutdownNow() {
        //  Statistics of executed tasks 、 Executing task 、 Number of tasks not performed
        LOGGER.info("{}  Close thread pool now , Task executed : {},  Executing task : {},  Number of tasks not performed : {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }

    /**
     *  Before the mission , Record the start time of the task
     */

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
    }

    /**
     *  After task execution , Calculate the end time of the task
     */

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        //  The statistical task takes time 、 Initial number of threads 、 Number of core threads 、 Number of tasks in progress 、
        //  Number of completed tasks 、 Total tasks 、 Number of tasks cached in the queue 、 The maximum number of threads that exist in the pool 、
        //  Maximum number of threads allowed 、 Thread idle time 、 Whether the thread pool is closed 、 Whether the thread pool is terminated
        LOGGER.info("{}-pool-monitor: " +
                        " The task takes time : {} ms,  Initial number of threads : {},  Number of core threads : {},  Number of tasks in progress : {}, " +
                        " Number of completed tasks : {},  Total tasks : {},  Number of tasks in the queue : {},  The maximum number of threads that exist in the pool : {}, " +
                        " Maximum number of threads : {},   Thread idle time : {},  Whether the thread pool is closed : {},  Whether the thread pool is terminated : {}",
                this.poolName,
                diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
                this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
    }

    /**
     *  The thread used to generate the thread pool , Rewrites the default thread factory of thread pool , Pass in the thread pool name , Easy to track the problem
     */

    static class MonitorThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         *  Initialize thread factory
         *
         * @param poolName  Thread pool name
         */

        MonitorThreadFactory(String poolName) {
            SecurityManager s = System.getSecurityManager();
            group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}
  1. Using a custom thread pool CompletableFuture;
    private final static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
    private final static ThreadPoolMonitor threadPoolMonitor = new ThreadPoolMonitor(510100L,
            TimeUnit.SECONDS, workQueue, "monitor");

    @Test
    public void testCompletableFuture() {
        CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
            OrderService orderService = new OrderServiceImpl();
            return orderService.queryOrderInfo();
        },threadPoolMonitor);
        String result=order.join();
        assertTrue(Objects.nonNull(result));
    }

end

Welcome to pay attention to , A little bit of praise !

原网站

版权声明
本文为[Mr. demon king]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/03/202203020959248115.html

随机推荐