[Part 14] Source code analysis and application of the CompletionService class [key topic]
2022-06-11 21:21:00 【__ Struggling Kaka】
1.1 Overview
When we use an ExecutorService to start multiple Callables, each Callable returns a Future. When we call get on one of those Futures, it may not belong to the Callable that finishes first. The call then blocks, and we cannot pick up the result of the first completed Callable right away, which can cause a serious performance loss.
Let's look at an example:
Future<?> f1 = executor.submit(c1);
f1.get();
Future<?> f2 = executor.submit(c2);
f2.get();
f1.get() blocks until c1 has completed, so c2 cannot even be submitted until then, which seriously reduces efficiency.
CompletionService exists to solve this problem. It is an interface in java.util.concurrent, introduced in Java 5, and its implementation class is ExecutorCompletionService. CompletionService hands back the results of the tasks in the thread pool in the order in which the tasks complete: whichever task finishes first can be retrieved first.
1.2 How it works
1.2.1 A small example
Suppose we are building a price-inquiry (RFQ) application. It requests quotes from three e-commerce suppliers and then saves them in its own database.
// Create the thread pool
ExecutorService executor = Executors.newFixedThreadPool(3);
// Asynchronously request a quote from supplier S1
Future<Integer> f1 = executor.submit(() -> getPriceByS1());
// Asynchronously request a quote from supplier S2
Future<Integer> f2 = executor.submit(() -> getPriceByS2());
// Asynchronously request a quote from supplier S3
Future<Integer> f3 = executor.submit(() -> getPriceByS3());
// Create the blocking queue
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
// Note: f.get() and bq.put() throw checked exceptions; the handling is omitted here for brevity
// Put the quote from supplier S1 into the blocking queue asynchronously
executor.execute(() -> bq.put(f1.get()));
// Put the quote from supplier S2 into the blocking queue asynchronously
executor.execute(() -> bq.put(f2.get()));
// Put the quote from supplier S3 into the blocking queue asynchronously
executor.execute(() -> bq.put(f3.get()));
// Save all quotes asynchronously, in the order in which they arrive
for (int i = 0; i < 3; i++) {
    Integer r = bq.take();
    executor.execute(() -> save(r));
}
CompletionService works on the same principle: it maintains a blocking queue internally, and when a task finishes, its outcome is added to that queue. The difference is that CompletionService puts the task's Future object into the blocking queue, whereas the example code above puts the final result value of each task into the queue.
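For comparison, the same inquiry flow can be sketched with ExecutorCompletionService, which removes the hand-written queue. This is only an illustration: the getPriceByS1/S2/S3 bodies, their delays and prices, and the returned list (standing in for save(r)) are assumptions made for the demo, not part of the original application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class QuoteCollector {
    // Hypothetical supplier calls: the delays and prices are made up for the demo.
    static int getPriceByS1() { pause(300); return 30; }
    static int getPriceByS2() { pause(50);  return 10; }
    static int getPriceByS3() { pause(150); return 20; }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Collects the three quotes in completion order; the list stands in for save(r).
    static List<Integer> collectQuotes() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
            cs.submit(QuoteCollector::getPriceByS1);
            cs.submit(QuoteCollector::getPriceByS2);
            cs.submit(QuoteCollector::getPriceByS3);
            List<Integer> saved = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                // take() blocks until the next task has finished, whichever it is
                saved.add(cs.take().get());
            }
            return saved;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectQuotes());
    }
}
```

With these delays, the quotes are collected fastest-first (S2, then S3, then S1), regardless of submission order.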
1.2.1 CompletionService structure

The CompletionService interface has one implementation class, ExecutorCompletionService.
1.2.2 ExecutorCompletionService structure
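The core mechanism can be illustrated with a much-reduced stand-in. The sketch below is not the JDK source: MiniCompletionService is a hypothetical class that only shows the idea of wrapping each task in a FutureTask whose done() hook places the finished Future on a queue. The JDK class follows the same idea with an internal FutureTask subclass.

```java
import java.util.concurrent.*;

// Hypothetical, simplified stand-in for ExecutorCompletionService (not the JDK source).
class MiniCompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completionQueue = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    // Wraps the task so that it enqueues itself as soon as it finishes.
    Future<V> submit(Callable<V> task) {
        FutureTask<V> future = new FutureTask<V>(task) {
            @Override
            protected void done() {
                // Invoked once when the task completes normally, fails, or is cancelled.
                completionQueue.add(this);
            }
        };
        executor.execute(future);
        return future;
    }

    // Blocks until some task has finished, then returns its Future.
    Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    // Demo helper: submits a slow and a fast task and returns the first result taken.
    static int firstCompleted() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            MiniCompletionService<Integer> service = new MiniCompletionService<>(pool);
            service.submit(() -> { Thread.sleep(300); return 300; });
            service.submit(() -> { Thread.sleep(50); return 50; });
            return service.take().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstCompleted());
    }
}
```

Because the queue is filled from done(), the consumer never has to guess which Future to wait on: the order of the queue is the order of completion.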

1.2.2.1 Constructors
Construct an ExecutorCompletionService object.
executor: the thread pool (Executor) that runs the submitted tasks
completionQueue: a custom queue in which the completed Futures are stored
ExecutorCompletionService(Executor executor)
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
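As a small illustration of the second constructor, the sketch below passes in its own queue and then inspects it directly. The class name CustomQueueDemo and the queue capacity of 10 are arbitrary choices for the demo; the capacity only needs to be comfortably larger than the number of submitted tasks.

```java
import java.util.concurrent.*;

class CustomQueueDemo {
    // Returns how many completed Futures ended up in the caller-supplied queue.
    static int completedInQueue() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Caller-owned queue; the service places each finished Future into it.
        BlockingQueue<Future<String>> queue = new ArrayBlockingQueue<>(10);
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool, queue);
            service.submit(() -> "a");
            service.submit(() -> "b");
            // Wait for both tasks to finish before looking at the queue.
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return queue.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(completedInQueue());
    }
}
```

The one-argument constructor is the usual choice; supplying a queue is mainly useful when the queue needs to be shared or inspected by other code.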
1.2.2.2 submit Method
Submits a Callable or Runnable task and returns a Future for it.
Future<V> submit(Callable<V> task)
Future<V> submit(Runnable task, V result)
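The Runnable overload is less obvious than the Callable one, so here is a minimal sketch of it. The class name SubmitRunnableDemo and the "done" marker value are illustrative assumptions.

```java
import java.util.concurrent.*;

class SubmitRunnableDemo {
    static String runAndTake() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            // A Runnable produces no value, so the second argument becomes the Future's result.
            service.submit(() -> System.out.println("runnable finished"), "done");
            return service.take().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndTake());
    }
}
```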
1.2.2.3 take Method
A blocking method. Retrieves and removes the Future of a completed task from the result queue. If no task has completed yet, it blocks until one completes and then returns that task's Future.
Future<V> take() throws InterruptedException
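1.2.2.4 poll Method
Retrieves and removes the Future of a completed task from the result queue. The no-argument version never blocks and returns null immediately if no task has completed. The timed version waits up to the given timeout and returns null if no task completes within that time.
timeout: the maximum time to wait
unit: the time unit of timeout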
Future<V> poll()
Future<V> poll(long timeout, TimeUnit unit)
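The difference between the three retrieval methods can be seen side by side in a short sketch. The class name PollVersusTakeDemo, the 500 ms task, and the 50 ms timeout are assumptions chosen so that both poll calls return before the task finishes.

```java
import java.util.concurrent.*;

class PollVersusTakeDemo {
    static String describe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
            service.submit(() -> { Thread.sleep(500); return 42; });

            // Returns at once; nothing has completed yet, so this is null.
            Future<Integer> immediate = service.poll();
            // Waits at most 50 ms; the task needs 500 ms, so this is null as well.
            Future<Integer> timed = service.poll(50, TimeUnit.MILLISECONDS);
            // Blocks until the task has actually finished.
            Future<Integer> blocking = service.take();

            return (immediate == null) + " " + (timed == null) + " " + blocking.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "true true 42"
    }
}
```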
1.2.3 Case study
1.2.3.1 Reproducing the problem (without CompletionService)
The following example shows the problem that arises when CompletionService is not used:
package com.brycen.part3.threadpool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        List<Callable<Integer>> callables = Arrays.asList(
                () -> {
                    mySleep(20);
                    System.out.println("=============20 end==============");
                    return 20;
                },
                () -> {
                    mySleep(10);
                    System.out.println("=============10 end==============");
                    return 10;
                }
        );
        List<Future<Integer>> futures = new ArrayList<>();
        // Submit the tasks and add each Future to the list
        futures.add(executorService.submit(callables.get(0)));
        futures.add(executorService.submit(callables.get(1)));
        // Traverse the Futures. In practice we do not know which task finishes first, so this
        // simulation deliberately asks for the longest-running task first: the result of the
        // shorter task cannot be read until the longer task has completed.
        for (Future<Integer> future : futures) {
            System.out.println(" result : " + future.get());
        }
        System.out.println("============main end=============");
        // Shut the pool down so that the JVM can exit
        executorService.shutdown();
    }

    private static void mySleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Output:
Even though the task that sleeps for 10 seconds finishes first, its result is not printed first. The loop asks for the result of the 20-second task first, and because that task has not finished yet, get() blocks until it does. This hurts performance.
=============10 end==============
=============20 end==============
result : 20
result : 10
============main end=============
1.2.3.2 Solving the problem with CompletionService
package com.brycen.part3.threadpool;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        List<Callable<Integer>> callables = Arrays.asList(
                () -> {
                    mySleep(20);
                    System.out.println("=============20 end==============");
                    return 20;
                },
                () -> {
                    mySleep(10);
                    System.out.println("=============10 end==============");
                    return 10;
                }
        );
        // Construct the ExecutorCompletionService and associate it with the thread pool
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
        // Submit the Callable tasks
        completionService.submit(callables.get(0));
        completionService.submit(callables.get(1));
        // Try to get a completed Future; this call does not block
        Future<Integer> pollFuture = completionService.poll();
        // No Callable has completed yet, so this prints null
        System.out.println(pollFuture);
        // Try to get a completed Future, waiting at most 3 seconds
        Future<Integer> pollTimeOutFuture = completionService.poll(3, TimeUnit.SECONDS);
        // Still no Callable has completed after 3 seconds, so this prints null
        System.out.println(pollTimeOutFuture);
        // Get the Futures through take(); this method blocks until a task completes
        for (int i = 0; i < callables.size(); i++) {
            System.out.println(completionService.take().get());
        }
        System.out.println("============main end=============");
        // Shut the pool down so that the JVM can exit
        executorService.shutdown();
    }

    private static void mySleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Output:
null
null
=============10 end==============
10
=============20 end==============
20
============main end=============
1.3 Summary
1、 Compared with using ExecutorService directly, CompletionService makes it simpler to consume the results of asynchronous tasks as they complete.
2、 ExecutorCompletionService is an implementation of CompletionService that combines the functions of Executor and BlockingQueue: the Executor runs the tasks, and the BlockingQueue stores the Futures of the completed asynchronous tasks.
3、 CompletionService is a good fit when running a large number of independent, homogeneous tasks.
4、 CompletionService lets you bound how long you wait for a result. poll(long timeout, TimeUnit unit), which is backed by the BlockingQueue, limits the time spent waiting for a task result; if no result arrives in time, the caller can cancel the task.
5、 CompletionService delivers the results of asynchronous tasks in completion order: the task that finishes first enters the blocking queue first. Using this property, subsequent processing can follow completion order and avoid unnecessary waiting, and requirements such as a Forking Cluster can be implemented quickly.
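Points 4 and 5 can be combined into a "first result wins" helper in the style of a forking call: submit every task, return the first successful result within a time limit, and cancel whatever is still running. This is a minimal sketch under stated assumptions; FirstResultWins, invokeFirst and demo are hypothetical names, and the delays are made up for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class FirstResultWins {
    // Returns the first successful result, or null if none arrives within the timeout.
    static <T> T invokeFirst(ExecutorService pool, List<Callable<T>> tasks,
                             long timeout, TimeUnit unit) throws InterruptedException {
        CompletionService<T> service = new ExecutorCompletionService<>(pool);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(service.submit(task));
            }
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            for (int i = 0; i < tasks.size(); i++) {
                long remaining = deadline - System.nanoTime();
                Future<T> done = service.poll(remaining, TimeUnit.NANOSECONDS);
                if (done == null) {
                    return null; // time limit reached
                }
                try {
                    return done.get();
                } catch (ExecutionException e) {
                    // This task failed; keep waiting for the next one to complete.
                }
            }
            return null; // every task failed
        } finally {
            // Cancel anything still running; a no-op for tasks that already finished.
            for (Future<T> f : futures) {
                f.cancel(true);
            }
        }
    }

    // Demo helper with three made-up tasks; the 50 ms task should win.
    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(() -> { Thread.sleep(400); return 1; });
            tasks.add(() -> { Thread.sleep(50); return 2; });
            tasks.add(() -> { Thread.sleep(800); return 3; });
            return invokeFirst(pool, tasks, 2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Cancelling in the finally block is a design choice: it frees the pool threads as soon as an answer is available, at the cost of discarding the slower tasks' work.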