当前位置:网站首页>Future & CompletionService
Future & CompletionService
2022-06-27 10:24:00 【InfoQ】
How to create threads
- Inherit Thread class
- Realization Runable Interface
- Realization Callable Interface
- Using thread pools
- Cannot return a return value
- Can't throw checked Exception
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
new Thread(() -> {
System.out.println(" adopt Runnable Way to perform tasks ");
}).start();
FutureTask task = new FutureTask(new Callable() {
@Override
public Object call() throws Exception {
System.out.println(" adopt Callable Way to perform tasks ");
// wait for 3s Simulation execution task
Thread.sleep(3000);
return " Return to task result ";
}
});
new Thread(task).start();
System.out.println(" result :" + task.get());
}
}
Future
getConstruction method
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)
Common methods
boolean cancel (boolean mayInterruptIfRunning)
- Cancel the execution of the task . Parameter specifies whether to interrupt task execution immediately , Or wait until the mission is over .
boolean isCancelled ()
- Whether the task has been cancelled , Cancel the task before it is completed , Then return to true.
boolean isDone ()
- Whether the task has been completed . It should be noted that if the task terminates normally 、 Exception or cancel , Will return to true.
V get () throws InterruptedException, ExecutionException
- Wait for the task to finish , Then get V Result of type .InterruptedException Thread interrupted exception ,ExecutionException Task execution exception , If the mission is cancelled , And throw CancellationException .
V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException, TimeoutException
- Same as above get Function as , Too much time out . Parameters timeout Specify the timeout period ,uint Specify the unit of time , In enumerating classes TimeUnit There are related definitions in . If the calculation times out , Will throw out TimeoutException.
FutureTask

Usage mode
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Realization Callable
Task task = new Task();
// structure futureTask
FutureTask<Integer> futureTask = new FutureTask<>(task);
// As Runnable Enter the reference
new Thread(futureTask).start();
System.out.println("task Running results :" + futureTask.get());
}
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(" The child thread is calculating ");
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
}
Use scenarios

public class FutureTaskDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> ft1 = new FutureTask<>(new T1Task());
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
FutureTask<String> ft3 = new FutureTask<>(new T3Task());
FutureTask<String> ft4 = new FutureTask<>(new T4Task());
FutureTask<String> ft5 = new FutureTask<>(new T5Task());
// Build thread pool
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.submit(ft1);
executorService.submit(ft2);
executorService.submit(ft3);
executorService.submit(ft4);
executorService.submit(ft5);
// Get execution results
System.out.println(ft1.get());
System.out.println(ft2.get());
System.out.println(ft3.get());
System.out.println(ft4.get());
System.out.println(ft5.get());
executorService.shutdown();
}
static class T1Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T1: Query flight information ...");
TimeUnit.MILLISECONDS.sleep(5000);
return " Flight information query succeeded ";
}
}
static class T2Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T2: Check the information of the passengers ...");
TimeUnit.MILLISECONDS.sleep(50);
return " Flight attendant information query succeeded ";
}
}
static class T3Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T3: Query contact information ...");
TimeUnit.MILLISECONDS.sleep(50);
return " Contact information query succeeded ";
}
}
static class T4Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T4: Query the refund and change information ...");
TimeUnit.MILLISECONDS.sleep(50);
return " Refund and change information query succeeded ";
}
}
static class T5Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T5: Query other information ...");
TimeUnit.MILLISECONDS.sleep(50);
return " Other information query succeeded ";
}
}
}

Future The limitations of
get()- Multitasking concurrently
- Future Provided only get() Method to get the result , And it's blocked . therefore , There is no other way but to wait for you .
- Cannot chain call multiple tasks
- If you want to perform a specific action after the calculation task is completed , E-mail , but Future But it doesn't provide such ability .
- Cannot combine multiple tasks
- If you run 10 A mission , And expect to perform specific actions after they are all executed , So in Future There's nothing I can do about it .
- No exception handling
- Future There are no exception handling methods in the interface .
CompletionService
Construction method
ExecutorCompletionService(Executor executor)
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
Common methods

submit
- To submit a Callable perhaps Runnable Type of task , And back to Future.
take
- Blocking method , Get and remove the result of a completed task from the result queue , If not, it will block , Until a task is completed, the result is returned .
poll
- Get and remove the result of a completed task from the result queue , If not, it will return null, This method doesn't block .
Use scenarios
public class CompletionServiceDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// Creating a thread pool
ExecutorService executor = Executors.newFixedThreadPool(10);
// establish CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// Asynchronous to e-commerce S1 inquiry
cs.submit(() -> getPriceByS1());
// Asynchronous to e-commerce S2 inquiry
cs.submit(() -> getPriceByS2());
// Asynchronous to e-commerce S3 inquiry
cs.submit(() -> getPriceByS3());
// Asynchronously save the inquiry result to the database
for (int i = 0; i < 3; i++) {
// Get from blocking queue futureTask
Integer r = cs.take().get();
executor.execute(() -> save(r));
}
executor.shutdown();
}
private static void save(Integer r) {
System.out.println(" Save inquiry results :" + r);
}
private static Integer getPriceByS1() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(5000);
System.out.println(" Online retailers S1 Inquiry information 1200");
return 1200;
}
private static Integer getPriceByS2() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(8000);
System.out.println(" Online retailers S2 Inquiry information 1000");
return 1000;
}
private static Integer getPriceByS3() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(3000);
System.out.println(" Online retailers S3 Inquiry information 800");
return 800;
}
}

Realization principle
QueueingFuture
QueueingFutureprivate static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
// rewrite FutureTask Of done Method , After the Executor The result of the calculation is put into BlockingQueue in
protected void done() {
completionQueue.add(task);
}
}
ExecutorService and CompletionService
Futureget()get()FutureFutureFuture
Performance optimization practice
- Isolate the problems that need to be solved (60~120 individual , The average is about 80 About two topics );
- Analyze and deal with the problem , Download the pictures in the title to the local , Then call the third party tool generation. PDF file ( It takes about 3~10 second );
- take PDF Upload document to OSS Cloud space for storage ( It takes about 1~3 second );
- Provide the document address for users to download and print .
4~13s8sfor/**
* Pending document information as needed
*/
public class PDFDocVO {
// Name of document to be processed
private String docName;
// Omit others
public String getDocName() {
return docName;
}
public void setDocName(String docName) {
this.docName = docName;
}
}
public class ProduceDocService {
/**
* Process the pending document as a local actual document
*
* @param doc
* @return
*/
public static String makePDF(PDFDocVO doc) throws InterruptedException {
// use sleep Time consuming range of simulated document generation 3~10s
Random r = new Random();
Thread.sleep(3000 + r.nextInt(7000));
return "local" + doc.getDocName();
}
}
public class UploadDocService {
/**
* Analog upload
*
* @param localName Where the actual document is stored locally
* @return oss File path
* @throws InterruptedException
*/
public static String upload(String localName) throws InterruptedException {
// use sleep Time consuming range of simulated document generation 1~3s
Random r = new Random();
Thread.sleep(1000 + r.nextInt(2000));
return "https://aliyun.oss.xxx/file/" + localName;
}
}
Serial
import java.util.ArrayList;
import java.util.List;
public class SerializeModel {
/**
* Eligible documents
*
* @param count
* @return
*/
public static List<PDFDocVO> getPDFDocList(int count) {
List<PDFDocVO> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
PDFDocVO doc1 = new PDFDocVO();
list.add(doc1);
}
return list;
}
public static void main(String[] args) throws InterruptedException {
List<PDFDocVO> docList = getPDFDocList(10);
// Starting time
long start = System.currentTimeMillis();
for (PDFDocVO doc : docList) {
// Generating documentation
String localName = ProduceDocService.makePDF(doc);
// Upload documents
UploadDocService.upload(localName);
}
long total = System.currentTimeMillis() - start;
System.out.println(" The total time is :" + total / 1000 + " second ," + total / 1000 / 60 + " minute ");
}
}

82/10=8.2sparallel
parallel + asynchronous
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* TODO
*
* @date 2022/4/7 22:08
*/
public class ParallelAsyncModel {
// Thread pool threads
public final static int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
// Thread pool for processing document generation IO Intensive task
private static ExecutorService makeDocService = Executors.newFixedThreadPool(THREAD_COUNT * 2);
// Thread pool for processing document upload
private static ExecutorService uploadDocService = Executors.newFixedThreadPool(THREAD_COUNT * 2);
// Document generation queue
private static CompletionService<String> makeDocCompletionService = new ExecutorCompletionService(makeDocService);
// Document upload queue
private static CompletionService<String> uploadDocCompletionService = new ExecutorCompletionService(uploadDocService);
/**
* Eligible documents
*
* @param count
* @return
*/
public static List<PDFDocVO> getPDFDocList(int count) {
List<PDFDocVO> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
PDFDocVO doc1 = new PDFDocVO();
list.add(doc1);
}
return list;
}
public static void main(String[] args) throws InterruptedException {
int count = 100;
List<PDFDocVO> docList = getPDFDocList(count);
// Starting time
long start = System.currentTimeMillis();
// Multithreading document generation
for (PDFDocVO doc : docList) {
makeDocCompletionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
String localName = ProduceDocService.makePDF(doc);
return localName;
}
});
}
// Upload documents
for (int i = 0; i < count; i++) {
Future<String> take = makeDocCompletionService.take();
uploadDocCompletionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
String uploadUrl = UploadDocService.upload(take.get());
return uploadUrl;
}
});
}
long total = System.currentTimeMillis() - start;
System.out.println(" The total time is :" + total / 1000 + " second ," + total / 1000 / 60 + " minute ");
}
}

32/100=0.32s8.2/0.32 ≈ 25 times Thread number setting
- IO Intensive task , The number of threads is
CPU The core number *2;
- CPU Intensive task , The number of threads is
CPU The core number +1.
CPU The core number *2CPU The core number *41:41:3CPU The core number *4*3
summary
边栏推荐
- Stop using system Currenttimemillis() takes too long to count. It's too low. Stopwatch is easy to use!
- 有关WIN10的内存压缩
- JS all network request modes
- R语言plotly可视化:plotly可视化基础小提琴图(basic violin plot in R with plotly)
- Experiment notes - Convert Carmen (.Log.Clf) file to rosbag
- 3D移动 translate3d
- [registration] infrastructure design: from architecture hot issues to industry changes | tf63
- C any() and aii() methods
- lvi-sam 总结
- leetcode待做题目
猜你喜欢

Use aspese slides to convert PPT to PDF

手机影像内卷几时休?

Audiotrack and audiolinker

2-4Kali下安装nessus

User authentication technology

Feedforward feedback control system design (process control course design matlab/simulink)

Win10快捷键整理
![File name setting causes an error to be written to writelines: oserror: [errno 22] invalid argument](/img/08/2d4f425e6941af35616911672b6fed.png)
File name setting causes an error to be written to writelines: oserror: [errno 22] invalid argument

在外企远程办公是什么体验? | 社区征文
![[hcie-rs review mind map] - STP](/img/b5/b89e59fe7f23bf23feeadb991acba7.png)
[hcie-rs review mind map] - STP
随机推荐
R language uses econcharts package to create microeconomic or macro-economic charts, demand function to visualize demand curve, and customize the parameters of demand function to enrich the visualizat
Dimitt's law
Memory compression for win10
Privacy computing fat offline prediction
C language learning day_ 04
[registration] infrastructure design: from architecture hot issues to industry changes | tf63
前馈-反馈控制系统设计(过程控制课程设计matlab/simulink)
Win10快捷键整理
感应电机直接转矩控制系统的设计与仿真(运动控制matlab/simulink)
CPU设计(单周期和流水线)
【TcaplusDB知识库】Tmonitor后台一键安装介绍(一)
Use aspese slides to convert PPT to PDF
12 necessary tools for network engineers
Installation manuelle de MySQL par UBUNTU
.NET 中的引用程序集
Audiotrack and audiolinker
【TcaplusDB知识库】TcaplusDB新增机型介绍
【TcaplusDB知识库】TcaplusDB Tmonitor模块架构介绍
[noodle classic] Yunze Technology
[hcie-rs review mind map] - STP