当前位置:网站首页>Asynchronous, thread pool (completablefuture)

Asynchronous, thread pool (completablefuture)

2022-06-10 21:18:00 liangjiayy

There are four ways to initialize threads :

  • 1、 Inherit Thread
  • 2、 Realization Runnable
  • 3、 Realization Callable Interface +FutureTask( You can get the results back , Can handle exceptions )
  • 4、 Thread pool

Advantages and disadvantages :

  • 1、2 Unable to get return value ,3 Can get the return value ( Wait when blocked )
  • 1、2、3 Can't control resources ,
  • 4 Can control resources , Stable performance

Code example :

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class CreateThreadTest {
    
    public static void main(String[] args) {
    
        // 1、 Inherit Thread
        new Thread(new Thread01()).start();

        // 2、 Realization Runnable
        new Thread(new Rannable01()).start();

        // 3、 Realization Callable Interface +FutureTask( You can get the results back , Can handle exceptions )
        FutureTask<String> stringFutureTask = new FutureTask<>(new Callable01());
        new Thread(stringFutureTask).start();
        try {
    
            log.info(" result :{}",stringFutureTask.get());// Block wait 
        } catch (InterruptedException | ExecutionException e) {
    
            e.printStackTrace();
        }

        // 4、 Thread pool 
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.submit(()->log.info(" The way 4..."));//submit You can get results ,execute Can't get results 
        executorService.shutdown();// close 
    }
}
@Slf4j
class Thread01 extends Thread{
    
    @Override
    public void run() {
    
        log.info(" The way 1...");
    }
}
@Slf4j
class Rannable01 implements Runnable{
    

    @Override
    public void run() {
    
        log.info(" The way 2...");
    }
}
@Slf4j
class Callable01 implements Callable<String>{
    

    @Override
    public String call() throws Exception {
    
        log.info(" The way 3...");
        return "Callable01";
    }
}

 Insert picture description here

ThreadPoolExecutor Seven parameters :

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • int corePoolSize, Number of core threads 【 As long as the thread pool is not destroyed ( Set up allowCoreThreadTimeOut attribute ), It's always been there 】; The number of threads ready after the thread pool is created , Wait for receiving asynchronous request to execute ;
  • int maximumPoolSize, Maximum number of threads , Can be used to control resources
  • long keepAliveTime, Survival time , The current number of threads is greater than the specified number of core threads , As long as the survival time reaches this value , Will be released .(maximumPoolSize-corePoolSize)
  • TimeUnit unit, Time unit
  • BlockingQueue<Runnable> workQueue, Blocking queues , If there are many tasks , Redundant tasks will be put in the queue
  • ThreadFactory threadFactory, A factory for creating threads
  • RejectedExecutionHandler handler If the queue is full , Execute the task according to the rejection policy specified by us

The order of work

  • 1、 Thread pool creation , Prepare the specified quantity (corePoolSize) The core thread of , To receive tasks
  • 2、 The core thread is full , The incoming thread will be put into the blocking queue (workQueue) in , When there are idle core threads , Will block the queue to get the task
  • 3、 The blocking queue is full , Just open a new thread to execute , It can only drive up to maximumPoolSize The specified quantity
  • 4、 achieve maximumPoolSize Threads , Use the reject policy to reject the task
  • 5、 If the current number of threads is greater than the specified number of core threads (corePoolSize), Each extra thread is idle for a specified time (keepAliveTime) later , Release thread

Executors Common methods for creating thread pools :

  • Executors.newCachedThreadPool();core yes 0, All are recyclable
  • Executors.newFixedThreadPool(); Fixed size ,core=max
  • Executors.newSingleThreadExecutor(); Thread pool for scheduled tasks
  • Executors.newScheduledThreadPool(); Single threaded thread pool , The background obtains tasks from the queue , One by one

CompletableFuture Start asynchronous task

  • runAsync: No return value
  • supplyAsync: There is a return value
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CompletableFutureTest {
    
    private static ExecutorService executor = Executors.newFixedThreadPool(2);
    public static void main(String[] args) {
    
        // The way 1、 No return value 
        CompletableFuture.runAsync(()->log.info("runAsync"),executor);

        //  The way 2、 There is a return value 
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
    
            log.info("supplyAsync");
            return "supplyAsync";
        }, executor);
        // Get the return value 
        try {
    
            String res = supplyAsync.get();
            log.info(" Return value :{}",res);
        } catch (InterruptedException e) {
    
            e.printStackTrace();
        } catch (ExecutionException e) {
    
            e.printStackTrace();
        }
    }
}

The perception after the method execution is completed

CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(
        () -> {
    
            log.info(" establish ...");
            return 10 / 0;
        }, executor)
        // After processing, use the current thread to process , If whenCompleteAsync, Then use other threads to process 
        .whenComplete((res, e) -> {
    
            log.info(" result :{}, abnormal :{}", res, e);
        })
        // Callback after exception occurs 
        .exceptionally((throwable) -> {
    
            log.info(" Something unusual happened , return 0, abnormal :{}", throwable.getCause().toString());
            return 0;
        });

// Print the results 
try {
    
    log.info(" result :{}",supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {
    
    e.printStackTrace();
}

 Insert picture description here

Processing after method execution

CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(
        () -> {
    
            log.info(" establish ...");
            return 10 / 2;
            // return 10 / 0;
        }, executor)
        // Parameters :BiFunction<? super T, Throwable, ? extends U> fn; Receive results 、 abnormal , Return the new result 
        .handle((res, e) -> {
    
            if (res != null) {
    
                log.info(" The result is not empty ");
                return res * 2;
            }

            if (e != null) {
    
                log.info(" There are abnormal ");
                return 0;
            }

            log.info(" Other situations ");
            return 0;
        });

// Print the results 
try {
    
    log.info(" result :{}", supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {
    
    e.printStackTrace();
}

 Insert picture description here

Thread pool serial method

The basic method :

  • thenApply: Receive the last returned result , Return the new result
  • thenAccept: Only the last returned result can be received , No return value
  • thenRun: Do not accept or return results
     Insert picture description here

Reality shows a need :

  • Start a thread first , Mission 1
  • Mission 2 Access to task 1 Return result of , And return a new result
  • Mission 3 Get tasks on the same thread 2 Result , But it doesn't return any results
  • Mission 4 Neither receive the last returned result , No result is returned

Code :

    private static void  Thread pool serial method () {
    
        CompletableFuture.supplyAsync(
                () -> {
    
                    log.info(" Mission 1...");
                    return 111;
                })

                // To get the results , Return the new result ( The types can be different )
                .thenApplyAsync((res) -> {
    
                    log.info(" Mission 2... Get the result of the previous :{}", res);
                    return "222";
                }, executor)

                // Get the last result , But no result is returned 
                .thenAccept((res) -> {
    
                    log.info(" Mission 3... Get the result of the previous :{}", res);
                })

                // Neither get , And don't go back 
                .thenRunAsync(() -> {
    
                    log.info(" Mission 4... Unable to get the result of the previous ");
                }, executor);

        //  Run under the test package , Thread sleep , Prevent early termination 
        try {
    
            Thread.sleep(2000);
        } catch (InterruptedException e) {
    
            e.printStackTrace();
        }
    }

 Insert picture description here

Two task combination

It's all done

  • thenCombine: Combine two future, Get two future Return result of , And return the return result of the current task
  • thenAcceptBoth: Combine two future, Get two future The return result of the task , And then deal with the task , no return value
  • runAfterBoth: Combine two future, There is no need to get future Result , Just two future Process tasks after processing them
  • Add after the above method Async, It means to open a new thread , for example :thenCombineAsync

Example : Use the third task , Get the return results of the first two tasks , And put them together , Return the final result :

// Two tasks 
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
        () -> {
    
            log.info(" Mission 1...");
            return 111;
        }, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
        () -> {
    
            log.info(" Mission 2...");
            return 222;
        }, executor);

// Mission 3 Execute after the first two executions :
CompletableFuture<String> cf3 = cf1.thenCombineAsync(cf2, (f1, f2) -> {
    
    log.info(" Mission 3... The results obtained :{},{}", f1, f2);
    return f1 + "" + f2;
}, executor);

// Print tasks 3 Result 
try {
    
    log.info(" Mission 3 The return value of :{}",cf3.get());
} catch (InterruptedException e) {
    
    e.printStackTrace();
} catch (ExecutionException e) {
    
    e.printStackTrace();
}

 Insert picture description here

Just finish one

  • applyToEither: One of the two tasks is completed , Get its return value , Process the task and return the new value
  • acceptEither: One of the two tasks is completed , Get its return value , Processing tasks , But no return value
  • runAfterEither: One of the two tasks is completed , Do not get its return value , Processing tasks , There is no return value
  • Can be added Async
  • The two combined tasks need to have the same return value

There are three ways to test :

// Two tasks 
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
        () -> {
    
            log.info(" Mission 1...");
            return 111;
        }, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
        () -> {
    
            log.info(" Mission 2... Start ");
            try {
    
                Thread.sleep(2000);
                log.info(" Mission 2... end ");
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            return 222;
        }, executor);

//========= The way 1=========
// Mission 3 Execute after the first two executions :
CompletableFuture<Integer> cf3 = cf1.applyToEitherAsync(cf2, (f1) -> {
    
    log.info(" Mission 3-1... The results obtained :{}, And expand the results 2 times ", f1);
    return f1 * 2;
}, executor);
// Print cf3 Result 
try {
    
    log.info(" Mission 3-1 The return value of :{}", cf3.get());
} catch (InterruptedException e) {
    
    e.printStackTrace();
} catch (ExecutionException e) {
    
    e.printStackTrace();
}

//========= The way 2=========
cf1.acceptEitherAsync(cf2, (f1) -> {
    
    log.info(" Mission 3-2 The results obtained :{}", f1);
}, executor);

//========= The way 3=========
cf1.runAfterEitherAsync(cf2,
        () -> log.info(" Mission 3-3: One of the first two tasks has been completed "),
        executor);

 Insert picture description here

Multitasking

CompletableFuture There are two static methods in :

  • allOf: All tasks are completed
  • anyOf: One task can be completed
     Insert picture description here

Code example :

// Two tasks 
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
        () -> {
    
            log.info(" Mission 1...");
            return 111;
        }, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
        () -> {
    
            log.info(" Mission 2... Start ");
            try {
    
                Thread.sleep(2000);
                log.info(" Mission 2... end ");
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            return 222;
        }, executor);

// Just finish one :
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(cf1, cf2);
try {
    
    log.info(" One task has been completed , Return value :{}",anyOf.get());
} catch (InterruptedException | ExecutionException e) {
    
    e.printStackTrace();
}

// It's all done :
CompletableFuture<Void> allOf = CompletableFuture.allOf(cf1, cf2);
try {
    
    log.info(" It's all done , Return value :{},{}",cf1.get(),cf2.get());
} catch (InterruptedException | ExecutionException e) {
    
    e.printStackTrace();
}

 Insert picture description here

原网站

版权声明
本文为[liangjiayy]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/161/202206102001161184.html