当前位置:网站首页>【JUC系列】Executor框架之CompletionService
【JUC系列】Executor框架之CompletionService
2022-06-21 08:27:00 【顧棟】
【JUC系列】Executor框架之CompletionService
文章目录
CompletionService
这是一个接口,提供一种服务,它将新的异步任务的生产与已完成任务的结果的消费分离。生产者提交任务以供执行。消费者接受已完成的任务并按照他们完成的顺序处理他们的结果。
例如,CompletionService 可用于管理异步 I/O,其中执行读取的任务在程序或系统的一个部分中提交,然后在读取完成时在程序的不同部分执行操作,可能在与他们要求的顺序不同。通常,CompletionService 依赖一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。 ExecutorCompletionService 类提供了这种方法的实现。
内存一致性效果:在将任务提交到 CompletionService 之前线程中的操作发生在该任务所采取的操作之前,这反过来又发生在从相应的 take() 成功返回之后的操作。
接口提供了以下方法
| 方法名 | 描述 |
|---|---|
| Future submit(Callable task) | 提交一个返回值的任务以供执行,并返回一个表示该任务待处理结果的 Future。 完成后,可以采取或轮询此任务。 |
| Future submit(Runnable task, V result) | 提交 Runnable 任务以执行并返回代表该任务的 Future。 完成后,可以take或poll此任务。 |
| Future take() throws InterruptedException | 检索并删除代表下一个已完成任务的 Future,如果还没有,则等待。 |
| Future poll() | 检索并删除表示下一个已完成任务的 Future,如果不存在,则返回 null。 |
| Future poll(long timeout, TimeUnit unit) throws InterruptedException | 检索并删除代表下一个已完成任务的 Future,如果还没有,则在必要时等待指定的等待时间。 |
ExecutorCompletionService
ExecutorCompletionService实现了CompletionService。
一个 CompletionService,它使用提供的 Executor 来执行任务。 此类安排提交的任务在完成后放置在使用 take 可访问的队列中。 该类足够轻量级,适合在处理任务组时临时使用。
成员变量
// 执行任务的线程池
private final Executor executor;
// ???
private final AbstractExecutorService aes;
// 任务完成会存放在该阻塞队列中
private final BlockingQueue<Future<V>> completionQueue;
内部类QueueingFuture
继承了FutureTask
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 执行成功,将任务添加到completionQueue队列中。主要在FutureTask执行finishCompletion()时,调用done。
protected void done() {
completionQueue.add(task); }
private final Future<V> task;
}
构造函数
参数executor:传入的线程池,用来执行任务;默认的队列是LinkedBlockingQueue。
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
参数executor:传入的线程池,用来执行任务;
参数completionQueue:用来记录执行结果的阻塞列表
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
任务执行
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
// 将任务转换成QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
// 将任务转换成QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}
ExecutorCompletionService的newTaskFor,将Callable或Runnable任务转成FutureTask。
若线程池不为AbstractExecutorService则,直接new FutureTask;若aes不为null,则调用AbstractExecutorService的newTaskFor。实际也是通过new FutureTask实现。
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
AbstractExecutorService的newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
使用案例
场景一:
假设您有一组求解某个问题的求解器,每个求解器返回某种类型的值 Result,并希望同时运行它们,处理每个返回非空值的结果,在某些方法中使用(Result r)。 你可以这样写:
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
public class ExecutorCompletionServiceDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy());
CompletionService<Integer> completionService = new ExecutorCompletionService<>(threadPoolExecutor);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is beginning. ");
List<Future<Integer>> futures = new ArrayList<>(3);
futures.add(completionService.submit(() -> {
Thread.sleep(3000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 1;
}));
futures.add(completionService.submit(() -> {
Thread.sleep(2000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 2;
}));
futures.add(completionService.submit(() -> {
Thread.sleep(1000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 3;
}));
Integer r = 0;
try {
for (int i = 0; i < 3; i++) {
r = completionService.take().get();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] " + r);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
}
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is ending.");
threadPoolExecutor.shutdown();
}
}
执行结果
[14:03:59:059--main] is beginning.
[14:04:00:000--pool-1-thread-3] sleep is over.
[14:04:00:000--main] 3
[14:04:01:001--pool-1-thread-2] sleep is over.
[14:04:01:001--main] 2
[14:04:02:002--pool-1-thread-1] sleep is over.
[14:04:02:002--main] 1
[14:04:02:002--main] is ending.
场景二:
假设您想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务:
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
public class ExecutorCompletionServiceDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy());
CompletionService<Integer> completionService = new ExecutorCompletionService<>(threadPoolExecutor);
List<Future<Integer>> futures = new ArrayList<>(3);
futures.add(completionService.submit(() -> {
Thread.sleep(3000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 1;
}));
futures.add(completionService.submit(() -> {
Thread.sleep(2000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 2;
}));
futures.add(completionService.submit(() -> {
Thread.sleep(1000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 3;
}));
Integer r = 0;
try {
for (int i = 0; i < 3; i++) {
r = completionService.take().get();
// 如果结果获取到了,就退出for
if (r != null) {
break;
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
for (Future<Integer> f : futures) {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] " + f.isDone());
f.cancel(true);
}
}
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] " + r);
threadPoolExecutor.shutdown();
}
}
执行结果
[12:05:39:039--pool-1-thread-3] sleep is over.
[12:05:39:039--main] false
[12:05:39:039--main] false
[12:05:39:039--main] true
[12:05:39:039--main] 3
边栏推荐
- window10局域网共享文件夹流程
- LeetCode数组经典题目(一)
- TiDB3.0- 4.0 内存控制/修改日志保存天数/最大索引长度
- 安装MySQL出现白页面怎么办
- CTF show WEB10
- antd table长表格如何出现滚动条
- Client construction and Optimization Practice
- 2022-2028 global internal gear motor industry research and trend analysis report
- Haidilao is expected to have an annual net loss of 3.8 billion to 4.5 billion and close more than 300 stores
- FD:文件描述符
猜你喜欢

Client construction and Optimization Practice

日記(C語言總結)

26. Hikvision camera configuration and preliminary test

Listing of flaunting shares on Shenzhen Stock Exchange: market value of 4.2 billion, 9-month revenue decreased by 21% year-on-year

Figure neural network and cognitive reasoning - Tang Jie - Tsinghua University

Eureka的TimedSupervisorTask类(自动调节间隔的周期性任务)

Unity写多线程注意事项

This article takes you to interpret the advertising advantages of tiktok

写文章的markdown规则

【MGT】代码解读之model-MGT
随机推荐
showCTF 入门文件包含系列
Diary (C language summary)
测试入门——软件测试模型
5分钟搞懂MySQL - 行转列
CTF中命令执行知识点总结
Antd table how scroll bars appear in long tables
(thinking) C. differential sorting
Global and Chinese market for online automatic optical inspection 2022-2028: Research Report on technology, participants, trends, market size and share
Journal (résumé en langue c)
Markdown rule for writing articles
Figure out how MySQL works
TiDB、mysql修改系统变量/常用语句(杀死process中的进程)
Kotlin middle tail recursive function
Global and Chinese market of electrical connectors 2022-2028: Research Report on technology, participants, trends, market size and share
4.9 commander. js
Two image enhancement methods: image point operation and image graying
4.5 dataset usage document
写文章的markdown规则
Unity development related blog collection
TiDB3.0- 4.0 内存控制/修改日志保存天数/最大索引长度