当前位置:网站首页>Executor - Shutdown、ShutdownNow、awaitTermination 详解与实战
Executor - Shutdown、ShutdownNow、awaitTermination 详解与实战
2022-06-10 22:10:00 【BIT_666】
一.引言
使用 executor 线程池时经常用到 shutdown / shutdownNow + awaitTermination 方法关闭线程池,下面看下几种方法的定义与常见用法。
二.API 释义
1.shutdown
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}shutdown 主要工作如下:
Initiates an orderly shutdown in which previously submitted
tasks are executed - 之前提交的继续执行
but no new tasks will be accepted. - 不再接收新任务
This method does not wait for previously submitted tasks to
complete execution. - 该方法不会等待以前提交的任务完成,可以配合 awaitTermination 方法等待。
2.shutdownNow
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}shutdownNow 主要工作如下:
Attempts to stop all actively executing tasks - 尝试停止正在执行的任务
halts the processing of waiting tasks - 停止等待的任务
and returns a list of the tasks that were awaiting execution - 返回等待列表的任务
These tasks are drained (removed) from the task queue upon return from this method. - 从该方法返回时,将这些任务从队列中删除
Tips:
该任务不会等待主动执行的任务终止,可以配合 awaitTermination 方法等待。
该方法尽可能停止主动执行的任务,通过 Thread.interrupt 实现,未能响应中断的任务可能不会停止
该方法与 shutdown 差别在 interruptIdleWorkers 和 interruptWorkers,后者会调用 interrutp 方法到正在执行的 worker 上,而前者只会取消等待的任务。
3.awaitTermination
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}awaitTermination 方法会等待线程到达 TERMINATED 即已终止的状态。如果线程池已经关闭,则直接返回 true;如果线程池未关闭,该方法会根据 Timeout + TimeUnit 的延时等待线程结束,并根据到期后的线程池状态返回 true 或者 false,注意该方法不会关闭线程池,只负责延时以及检测状态。
4.runState
A.状态
线程 runState 的几种状态与转换
RUNNING:接受新任务并处理排队的任务
SHUTDOWN:不接受新任务,但处理排队的任务
STOP:不接受新任务,不处理排队任务,并中断正在进行的任务
TIDYING:所有任务都已终止,workerCount 为零,线程转换到状态 TIDYING,将运行 terminate() Hook
TERMINATED:终止()已完成
B.转换
RUNNING -> SHUTDOWN : 在调用 shutdown() 时,可能隐含在 finalize() 中
(RUNNING or SHUTDOWN) -> STOP : 调用 shutdownNow()
SHUTDOWN -> TIDYING:当队列和池都为空时
STOP -> TIDYING:当池为空时
TIDYING -> TERMINATED:当 terminate() 钩子方法完成时
三.实践
1.shutdown + awaitTermination(500ms)
processNumBuffer 为 Runnable 内的逻辑,针对给定的一批数字求出最小,最大值并返回结果字符串保存,共 500000 个数字,每 50000 个数字生成一个 Runnable。执行逻辑后调用 shutdown + awaitTermination。getCurrentThread 方法负责打印当前的可用线程,用来观测调用 shutdown 和 awaitTermination 后线程池中线程的变化。
import java.util.concurrent.{CopyOnWriteArraySet, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
object ExecutorPoolShutdown {
// Runnable 内执行逻辑,寻找上下界
def processNumBuffer(nums: Array[Int], taskId: Long): String = {
val maxNum = nums.max
val minNum = nums.min
val log = s"TaskId: $taskId Length: ${nums.length} Min: $minNum Max: $maxNum"
log
}
def main(args: Array[String]): Unit = {
// 存储所有 Task 的日志
val logAll = new StringBuffer()
// 存储所有可用的 TaskId
val taskSet = new CopyOnWriteArraySet[Long]()
// 初始化线程池
val executor = new ThreadPoolExecutor(6, 10, 3000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
val numIterator = (0 until 500000).iterator
// 每50000个数据生成一个 Task
numIterator.grouped(50000).foreach(group => {
executor.execute(new Runnable() {
override def run(): Unit = {
val taskId = Thread.currentThread().getId
taskSet.add(taskId) // 添加 taskID
val res = processNumBuffer(group.toArray, taskId)
logAll.append(res + "\n") // 添加统计日志
}
})
})
// 调用 shutdown
executor.shutdown()
// 获取当前线程
println("After shutdown...")
getCurrentThread()
val st = System.currentTimeMillis()
// 调用 awaitTerminatio
val state = executor.awaitTermination(500, TimeUnit.MILLISECONDS)
val cost = System.currentTimeMillis() - st
println(s"Executor State: $state Cost: $cost")
// 再次获取当前线程
println("After awaitTermination...")
getCurrentThread()
println(logAll.toString)
println(taskSet.toArray().mkString(","))
}
}由于任务逻辑比较简单,调用 shutdow 和 awaitTermation 后线程数均为 5 且是和 pool 无关的线程,说明线程池的 Task 在 shutdow 后就已经全部运行完毕了,这点从 awaitTermation 返回的状态为 true 和等待时间为 0 也可以看出来。最后就是 500000 / 50000 = 10 共计10条 log,再次说明 task 都执行完毕,最后显示本次任务执行中共使用了 6 个 Task。

Tips:
shutdown 后如果线程池已经结束,则 awaitTermation 方法不会等待,直接返回 true。
2.任务延时 + shutdown + awaitTermination(500ms)
上面的任务执行速度求最大最小值,执行速度很快,所以不好看到 shutdown 和 awaitTerminatioN 的作用,下面模拟一个运行时间稍长的任务,在原始 Runnable 中加入 sleep(2000),延长程序2s的运行时间:
numIterator.grouped(50000).foreach(group => {
executor.execute(new Runnable() {
override def run(): Unit = {
val taskId = Thread.currentThread().getId
taskSet.add(taskId)
Thread.sleep(2000)
val res = processNumBuffer(group.toArray, taskId)
logAll.append(res + "\n")
}
})
})再次运行:

这次的结果和上面完全不同,首先是不管是调用 shutdown 还是 awaitTermination,可以看到活跃线程中都包含 pool-1-thread-x,即这两个 API 调用后逻辑仍在运行没有结束;再看 awaitTermination 返回的状态为 false,代表线程池未完全关闭,cost=507ms,等待 500ms 后线程池仍未完全退出,但是主线程已经结束,所以 LogAll 里没有日志加入,最后只打印出了使用过的 TaskId。
Tips:
shutdown 后线程仍在继续运行,对应前面提到的 shutdown 之后当前运行的任务继续执行,只不过不会增加新任务,而 awaitTermination 后线程依然活跃,对应前面的 awaitTermination 方法只返回线程池关闭状态,不会关闭线程池。
3.任务延时 + shutdown + awaitTermination(2000ms)
调整 awaitTermination 的等待时间,从500ms提升至2000ms
val st = System.currentTimeMillis()
val state = executor.awaitTermination(2000, TimeUnit.MILLISECONDS)
val cost = System.currentTimeMillis() - st
println(s"Executor State: $state Cost: $cost")再次运行:

和上面相比,executor 的关闭状态返回的仍然是 false,但是等到的时间延迟至约 2000ms,可以看到随着等待时间增加,一部分 task已经完成了,但是并没有全部完成。将延时时间修改为 5000ms 再次运行:

等到 4000ms 时 executor 就结束了,所以 awaitTermination 返回为 true,后续也没有 pool-1-thread-x 相关的 task,最终的输出 log 也完整。
4.shutdownNow + awaitTermination(500ms)
shutdownNow 相比 shutdown 会多一个返回值,即等待列表的任务。
val tasks = executor.shutdownNow()
tasks.asScala.foreach(task => {
println(task)
})运行一下:

由于任务运行很快,所以快速任务下,shutdown 和 shutdownNow 结果相同,awaitTermination 返回为 ture 且未等待。
5.任务延时 + shutdownNow + awaitTermination(500ms)
numIterator.grouped(50000).foreach(group => {
executor.execute(new Runnable() {
override def run(): Unit = {
val taskId = Thread.currentThread().getId
taskSet.add(taskId)
Thread.sleep(2000)
val res = processNumBuffer(group.toArray, taskId)
logAll.append(res + "\n")
}
})
})任务增加 sleep 2000ms 后再运行一下:

最上面的任务显示 :
Caused by: java.lang.InterruptedException: sleep interrupted即任务 sleep 期间被 interrupt 了,所以执行的 task 结束,和 shutdown 相比,正在执行的线程并不会被 interrupt;下面打印出来4个 Runnable,因为这四个 Task 还在等待队列中,shutdownNow 直接把他们返回了;最后下面因为 executor 已经关闭,所以状态为 true,等待时间为0。
6.任务延时 + awaitTermination(xxxms) + shutdownNow
通过上面 shotdownNow + awaitTermination 的示例中可以看到,如果任务不能很快执行,那么调用 shotdownNow 的结果就是所有 task 都没结束,任务没有任何改动。如果希望对任务设定一定期间,能完成多少完成多少,可以调整顺序,修改为先 awaitTermination 再 shutdownNow:
val st = System.currentTimeMillis()
val state = executor.awaitTermination(3000, TimeUnit.MILLISECONDS)
val cost = System.currentTimeMillis() - st
println(s"Executor State: $state Cost: $cost")
println("After awaitTermination...")
getCurrentThread()
val tasks = executor.shutdownNow()
tasks.asScala.foreach(task => {
println(task)
})
println("After shutdown...")
getCurrentThread()调整完顺序后再次运行:

等待时间设置为 3000ms,相当于你对你的任务要求是: 3000ms 内能跑完多少算多少,没跑完就不要了;可以看到 awaitTermination 到期后返回状态为 false,说明线程内的任务还未全部结束;再看下面 shutdownNow 后,线程里已经不存在 pool-1-thread-x ,且打印出部分结果,共计6条;最下面是 interrupt 其他正在运行的 task 打印的异常栈,最终程序 exit(0) 正常退出。
四.总结
经过上面的代码分析,对几个方法进行一下总结:
shutdown : 等待执行的任务执行,不再添加新任务
shutdownNow:interrupt 当前执行的任务,不再添加新任务,返回等待的任务
awaitTermination:不影响线程池开关状态,只返回状态,可以堵塞线程等待一定时间
可以结合上面的6个例子以及自己任务的耗时和容忍度,决定怎么组合上面三个 API,如果一定要等到 executor 内的 task 都运行完毕再关闭 executor 且不好估算内部 task 运行时间,可以采用如下操作:
executor.shutdown()
while (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {
println("Task is Running...")
}
println("Task is Finish!")通过 while true 保证线程池内 task 都运行完毕才进行后续操作,不过需要注意 Task 内部不要有死循环,否则会导致无法跳出该 While 循环,整个程序堵塞在这里。
边栏推荐
- "Draw the bow as strong, use the arrow as long", Manfu technology opens a new track for the data service industry
- 完美解码PureCodec 20220601
- 电子协会 C语言 1级 7 、画矩形
- [006] initial string
- Vscode common shortcuts
- 线程池的创建
- IP反查域名
- Several reasons and solutions of virtual machine Ping failure
- Image mosaic camera mosaic notes
- Object 有哪些常用方法
猜你喜欢

MA8601 pin√pin替代汤铭FE1.1s无须更改电路板|完美替代FE1.1s方案

乘风破浪,探索数据可视化开发平台 FlyFish 开源背后的秘密!

"Draw the bow as strong, use the arrow as long", Manfu technology opens a new track for the data service industry

Play electronics, poor three generations

数据与信息资源共享平台(八)

Fallback operation in SVN

项目实训11——对数据库的定时备份

Sealem Finance-基于Web3的全新去中心化金融平台

Sealem Finance - a new decentralized financial platform based on Web3

leetcode 130. Surrounded Regions 被围绕的区域(中等)
随机推荐
Several reasons and solutions of virtual machine Ping failure
项目实训12——解析建表的SQL语句
Display of successful cases of target customer matching data table
Vscode common plug-ins and configurations
Online questionnaire system based on php+web+mysql
Relevant knowledge of flowable BPMN
Sentinel
项目实训10——对特定数据库的备份
Object 有哪些常用方法
爬虫学习知识
MySQL related -0416
Flowable BPMN相关知识
Vulnhub's DC3
Software features and functions of the blind box mall app system development
【接口教程】EasyCVR如何通过接口设置平台级联?
Icml2022 | reexamine end-to-end voice to text translation from scratch
Thread pool: a magic weapon for managing threads
"Draw the bow as strong, use the arrow as long", Manfu technology opens a new track for the data service industry
Keras deep learning practice (8) -- using data enhancement to improve neural network performance
Why is the video fusion cloud service easycvr cluster video event query invalid?