当前位置:网站首页>[Part 14] source code analysis and application details of completionservice class [key]
[Part 14] source code analysis and application details of completionservice class [key]
2022-06-11 21:21:00 【__ Struggling Kaka】
1.1 summary
When we use ExecutorService Start multiple Callable when , Every Callable Return to one Future, And when we execute Future Of get Method to get the result , May get Future Not the first to complete the execution Callable Of Future, It will Blocking , Thus, the first completed Callable result , This will cause serious performance loss .
Let's look at an example :
Future f1 = excutor.submit(c1);
f1.get();
Future f2 = excutor.submit(c2);
f2.get();
f1.get() It will be blocked before success , It will block c2 Implementation , Seriously reduces efficiency .
CompletionService It is to solve this problem , It is Java8 New interface , its The implementation class is ExecutorCompletionService.CompletionService According to the thread pool Task The execution results of are sorted according to the order of execution completion , If the task is completed first, you can get... First .
1.2 The principle,
1.2.1 A small chestnut
To make an RFQ application , This application requires three e-commerce inquiries , Then save it in your own database .
// Creating a thread pool
ExecutorService executor = Executors.newFixedThreadPool(3);
// Asynchronous to e-commerce S1 inquiry
Future<Integer> f1 = executor.submit( ()->getPriceByS1());
// Asynchronous to e-commerce S2 inquiry
Future<Integer> f2 = executor.submit( ()->getPriceByS2());
// Asynchronous to e-commerce S3 inquiry
Future<Integer> f3 = executor.submit( ()->getPriceByS3());
// Create a blocking queue
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
// Online retailers S1 The quotation enters the blocking queue asynchronously
executor.execute(()-> bq.put(f1.get()));
// Online retailers S2 The quotation enters the blocking queue asynchronously
executor.execute(()-> bq.put(f2.get()));
// Online retailers S3 The quotation enters the blocking queue asynchronously
executor.execute(()-> bq.put(f3.get()));
// Save all quotes asynchronously
for (int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}
CompletionService The principle of implementation is to maintain a blocking queue internally , When the task execution ends, the execution result of the task is added to the blocking queue , The difference is CompletionService It's the result of the task Future Object to join the blocking queue , The above example code puts the final execution result of the task into the blocking queue .
1.2.1 CompletionService structure

CompletionSerive Interface has an implementation class ExecutorCompletionService, The structure is as follows
1.2.2 ExecutorCompletionService structure

1.2.2.1 Construction method
structure ExecutorCompletionService object
executor: Associated thread pool
completionQueue: Custom result storage queue
ExecutorCompletionService(Executor executor)
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
1.2.2.2 submit Method
To submit a Callable perhaps Runnable Type of task , And back to Future
Future<V> submit(Callable<V> task)
Future<V> submit(Runnable task, V result)
1.2.2.3 take Method
Blocking method , Get and remove the result of a completed task from the result queue , If not, it will block , Until a task is completed, the result is returned .
Future<V> take() throws InterruptedException
1.2.2.3 poll Method
Get and remove the result of a completed task from the result queue , If not, it will return null, This method doesn't block .
timeout: How long is the maximum waiting time
unit: Time unit
Future<V> poll()
Future<V> poll(long timeout, TimeUnit unit)
1.2.3 Case study
1.2.3.1 Problem recurrence ( Don't use CompletionService)
Don't use CompletionService Problems in
package com.brycen.part3.threadpool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<Integer>> callables = Arrays.asList(
()->{
mySleep(20);
System.out.println("=============20 end==============");
return 20;
},
()->{
mySleep(10);
System.out.println("=============10 end==============");
return 10;
}
);
List<Future<Integer>> futures = new ArrayList<>();
// Submit tasks , And will future Add to list Collection
futures.add(executorService.submit(callables.get(0)));
futures.add(executorService.submit(callables.get(1)));
// Traverse Future, Because I don't know which task to complete first , So the first thing we get from the simulation here is the task with the longest execution time , Then the task with shorter execution time must wait for the task with longer execution time to complete
for (Future future:futures) {
System.out.println(" result : "+future.get());
}
System.out.println("============main end=============");
}
private static void mySleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Running results :
Even if it's dormant 10 Second tasks are executed first and no results are output , Because when you get the result, you may get it first 20 Second task results , And dormant 20 The task of seconds has not been completed , At this point, it will block , This affects the performance .
=10 end==
=20 end==
result : 20
result : 10
main end=
1.2.3.2 utilize CompletionService solve the problem
package com.brycen.part3.threadpool;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<Integer>> callables = Arrays.asList(
()->{
mySleep(20);
System.out.println("=============20 end==============");
return 20;
},
()->{
mySleep(10);
System.out.println("=============10 end==============");
return 10;
}
);
// structure ExecutorCompletionService, Associated with thread pool
CompletionService completionService = new ExecutorCompletionService(executorService);
// Submit Callable Mission
completionService.submit(callables.get(0));
completionService.submit(callables.get(1));
// obtain future result , It won't block
Future<Integer> pollFuture = completionService.poll();
// Here, because there is no implementation completed Callable, So back null
System.out.println(pollFuture);
// obtain future result , Waiting for the most 3 second , It won't block
Future<Integer> pollTimeOutFuture = completionService.poll(3,TimeUnit.SECONDS);
// Here, because there is no implementation completed Callable, So back null
System.out.println(pollTimeOutFuture);
// adopt take obtain Future result , This method will block
for(int i=0;i<callables.size();i++){
System.out.println(completionService.take().get());
}
System.out.println("============main end=============");
}
private static void mySleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Running results :
null
null
=10 end==
10
=20 end==
20
main end=
1.3 summary
1、 comparison ExecutorService,CompletionService Can be more accurate and simple to complete the execution of asynchronous tasks
2、CompletionService One implementation of is ExecutorCompletionService, It is Executor and BlockingQueue Functional fusion ,Executor Complete the calculation task ,BlockingQueue Be responsible for saving the execution results of asynchronous tasks
3、 When performing a large number of independent and isomorphic tasks , have access to CompletionService
4、CompletionService You can set a time limit for the execution of a task , Mainly through BlockingQueue Of poll(long time,TimeUnit unit) Limit the time for obtaining the results of task execution , If not, cancel the task
5、CompletionService It can make the execution results of asynchronous tasks orderly . First of all, execute the first in the blocking queue , Take advantage of this feature , You can easily achieve the order of subsequent processing , Avoid unnecessary waiting , At the same time, it can also quickly realize such as Forking Cluster Such a need .
边栏推荐
- 【C語言進階】整型在內存中的存儲
- 一个Golang的私有库设置问题
- SQL的语法
- Release of version 5.6 of rainbow, add multiple installation methods, and optimize the topology operation experience
- php pcntl_ Fork create multiple child process resolution
- select _ Lazy loading
- Technical exchange | why should network security equipment use bypass function
- 产品资讯|PoE网卡家族集体亮相,机器视觉完美搭档!
- JVM方法区
- IDEA中,运行yarn命令,显示无法加载文件,因为在此系统上禁用运行脚本
猜你喜欢

Weekly 02 | to tell you the truth, I am actually a student of MIT

Teach you how to use win7 system to quickly build your own website

重投农业,加码技术服务,拼多多底盘进一步夯实

Why should I use iwarp, roce V2, nvme of and other protocols for 100g network transmission

Release of version 5.6 of rainbow, add multiple installation methods, and optimize the topology operation experience

【数据可视化】Apache Superset 1.2.0教程 (二)——快速入门(可视化王者英雄数据)

周刊02|不瞞你說,我其實是MIT的學生

ORA-04098: trigger ‘xxx. xxx‘ is invalid and failed re-validation

【 C Advanced language】 Integer Storage in Memory

【数据可视化】使用 Apache Superset 可视化 ClickHouse 数据
随机推荐
RANSAC提取平面(MATLAB内置函数)
Mysql add 新增多个新字段并指定字段位置
New product release: domestic single port Gigabit network card is officially mass produced!
Only 38 years old! Zhou Chuan, a researcher of the Chinese Academy of Sciences, died unfortunately. Rao Yi sent a document to mourn: he guided me when he was still my student
BUG -- coredump使用
一个Golang的私有库设置问题
LR-LINK联瑞携新品首次亮相数博会-助力新基建数据中心建设
Frequency domain filter
【C语言进阶】整型在内存中的存储
Stream Chinese sorting
【博弈论-绪论】
Solve the problem of img 5px spacing
Part I physical layer
The official announced the launch of Alibaba's 2023 global school recruitment: Technical Posts account for more than 60%
[nk] deleted number of 100 C Xiaohong in Niuke practice match
Add personal statement for go file in file template in Golan
应用业务层修改
Cs144 lab0 lab1 record
PHP strtotime 获取自然月误差问题解决方案
JVM之堆区