当前位置:网站首页>ScheduledThreadPoolExecutor源码解读(二)
ScheduledThreadPoolExecutor源码解读(二)
2022-06-28 00:27:00 【明斯克开源】
null,
triggerTime(initialDelay, unit),
//重点! 传入的delay取反了,用delay正负来区分执行间隔是否固定
unit.toNanos(-delay));
//将任务包装成RunnableScheduledFuture对象
//decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
//延迟执行 加延迟阻塞队列+启动一个空的Worker线程
delayedExecute(t);
return t;
}
[](()1、decorateTask留给开发者去实现
看decorateTask源码,其有两个参数,任务原始对象runnable和把原始任务包装成RunnableScheduledFuture对象。decorateTask函数直接返回RunnableScheduledFuture对象,没有做什么事情,那其意图是什么呢?
decorateTask是想让开发者继承ScheduledThreadPoolExecutor实现定制化定时线程池时,可以实现这个函数,对原始任务对象和包装后任务对象做特殊DIY处理。
protected RunnableScheduledFuture decorateTask(
Runnable runnable, RunnableScheduledFuture task) {
return task;
}
[](()2、delayedExecute延迟调度
delayedExecute()是延迟执行和周期性执行的主函数,其基本流程如下:
判断线程池的状态,
runstate为shutdown将拒绝任务提交。任务处于正常运行状态,则将任务直接加入阻塞工作队列。
再次判断线程池的状态,
runstate为shutdown,再判断是否是周期性任务(isPeriodic),不同的性质不同的处理策略。一起正常预启动一个空
Worker线程,循环从阻塞队列中消费任务。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
//1、直接加入延时阻塞队列
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//2、预启动一个空的worker
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
//创建一个空worker,并且启动
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
[](()三、ScheduledFutureTask时间调度执行的核心
可以看出提交的任务最重被包装成ScheduledFutureTask,然后加到工作队列由Worker工作线程去消费了。
延迟执行和周期性执行的核心代码也就在于ScheduledFutureTask。
[](()1、基本架构
ScheduledFutureTask继承了FutureTask并实现了接口RunnableScheduledFuture。
private class ScheduledFutureTask
extends FutureTask implements RunnableScheduledFuture {
/** Sequence number to break ties FIFO */
private final long sequenceNumber;
/** The time the task is enabled to execute in nanoTime units */
任务被调用的执行时间
private long time;
/**
- Period in nanoseconds for repeating tasks.
*/
//周期性执行的时间间隔
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture outerTask = this;
/**
Index into delay queue, to support faster cancellation.
索引到延迟队列为了支持快速取消
*/
int heapIndex;
/**
- Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
}
ScheduledFutureTask实现了接口Delayed,所以需要重写两个方法getDelay、compareTo。
//获取当前延迟时间(距离下次任务执行还有多久)
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
/**
比较this 和 other谁先执行
@param other
@return <=0 this先执行
*/
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
//比较Delay
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
[](()2、延迟执行和周期执行区别
延迟执行和周期执行区别在于period:
延迟执行
period=0。周期性执行
period!=0。固定频率(
AtFixedRate)周期性执行period>0,每次开始执行的时间的间隔是固定的,不受任务执行时长影响。固定延迟时间(
WithFixedDelay)周期性执行period<0,每次执行的时间受任务执行时长影响,是任务执行结束后的当前时间+ (-p)。
public boolean isPeriodic() {
return period != 0;
}
private void setNextRunTime() {
long p = period;
//AtFixedRate 当传入period > 0 时 ,每次执行的时间的间隔是固定的
if (p > 0)
time += p;
else
//WithFixedDelay 当传入period < 0 时,每次执行的时间受任务执行时长影响,是任务执行结束后的当前时间+ (-p)
time = triggerTime(-p);
}
long triggerTime(long delay) {
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
[](()3、heapIndex支持快速取消定时任务
ScheduledFutureTask还有一个变量heapIndex,是记录任务在阻塞队列的索引的,其方便支持快速取消任务和删除任务。但是其并不会作为删除任务的位置判断,只是当用于判断惹怒是否在阻塞队列中:heapIndex >= 0 在阻塞队列中,取消任务时需要同时从阻塞队列删除任务;heapIndex < 0不在阻塞队列中。
阻塞队列DelayedWorkQueue的每次堆化siftUp()、siftDown(),以及remove()都维护着heapIndex,想必这也是ScheduledThreadPoolExecutor自行定制延迟阻塞队列的原因之一。
public boolean cancel(boolean mayInterru 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 ptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
//从延迟阻塞队列中删除任务
remove(this);
return cancelled;
}
[](()4、核心逻辑run()
ScheduledFutureTask间接实现了接口Runnable,其核心逻辑就在run():
周期性任务,
continueExistingPeriodicTasksAfterShutdown默认为false,意为调用shutdown()时,会取消和阻止周期性任务的执行。非周期性任务,
executeExistingDelayedTasksAfterShutdown默认为true,意为调用shutdown()时,不会取消和阻止非周期性任务的执行。
public void run() {
boolean periodic = isPeriodic();
//当runState为SHUTDOWN时,非周期性任务继续,周期性任务会中断取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//非周期性任务,只执行一次
ScheduledFutureTask.super.run();
//runAndReset返回false周期性任务将不再执行
else if (ScheduledFutureTask.super.runAndReset()) {
//runAndReset() 周期性任务执行并reset
//设置下一次执行时间
setNextRunTime();
//把自己再放回延时阻塞队列
reExecutePeriodic(outerTask);
}
}
[](()(1)canRunInCurrentRunState不同任务性质不同策略
代码一开始判断线程池的运行状态canRunInCurrentRunState,当线程池处于SHUTDOWN状态时,是否是周期性任务有不同的策略:
周期性任务,
continueExistingPeriodicTasksAfterShutdown默认为false,意为线程池被关闭时,应该取消和阻止周期性任务的执行。非周期性任务,
executeExistingDelayedTasksAfterShutdown默认为true,意为线程池被关闭时,不会取消和阻止非周期性任务的执行。
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
/**
- False if should cancel/suppress periodic tasks on shutdown.
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/**
- False if should cancel non-periodic tasks on shutdown.
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
[](()(2)单次执行调度父类FutureTask.run
延迟执行任务,即只执行一次,调用了父类的FutureTask.run()。提交的任务如果是Runnable型,会被包装成Callable型作为FutureTask的成员变量。FutureTask.run()中直接调度执行任务的代码call(),同时返回结果。
需要注意的是,任务代码c.call()若抛出异常会被FutureTask捕获处理,这样对外查找问题不利,所以最好在任务run()或者call()的核心代码用try-catch包起来。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用callable的call,并设置返回值
//如果传进来的任务是Runnable,会被转换成callable
result = c.call();
//若运行异常,ran=false,异常会被捕获处理
//所以传进来的任务的run或者call代码块最好try-catch下
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
边栏推荐
- General timer and interrupt of stm32
- Jenkins - 邮件通知 Email Notification 插件
- Numpy----np.reshape()
- The interviewer asked: this point of JS
- How to use metauniverse technology to create a better real world
- SQL injection bypass (2)
- [Yocto RM]8 - OpenEmbedded Kickstart (.wks) Reference
- KVM相关
- 匿名挂载&具名挂载
- Jenkins - groovy postbuild plug-in enriches build history information
猜你喜欢
![[elt.zip] openharmony paper Club - memory compression for data intensive applications](/img/54/8248c1d95b04498d44a00ea94a6c85.png)
[elt.zip] openharmony paper Club - memory compression for data intensive applications

架构高可靠性应用知识图谱 ----- 微服务架构图谱

【ELT.ZIP】OpenHarmony啃论文俱乐部—数据密集型应用内存压缩

Original | 2025 to achieve the "five ones" goal! The four products of Jiefang power are officially released

CVPR22收录论文|基于标签关系树的层级残差多粒度分类网络

SQL injection bypass (2)

Jenkins - accédez à la variable de paramètre personnalisée Jenkins, en traitant les espaces dans la valeur de la variable

系统管理员设置了系统策略,禁止进行此安装。解决方案

SQL 注入绕过(五)

A set of sai2 brushes is finally finished! Share with everyone!
随机推荐
Where can I open an account for foreign exchange futures? Which platform is safer for cash in and out?
要搞清楚什么是同步,异步,串行,并行,并发,进程,线程,协程
Adding text labels to cesium polygons the problem of polygon center point offset is solved
树莓派实现温控风扇智能降温
[Yongyi XY chair] trial experience
Machine learning notes - time series as features
Jenkins - email notification plug-in
Prometeus 2.35.0 新特性
To understand what is synchronous, asynchronous, serial, parallel, concurrent, process, thread, and coroutine
Jenkins - Pipeline 语法
TD Hero 线上发布会|7月2日邀你来
Cesium color color (assignment) random color
SQL 注入绕过(四)
Numpy----np.tile()函数解析
文件传输协议--FTP
Capacitor
Learn pickle
[Yocto RM] 4 - Source Directory Structure
geojson 格式說明(格式詳解)
[elt.zip] openharmony paper Club - memory compression for data intensive applications