当前位置:网站首页>[Part 14] source code analysis and application details of completionservice class [key]
[Part 14] source code analysis and application details of completionservice class [key]
2022-06-11 21:21:00 【__ Struggling Kaka】
1.1 summary
When we use ExecutorService Start multiple Callable when , Every Callable Return to one Future, And when we execute Future Of get Method to get the result , May get Future Not the first to complete the execution Callable Of Future, It will Blocking , Thus, the first completed Callable result , This will cause serious performance loss .
Let's look at an example :
Future f1 = excutor.submit(c1);
f1.get();
Future f2 = excutor.submit(c2);
f2.get();
f1.get() It will be blocked before success , It will block c2 Implementation , Seriously reduces efficiency .
CompletionService It is to solve this problem , It is Java8 New interface , its The implementation class is ExecutorCompletionService.CompletionService According to the thread pool Task The execution results of are sorted according to the order of execution completion , If the task is completed first, you can get... First .
1.2 The principle,
1.2.1 A small chestnut
To make an RFQ application , This application requires three e-commerce inquiries , Then save it in your own database .
// Creating a thread pool
ExecutorService executor = Executors.newFixedThreadPool(3);
// Asynchronous to e-commerce S1 inquiry
Future<Integer> f1 = executor.submit( ()->getPriceByS1());
// Asynchronous to e-commerce S2 inquiry
Future<Integer> f2 = executor.submit( ()->getPriceByS2());
// Asynchronous to e-commerce S3 inquiry
Future<Integer> f3 = executor.submit( ()->getPriceByS3());
// Create a blocking queue
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
// Online retailers S1 The quotation enters the blocking queue asynchronously
executor.execute(()-> bq.put(f1.get()));
// Online retailers S2 The quotation enters the blocking queue asynchronously
executor.execute(()-> bq.put(f2.get()));
// Online retailers S3 The quotation enters the blocking queue asynchronously
executor.execute(()-> bq.put(f3.get()));
// Save all quotes asynchronously
for (int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}
CompletionService The principle of implementation is to maintain a blocking queue internally , When the task execution ends, the execution result of the task is added to the blocking queue , The difference is CompletionService It's the result of the task Future Object to join the blocking queue , The above example code puts the final execution result of the task into the blocking queue .
1.2.1 CompletionService structure

CompletionSerive Interface has an implementation class ExecutorCompletionService, The structure is as follows
1.2.2 ExecutorCompletionService structure

1.2.2.1 Construction method
structure ExecutorCompletionService object
executor: Associated thread pool
completionQueue: Custom result storage queue
ExecutorCompletionService(Executor executor)
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
1.2.2.2 submit Method
To submit a Callable perhaps Runnable Type of task , And back to Future
Future<V> submit(Callable<V> task)
Future<V> submit(Runnable task, V result)
1.2.2.3 take Method
Blocking method , Get and remove the result of a completed task from the result queue , If not, it will block , Until a task is completed, the result is returned .
Future<V> take() throws InterruptedException
1.2.2.3 poll Method
Get and remove the result of a completed task from the result queue , If not, it will return null, This method doesn't block .
timeout: How long is the maximum waiting time
unit: Time unit
Future<V> poll()
Future<V> poll(long timeout, TimeUnit unit)
1.2.3 Case study
1.2.3.1 Problem recurrence ( Don't use CompletionService)
Don't use CompletionService Problems in
package com.brycen.part3.threadpool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<Integer>> callables = Arrays.asList(
()->{
mySleep(20);
System.out.println("=============20 end==============");
return 20;
},
()->{
mySleep(10);
System.out.println("=============10 end==============");
return 10;
}
);
List<Future<Integer>> futures = new ArrayList<>();
// Submit tasks , And will future Add to list Collection
futures.add(executorService.submit(callables.get(0)));
futures.add(executorService.submit(callables.get(1)));
// Traverse Future, Because I don't know which task to complete first , So the first thing we get from the simulation here is the task with the longest execution time , Then the task with shorter execution time must wait for the task with longer execution time to complete
for (Future future:futures) {
System.out.println(" result : "+future.get());
}
System.out.println("============main end=============");
}
private static void mySleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Running results :
Even if it's dormant 10 Second tasks are executed first and no results are output , Because when you get the result, you may get it first 20 Second task results , And dormant 20 The task of seconds has not been completed , At this point, it will block , This affects the performance .
=10 end==
=20 end==
result : 20
result : 10
main end=
1.2.3.2 utilize CompletionService solve the problem
package com.brycen.part3.threadpool;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<Integer>> callables = Arrays.asList(
()->{
mySleep(20);
System.out.println("=============20 end==============");
return 20;
},
()->{
mySleep(10);
System.out.println("=============10 end==============");
return 10;
}
);
// structure ExecutorCompletionService, Associated with thread pool
CompletionService completionService = new ExecutorCompletionService(executorService);
// Submit Callable Mission
completionService.submit(callables.get(0));
completionService.submit(callables.get(1));
// obtain future result , It won't block
Future<Integer> pollFuture = completionService.poll();
// Here, because there is no implementation completed Callable, So back null
System.out.println(pollFuture);
// obtain future result , Waiting for the most 3 second , It won't block
Future<Integer> pollTimeOutFuture = completionService.poll(3,TimeUnit.SECONDS);
// Here, because there is no implementation completed Callable, So back null
System.out.println(pollTimeOutFuture);
// adopt take obtain Future result , This method will block
for(int i=0;i<callables.size();i++){
System.out.println(completionService.take().get());
}
System.out.println("============main end=============");
}
private static void mySleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Running results :
null
null
=10 end==
10
=20 end==
20
main end=
1.3 summary
1、 comparison ExecutorService,CompletionService Can be more accurate and simple to complete the execution of asynchronous tasks
2、CompletionService One implementation of is ExecutorCompletionService, It is Executor and BlockingQueue Functional fusion ,Executor Complete the calculation task ,BlockingQueue Be responsible for saving the execution results of asynchronous tasks
3、 When performing a large number of independent and isomorphic tasks , have access to CompletionService
4、CompletionService You can set a time limit for the execution of a task , Mainly through BlockingQueue Of poll(long time,TimeUnit unit) Limit the time for obtaining the results of task execution , If not, cancel the task
5、CompletionService It can make the execution results of asynchronous tasks orderly . First of all, execute the first in the blocking queue , Take advantage of this feature , You can easily achieve the order of subsequent processing , Avoid unnecessary waiting , At the same time, it can also quickly realize such as Forking Cluster Such a need .
边栏推荐
- 【指标体系】最新数仓指标体系建模方法
- One article to show you how to understand the harmonyos application on the shelves
- Deriving Kalman filter from probability theory
- Online excel file parsing and conversion to JSON format
- 频域滤波器
- JVM method area
- 周刊02|不瞞你說,我其實是MIT的學生
- [data visualization] use Apache superset to visualize Clickhouse data
- Field queryIndexFieldnameService in xxxImpl required a single bean, but 19 were found:
- JVM之堆区
猜你喜欢

Add personal statement for go file in file template in Golan

Teach you how to use win7 system to quickly build your own website

UML系列文章(29)体系结构建模---模式和框架

JVM对象分配策略TLAB

PHP strtotime 获取自然月误差问题解决方案

JVM object allocation policy TLAB

从概率论基础出发推导卡尔曼滤波

Online excel file parsing and conversion to JSON format

频域滤波器

Deriving Kalman filter from probability theory
随机推荐
The official announced the launch of Alibaba's 2023 global school recruitment: Technical Posts account for more than 60%
频域滤波器
为什么100G网络传输要使用iWARP、RoCE v2、NVMe-oF等协议
MySQL add adds multiple new fields and specifies the field location
JMeter load test finds the maximum number of concurrent users (including step analysis)
Brain cell membrane equivalent neural network training code
Iros 2021 | new idea of laser vision fusion? Lidar intensity diagram +vpr
Three waves of changes in cloud computing
Serval and Rooted Tree(CF1153D)-DP
【 C Advanced language】 Integer Storage in Memory
重投农业,加码技术服务,拼多多底盘进一步夯实
Release of version 5.6 of rainbow, add multiple installation methods, and optimize the topology operation experience
Application analysis of Poe image acquisition card in machine vision industrial computer
JVM heap
Product information | Poe network card family makes a collective appearance, the perfect partner of machine vision!
[nk] 牛客练习赛100 C 小红的删数字
LabVIEW控制Arduino实现超声波测距(进阶篇—5)
解决 img 5px 间距的问题
RANSAC提取平面(MATLAB内置函数)
The scale of the global machine vision market continues to rise. Poe image acquisition card provides a high-speed transmission channel for industrial cameras