当前位置:网站首页>【面试:并发篇39:多线程:线程池】ThreadPoolExecutor类-提交、停止
【面试:并发篇39:多线程:线程池】ThreadPoolExecutor类-提交、停止
2022-08-01 08:19:00 【I cream】
【面试:并发篇39:多线程:线程池】ThreadPoolExecutor类-提交、停止
00.前言
如果有任何问题请指出,感谢。
01.提交任务
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
submit方法
submit方法与execute方法的区别在于,execute方法接收的参数是Runnable类型的参数 没有返回值,而submit方法接收的是Callable类型的参数有返回值 且 返回值用Future<>接收,Future<>获取线程的返回值 实现原理 就是之前所讲的保护性暂停模式。
例子
@Slf4j(topic = "c.TestPool1")
public class TestPool1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
Future<String> future = threadpool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
log.debug("running");
Thread.sleep(1000);
return "ok";
}
});
log.debug("{}",future.get());
}
}
结果
17:12:30.718 c.TestPool1 [pool-1-thread-1] - running
17:12:31.731 c.TestPool1 [main] - ok
解释
可以看出我们实现了Callable接口 并返回了字符串"ok" 给Future< String>,之后通过get方法获取到返回数据。
invokeAll方法
invokeAll方法接收的是一个任务集合 且有返回值,线程池中的线程执行这个任务集合。
例子
@Slf4j(topic = "c.TestPool2")
public class TestPool2 {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
List<Future<String>> futures = threadpool.invokeAll(Arrays.asList(
() -> {
log.debug("begin1");
Thread.sleep(1000);
return "1";
},
() -> {
log.debug("begin2");
Thread.sleep(500);
return "2";
},
() -> {
log.debug("begin3");
Thread.sleep(2000);
return "3";
}
));
futures.forEach(f -> {
try {
log.debug("{}",f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
结果
17:48:49.429 c.TestPool2 [pool-1-thread-2] - begin2
17:48:49.429 c.TestPool2 [pool-1-thread-1] - begin1
17:48:49.930 c.TestPool2 [pool-1-thread-2] - begin3
17:48:51.937 c.TestPool2 [main] - 1
17:48:51.937 c.TestPool2 [main] - 2
17:48:51.937 c.TestPool2 [main] - 3
解释
我们可以看到 任务1与任务2同时被执行,但是因为我们的线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后 线程2执行任务3,中间差了0.5s 是因为 任务2执行了0.5s,最终我们遍历返回结果 并打印。
invokeAny方法
invokeAny方法的作用是得到一个最先返回的结果,之后的线程就不再运行。
例子
@Slf4j(topic = "c.TestPool3")
public class TestPool3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
String future = threadpool.invokeAny(Arrays.asList(
() -> {
log.debug("begin1");
Thread.sleep(1000);
log.debug("end1");
return "1";
},
() -> {
log.debug("begin2");
Thread.sleep(500);
log.debug("end2");
return "2";
},
() -> {
log.debug("begin3");
Thread.sleep(2000);
log.debug("end3");
return "3";
}
));
log.debug("{}",future);
}
}
结果
18:03:18.892 c.TestPool3 [pool-1-thread-3] - begin3
18:03:18.892 c.TestPool3 [pool-1-thread-2] - begin2
18:03:18.892 c.TestPool3 [pool-1-thread-1] - begin1
18:03:19.412 c.TestPool3 [pool-1-thread-2] - end2
18:03:19.412 c.TestPool3 [main] - 2
解释
注意我们此时把核心线程数 设置为3,此时三个任务同时运行,且任务2先运行完 可以看到最后的结果只有任务2的返回值,但是注意一个场景,假如我们现在的核心线程数只有一个 那么很明显我们的线程会先运行任务1 剩下两个任务加入任务队列,等待任务1运行完才会 有空闲线程运行剩下的任务,也就是说任务1是先运行完的 所以返回的结果只有任务1的返回值。
02.关闭线程池
/* 线程池状态变为 SHUTDOWN - 不会接收新任务 - 但已提交任务会执行完 - 此方法不会阻塞调用线程的执行 */
void shutdown();
// 源码
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}
/* 线程池状态变为 STOP - 不会接收新任务 - 会将队列中的任务返回 - 并用 interrupt 的方式中断正在执行的任务 */
List<Runnable> shutdownNow();
// 源码
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}
// 其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
shutdown与shutdownNow例子
@Slf4j(topic = "c.TestShutDown")
public class TestShutDown {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
Future<Integer> result1 = threadpool.submit(() -> {
log.debug("task 1 running...");
Thread.sleep(1000);
log.debug("task 1 finish...");
return 1;
});
Future<Integer> result2 = threadpool.submit(() -> {
log.debug("task 2 running...");
Thread.sleep(1000);
log.debug("task 2 finish...");
return 2;
});
Future<Integer> result3 = threadpool.submit(() -> {
log.debug("task 3 running...");
Thread.sleep(1000);
log.debug("task 3 finish...");
return 3;
});
// shutdown() 部分的代码
log.debug("shutdown");
threadpool.shutdown();
threadpool.submit(()->{
log.debug("task 4 running...");
Thread.sleep(1000);
log.debug("task 4 finish");
return "4";
});
threadpool.awaitTermination(3, TimeUnit.SECONDS);
log.debug("other...");
// shutdownNow部分代码
// log.debug("shutdownNow");
// List<Runnable> runnables = threadpool.shutdownNow();
// log.debug("other.... {}" , runnables);
}
}
shutdown代码结果
18:35:02.128 c.TestShutDown [main] - shutdown
18:35:02.128 c.TestShutDown [pool-1-thread-1] - task 1 running…
18:35:02.128 c.TestShutDown [pool-1-thread-2] - task 2 running…
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 1 finish…
18:35:03.134 c.TestShutDown [pool-1-thread-2] - task 2 finish…
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 3 running…
18:35:04.139 c.TestShutDown [pool-1-thread-1] - task 3 finish…
18:35:04.139 c.TestShutDown [main] - other…
解释
可以看出我们在shutdown后依然可以把 shutdown之前的 任务运行完毕,但是shutdown之后的任务就没有再运行了。另外我们关注一下awaitTermination方法 它的作用是等待shutdown部分的任务运行完后 主线程再运行awaitTermination方法之后的代码。
shutdownNow代码结果
18:47:52.344 c.TestShutDown [main] - shutdownNow
18:47:52.344 c.TestShutDown [pool-1-thread-2] - task 2 running…
18:47:52.344 c.TestShutDown [pool-1-thread-1] - task 1 running…
18:47:52.354 c.TestShutDown [main] - other… [[email protected]]
解释
可以看出我们在shutdownNow之后 只有一个任务运行成功了,也就是别的任务都已经被打断了。
边栏推荐
- HoloView--Customization
- 13 - JUC CountDownLatch concurrent programming
- VSCode插件推荐(Rust环境)
- Chapter 9 of Huawei Deep Learning Course - Convolutional Neural Network and Case Practice
- POJ2421道路建设题解
- leetcode 42. 接雨水
- pytest接口自动化测试框架 | 执行失败跳转pdb
- 案例实践 --- Resnet经典卷积神经网络(Mindspore)
- TiDB的真实数据库数据是存在kv和还是pd上?
- pytest接口自动化测试框架 | parametrize中ids的用法
猜你喜欢
随机推荐
Static Pod, Pod Creation Process, Container Resource Limits
22 Niu Ke Duo School 1 I. Chiitoitsu (Probability dp)
gethostbyname \ getaddrinfo 解析域名IP地址不安全的原因
MySQL query advanced - from the use of functions to table joins, do you remember?
app 自动化 打开app (二)
pytest接口自动化测试框架 | 跳过模块
LabVIEW中局部变量和全局变量的分配
GO error handling
[Tear AHB-APB Bridge by hand]~ Why aren't the lower two bits of the AHB address bus used to represent the address?
搜索框字符自动补全
最新的Cesium和Three的整合方法(附完整代码)
Generate pictures based on the content of the specified area and share them with a summary
pytest接口自动化测试框架 | 单个/多个参数
C语言中编译时出现警告C4013(C语言不加函数原型产生的潜在错误)
leetcode-6133:分组的最大数量
小程序更多的手势事件(左右滑动、放大缩小、双击、长按)
扁平数组转树结构实现方式
Monitor the width and height of the parent element, adapt to the size of the plug-in
22牛客多校1 I. Chiitoitsu (概率dp)
pytest接口自动化测试框架 | 执行失败跳转pdb









