Asynchronous Programming and Thread Pools (CompletableFuture)
2022-06-10 21:18:00 【liangjiayy】
There are four ways to create and start a thread:
- 1. Extend Thread
- 2. Implement Runnable
- 3. Implement the Callable interface and wrap it in a FutureTask (the result can be retrieved and exceptions can be handled)
- 4. Use a thread pool
Advantages and disadvantages:
- Approaches 1 and 2 cannot return a value; approach 3 can (calling get() blocks until the result is ready)
- Approaches 1, 2, and 3 cannot control resources, because every task creates a new thread
- Approach 4 can control resources, so performance is stable
Code example:
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class CreateThreadTest {
    public static void main(String[] args) {
        // 1. Extend Thread: start the subclass directly
        new Thread01().start();
        // 2. Implement Runnable
        new Thread(new Runnable01()).start();
        // 3. Implement Callable and wrap it in a FutureTask (the result can be retrieved, exceptions can be handled)
        FutureTask<String> stringFutureTask = new FutureTask<>(new Callable01());
        new Thread(stringFutureTask).start();
        try {
            log.info("Result: {}", stringFutureTask.get()); // blocks until the task finishes
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        // 4. Thread pool
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // submit returns a Future for the result; execute returns nothing
        executorService.submit(() -> log.info("Approach 4..."));
        executorService.shutdown(); // stop accepting new tasks; submitted tasks still run
    }
}

@Slf4j
class Thread01 extends Thread {
    @Override
    public void run() {
        log.info("Approach 1...");
    }
}

@Slf4j
class Runnable01 implements Runnable {
    @Override
    public void run() {
        log.info("Approach 2...");
    }
}

@Slf4j
class Callable01 implements Callable<String> {
    @Override
    public String call() throws Exception {
        log.info("Approach 3...");
        return "Callable01";
    }
}
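The comment on approach 4 says that submit can return a result while execute cannot. A minimal sketch of that difference follows; the class name SubmitVsExecuteSketch and the helper sumViaSubmit are hypothetical, and System.out.println stands in for the Lombok logger so that the block needs only the JDK.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitVsExecuteSketch {

    // Hypothetical helper: submits a Callable and blocks until its result is ready.
    static int sumViaSubmit(int a, int b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // execute(Runnable) returns void: fire and forget.
            pool.execute(() -> System.out.println("execute: no result available"));
            // submit(Callable) returns a Future that carries the result.
            Future<Integer> future = pool.submit(() -> a + b);
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("submit result: " + sumViaSubmit(1, 2));
    }
}
```

Because the lambda passed to submit is a value expression, the compiler treats it as a Callable rather than a Runnable, which is what gives the Future its Integer type.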

The seven parameters of ThreadPoolExecutor:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
- int corePoolSize: the number of core threads. They stay alive for as long as the pool exists, unless allowCoreThreadTimeOut is set. These are the threads kept ready to run asynchronous tasks; by default each one is created when a task first needs it
- int maximumPoolSize: the maximum number of threads, which can be used to control resources
- long keepAliveTime: the keep-alive time. When the current number of threads exceeds corePoolSize, an extra thread (at most maximumPoolSize - corePoolSize of them) is released once it has been idle for this long
- TimeUnit unit: the time unit of keepAliveTime
- BlockingQueue<Runnable> workQueue: the blocking queue. When there are more tasks than the core threads can run, the extra tasks wait in this queue
- ThreadFactory threadFactory: the factory that creates the threads
- RejectedExecutionHandler handler: when the queue is full and the pool has reached maximumPoolSize, the task is handled according to the rejection policy specified here
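As an illustration of the parameter list above, the following sketch spells out all seven constructor arguments. The class name SevenParameterPoolSketch, the helper newPool, and the concrete numbers (2, 4, 30 seconds, queue capacity 100) are assumptions chosen for the example, not recommendations from the original post.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SevenParameterPoolSketch {

    // Hypothetical helper: builds a pool with every argument written out.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                30L,                                  // keepAliveTime
                TimeUnit.SECONDS,                     // unit of keepAliveTime
                new ArrayBlockingQueue<>(100),        // workQueue (bounded)
                Executors.defaultThreadFactory(),     // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler: reject by throwing
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        try {
            pool.execute(() ->
                    System.out.println("running on " + Thread.currentThread().getName()));
            System.out.println("core=" + pool.getCorePoolSize()
                    + ", max=" + pool.getMaximumPoolSize());
        } finally {
            pool.shutdown();
        }
    }
}
```

A bounded queue is used here on purpose: with an unbounded queue the pool would never grow past corePoolSize, and the rejection handler would only come into play after shutdown.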
The order of work
- 1. The thread pool is created with the specified number (corePoolSize) of core threads to receive tasks
- 2. When all core threads are busy, incoming tasks are placed in the blocking queue (workQueue); when a core thread becomes idle, it takes its next task from the queue
- 3. When the blocking queue is full, new threads are started to run tasks, up to maximumPoolSize threads in total
- 4. When maximumPoolSize threads are running and the queue is full, the rejection policy is applied to new tasks
- 5. When the current number of threads exceeds corePoolSize, each extra thread is released after it has been idle for keepAliveTime
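The steps above can be observed with a deliberately tiny pool. This is a sketch under assumed settings (one core thread, at most two threads, a queue of capacity one); the class name PoolWorkflowSketch and the helper trace are hypothetical. Every task blocks on a latch so that the pool state after each submission is stable.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolWorkflowSketch {

    // Hypothetical helper: returns a short trace of the pool state after each submission.
    static String trace() {
        // Default thread factory and default AbortPolicy are used by this constructor.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder sb = new StringBuilder();
        try {
            pool.execute(blocking); // task 1: runs on the core thread
            sb.append("threads=").append(pool.getPoolSize());
            pool.execute(blocking); // task 2: core thread busy, so it is queued
            sb.append(" queued=").append(pool.getQueue().size());
            pool.execute(blocking); // task 3: queue full, so a second thread starts
            sb.append(" threads=").append(pool.getPoolSize());
            try {
                pool.execute(blocking); // task 4: queue full and pool at maximum
                sb.append(" accepted");
            } catch (RejectedExecutionException e) {
                sb.append(" rejected");
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints "threads=1 queued=1 threads=2 rejected"
    }
}
```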
Common Executors factory methods for creating thread pools:
- Executors.newCachedThreadPool(): the core size is 0, so all threads can be reclaimed
- Executors.newFixedThreadPool(): fixed size, core = max
- Executors.newScheduledThreadPool(): a thread pool for scheduled tasks
- Executors.newSingleThreadExecutor(): a single-threaded pool; the background thread takes tasks from the queue and runs them one by one
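Two of these factory methods have behavior that is easy to show in a few lines: the single-thread executor runs tasks in submission order, and the scheduled pool runs a task after a delay. The sketch below assumes the hypothetical class name ExecutorsFactorySketch and helper names runInOrder and runDelayed; the 100 ms delay is arbitrary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ExecutorsFactorySketch {

    // Single-thread executor: tasks run one by one, in submission order.
    static List<Integer> runInOrder() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 3; i++) {
            int n = i;
            single.execute(() -> seen.add(n));
        }
        single.shutdown(); // already submitted tasks still run
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    // Scheduled pool: the Callable runs once, after the given delay.
    static String runDelayed() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "done", 100, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInOrder()); // prints "[1, 2, 3]"
        System.out.println(runDelayed()); // prints "done"
    }
}
```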
Starting an asynchronous task with CompletableFuture
- runAsync: no return value
- supplyAsync: has a return value
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CompletableFutureTest {
    private static final ExecutorService executor = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        // Approach 1: no return value
        CompletableFuture.runAsync(() -> log.info("runAsync"), executor);
        // Approach 2: has a return value
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            return "supplyAsync";
        }, executor);
        // Get the return value (blocks until the task finishes)
        try {
            String res = supplyAsync.get();
            log.info("Return value: {}", res);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown(); // otherwise the pool's threads keep the JVM alive
        }
    }
}
Callbacks after the task completes (whenComplete, exceptionally)
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(
        () -> {
            log.info("Creating...");
            return 10 / 0;
        }, executor)
        // whenComplete runs on the thread that completed the task (or on the calling
        // thread if the task has already completed); whenCompleteAsync hands the
        // callback to an executor instead. It observes the outcome but cannot change it.
        .whenComplete((res, e) -> {
            log.info("Result: {}, exception: {}", res, e);
        })
        // Callback that runs only if an exception occurred; supplies a fallback value
        .exceptionally((throwable) -> {
            log.info("An exception occurred, returning 0. Exception: {}", throwable.getCause().toString());
            return 0;
        });
// Print the result
try {
    log.info("Result: {}", supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
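The split of responsibilities in the fragment above (whenComplete only observes, exceptionally replaces a failure with a fallback) can be condensed into a self-contained sketch. The class name CompletionCallbacksSketch and the helper divideOrZero are hypothetical; no executor is passed, so the JDK's default asynchronous executor is used, and join() is used instead of get() to avoid checked exceptions.

```java
import java.util.concurrent.CompletableFuture;

class CompletionCallbacksSketch {

    // Hypothetical helper: divides asynchronously and falls back to 0 on failure.
    static int divideOrZero(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a / b)
                // Observes the outcome; the result or exception passes through unchanged.
                .whenComplete((res, ex) ->
                        System.out.println("result=" + res + ", exception=" + ex))
                // Runs only on failure and supplies the fallback value.
                .exceptionally(ex -> 0)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(divideOrZero(10, 2)); // prints 5
        System.out.println(divideOrZero(10, 0)); // prints 0
    }
}
```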

Handling the result after the task completes (handle)
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(
        () -> {
            log.info("Creating...");
            return 10 / 2;
            // return 10 / 0;
        }, executor)
        // Parameter: BiFunction<? super T, Throwable, ? extends U> fn;
        // receives the result and the exception, and returns a new result
        .handle((res, e) -> {
            if (res != null) {
                log.info("The result is not null");
                return res * 2;
            }
            if (e != null) {
                log.info("An exception occurred");
                return 0;
            }
            log.info("Other cases");
            return 0;
        });
// Print the result
try {
    log.info("Result: {}", supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
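A variant of the handle callback that checks the exception first may be easier to read, since a null result alone does not prove that the task failed. The following is a sketch, not the author's code: the class name HandleSketch and the helper describe are hypothetical, and the unwrapping step reflects that a failure in supplyAsync typically reaches the callback wrapped in a CompletionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandleSketch {

    // Hypothetical helper: maps both outcomes of an async division to a description.
    static String describe(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a / b)
                .handle((res, ex) -> {
                    if (ex != null) {
                        // Unwrap the CompletionException to reach the original cause.
                        Throwable cause =
                                (ex instanceof CompletionException && ex.getCause() != null)
                                        ? ex.getCause() : ex;
                        return "failed: " + cause.getClass().getSimpleName();
                    }
                    return "ok: " + res;
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(10, 2)); // prints "ok: 5"
        System.out.println(describe(10, 0)); // prints "failed: ArithmeticException"
    }
}
```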

Running tasks serially
The basic methods:
- thenApply: receives the previous result and returns a new result
- thenAccept: receives the previous result but returns nothing
- thenRun: neither receives the previous result nor returns one
Example requirement:
- Start task 1 asynchronously first
- Task 2 receives the result of task 1 and returns a new result
- Task 3 receives the result of task 2 on the same thread, but returns nothing
- Task 4 neither receives the previous result nor returns one
Code:
private static void serialTasks() {
    CompletableFuture.supplyAsync(
            () -> {
                log.info("Task 1...");
                return 111;
            })
            // Receives the previous result and returns a new one (the type may differ)
            .thenApplyAsync((res) -> {
                log.info("Task 2... previous result: {}", res);
                return "222";
            }, executor)
            // Receives the previous result but returns nothing
            .thenAccept((res) -> {
                log.info("Task 3... previous result: {}", res);
            })
            // Neither receives nor returns a result
            .thenRunAsync(() -> {
                log.info("Task 4... cannot see the previous result");
            }, executor);
    // When run from a unit test, sleep so the JVM does not exit
    // before the asynchronous tasks have finished
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
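The three methods differ mainly in what the next stage can see, which shows up in the declared types of the stages. A small sketch under assumed names (SerialChainSketch, run) makes this explicit; it uses the non-Async variants and the default executor to stay short, and waits with join() instead of sleeping.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SerialChainSketch {

    // Hypothetical helper: builds the chain stage by stage and records what each stage saw.
    static List<String> run() {
        List<String> steps = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> 111);
        // thenApply: receives a value and produces a value of a possibly different type.
        CompletableFuture<String> task2 = task1.thenApply(res -> "got " + res);
        // thenAccept: receives a value, produces nothing (Void).
        CompletableFuture<Void> task3 = task2.thenAccept(res -> steps.add("task3 saw: " + res));
        // thenRun: receives nothing, produces nothing.
        CompletableFuture<Void> task4 = task3.thenRun(() -> steps.add("task4 saw nothing"));
        task4.join(); // wait for the whole chain
        return new ArrayList<>(steps);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[task3 saw: got 111, task4 saw nothing]"
    }
}
```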

Combining two tasks
Both tasks complete
- thenCombine: combines two futures, receives both results, and returns a new result
- thenAcceptBoth: combines two futures, receives both results, and processes them without returning a value
- runAfterBoth: combines two futures and runs an action once both have finished, without receiving their results
- Appending Async to any of the methods above (for example thenCombineAsync) runs the callback asynchronously on an executor instead of on the thread that completed the task
Example: a third task receives the results of the first two tasks, concatenates them, and returns the final result:
// The two tasks
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
        () -> {
            log.info("Task 1...");
            return 111;
        }, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
        () -> {
            log.info("Task 2...");
            return 222;
        }, executor);
// Task 3 runs after both of the first two tasks have completed:
CompletableFuture<String> cf3 = cf1.thenCombineAsync(cf2, (f1, f2) -> {
    log.info("Task 3... received results: {},{}", f1, f2);
    return f1 + "" + f2;
}, executor);
// Print the result of task 3
try {
    log.info("Return value of task 3: {}", cf3.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
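The example above covers thenCombine only. For the other two methods in the list, a minimal sketch could look like the following; the class name BothSketch and the helper run are hypothetical, and the default executor is used for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class BothSketch {

    // Hypothetical helper: records what thenAcceptBoth and runAfterBoth observe.
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> 111);
        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> 222);

        // thenAcceptBoth: sees both results, returns nothing.
        CompletableFuture<Void> accepted =
                cf1.thenAcceptBoth(cf2, (a, b) -> events.add("sum=" + (a + b)));
        accepted.join();

        // runAfterBoth: sees no results, only runs once both are done.
        CompletableFuture<Void> ran =
                cf1.runAfterBoth(cf2, () -> events.add("both done"));
        ran.join();

        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[sum=333, both done]"
    }
}
```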

Either task completes
- applyToEither: when either of the two tasks completes, receives its result, processes it, and returns a new value
- acceptEither: when either of the two tasks completes, receives its result and processes it, but returns no value
- runAfterEither: when either of the two tasks completes, runs an action without receiving the result and without returning a value
- Each method also has an Async variant
- The two combined tasks must have compatible result types
Testing the three methods:
// The two tasks
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
        () -> {
            log.info("Task 1...");
            return 111;
        }, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
        () -> {
            log.info("Task 2... start");
            try {
                Thread.sleep(2000);
                log.info("Task 2... end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 222;
        }, executor);
//========= Approach 1 =========
// Task 3 runs as soon as either of the first two tasks has completed:
CompletableFuture<Integer> cf3 = cf1.applyToEitherAsync(cf2, (f1) -> {
    log.info("Task 3-1... received result: {}, doubling it", f1);
    return f1 * 2;
}, executor);
// Print the result of cf3
try {
    log.info("Return value of task 3-1: {}", cf3.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
//========= Approach 2 =========
cf1.acceptEitherAsync(cf2, (f1) -> {
    log.info("Task 3-2 received result: {}", f1);
}, executor);
//========= Approach 3 =========
cf1.runAfterEitherAsync(cf2,
        () -> log.info("Task 3-3: one of the first two tasks has completed"),
        executor);
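The "either" behavior is easiest to verify when one of the two futures can never win. The sketch below makes that assumption explicit by pairing an already completed future with one that is never completed; the class name EitherSketch and the helper firstDoubled are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class EitherSketch {

    // Hypothetical helper: only one of the two futures ever completes.
    static int firstDoubled() {
        CompletableFuture<Integer> fast = CompletableFuture.completedFuture(111);
        // Never completed in this sketch, so it can never supply the result.
        CompletableFuture<Integer> never = new CompletableFuture<>();
        // The function receives whichever result arrives first; here that is always 111.
        return never.applyToEither(fast, r -> r * 2).join();
    }

    public static void main(String[] args) {
        System.out.println(firstDoubled()); // prints 222
    }
}
```

In the original test with a two-second sleep in task 2, the winner is task 1 in practice, but that depends on timing; the arrangement here removes the timing dependence.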

Combining multiple tasks
CompletableFuture has two static methods for this:
- allOf: completes when all the tasks have completed
- anyOf: completes as soon as any one task has completed
Code example:
// The two tasks
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
        () -> {
            log.info("Task 1...");
            return 111;
        }, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
        () -> {
            log.info("Task 2... start");
            try {
                Thread.sleep(2000);
                log.info("Task 2... end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 222;
        }, executor);
// Any one task completes:
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(cf1, cf2);
try {
    log.info("One task has completed, return value: {}", anyOf.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
// All tasks complete:
CompletableFuture<Void> allOf = CompletableFuture.allOf(cf1, cf2);
try {
    allOf.get(); // blocks until every task has completed; allOf itself carries no value
    log.info("All tasks have completed, return values: {},{}", cf1.get(), cf2.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
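Because allOf returns a CompletableFuture<Void>, the individual results have to be read back from the original futures. One common way to do that for a list of futures is sketched below; the class name AllOfSketch and the helpers collect and demo are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    // Hypothetical helper: waits for every future, then gathers the results in list order.
    static List<Integer> collect(List<CompletableFuture<Integer>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return all.thenApply(ignored ->
                        futures.stream()
                                .map(CompletableFuture::join) // does not block: all are done
                                .collect(Collectors.toList()))
                .join();
    }

    static List<Integer> demo() {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> 111);
        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> 222);
        return collect(Arrays.asList(cf1, cf2));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[111, 222]"
    }
}
```

The results come back in the order of the input list, not in completion order, because each value is read from its own future.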
