当前位置:网站首页>Golang分布式应用之定时任务
Golang分布式应用之定时任务
2022-08-02 01:52:00 【qingwave】
在系统开发中,有一类任务不是立即执行,而是在未来某个时间点或者按照一定间隔去执行,比如日志定期压缩、报表制作、过期数据清理等,这就是定时任务。
在单机中,定时任务通常需要实现一个类似crontab的系统,一般有两种方式:
- 最小堆,按照任务执行时间建堆,每次取最近的任务执行
- 时间轮,将任务放到时间轮列表中,每次转动取对应的任务列表执行
最小堆
最小堆是一种特殊的完全二叉树,任意非叶子节点的值不大于其子节点,如图
通过最小堆,根据任务最近执行时间键堆,每次取堆顶元素即最近需要执行的任务,设置timer定时器,到期后触发任务执行。由于堆的特性每次调整的时间复杂度为O(lgN),相较于普通队列性能更快。
在container/heap
中已经实现操作堆的相关函数,我们只需要实现定期任务核心逻辑即可。
// 运行
func (c *Cron) Run() error {
// 设置cron已启动,atomic.Bool来保证并发安全
c.started.Store(true)
// 主循环
for {
// 如果停止则退出
if !c.started.Load() {
break
}
c.runTask()
}
return nil
}
// 核心逻辑
func (c *Cron) runTask() {
now := time.Now()
duration := infTime
// 获取堆顶元素
task, ok := c.tasks.Peek()
if ok {
// 如果已删除则弹出
if !c.set.Has(task.Name()) {
c.tasks.Pop()
return
}
// 计算于当前时间查找,设置定时器
if task.next.After(now) {
duration = task.next.Sub(now)
} else {
duration = 0
}
}
timer := time.NewTimer(duration)
defer timer.Stop()
// 当有新元素插入直接返回,防止新元素执行时间小于当前堆顶元素
select {
case <-c.new:
return
case <-timer.C:
}
// 弹出任务,执行
go task.Exec()
// 计算下次执行时间,如果为0说明任务已结束,否则重新入堆
task.next = task.Next(time.Now())
if task.next.IsZero() {
c.set.Delete(task.Name())
} else {
c.tasks.Push(task)
}
}
主要逻辑可总结为:
- 将任务按照下次执行时间建最小堆
- 每次取堆顶任务,设置定时器
- 如果中间有新加入任务,转入步骤2
- 定时器到期后执行任务
- 再次取下个任务,转入步骤2,依次执行
时间轮
另一种实现Cron的方式是时间轮,时间轮通过一个环形队列,每个插槽放入需要到期执行的任务,按照固定间隔转动时间轮,取插槽中任务列表执行,如图所示:
时间轮可看作一个表盘,如图中时间间隔为1秒,总共60个格子,如果任务在3秒后执行则放为插槽3,每秒转动次取插槽上所有任务执行。
如果执行时间超过最大插槽,比如有个任务需要63秒后执行(超过了最大格子刻度),一般可以通过多层时间轮,或者设置一个额外变量圈数,只执行圈数为0的任务。
时间轮插入的时间复杂度为O(1),获取任务列表复杂度为O(1),执行列表最差为O(n)。对比最小堆,时间轮插入删除元素更快。
核心代码如下:
// 定义
type TimeWheel struct {
interval time.Duration // 触发间隔
slots int // 总插槽数
currentSlot int // 当前插槽数
tasks []*list.List // 环形列表,每个元素为对应插槽的任务列表
set containerx.Set[string] // 记录所有任务key值,用来检查任务是否被删除
tricker *time.Ticker // 定时触发器
logger logr.Logger
}
func (tw *TimeWheel) Run() error {
tw.tricker = time.NewTicker(tw.interval)
for {
// 通过定时器模拟时间轮转动
now, ok := <-tw.tricker.C
if !ok {
break
}
// 转动一次,执行任务列表
tw.RunTask(now, tw.currentSlot)
tw.currentSlot = (tw.currentSlot + 1) % tw.slots
}
return nil
}
func (tw *TimeWheel) RunTask(now time.Time, slot int) {
// 一次执行任务列表
for item := taskList.Front(); item != nil; {
task, ok := item.Value.(*TimeWheelTask)
// 任务圈数大于0,不需要执行,将圈数减一
if task.circle > 0 {
task.circle--
item = item.Next()
continue
}
// 运行任务
go task.Exec()
// 计算任务下次运行时间
next := item.Next()
taskList.Remove(item)
item = next
task.next = task.Next(now)
if !task.next.IsZero() {
tw.add(now, task)
} else {
tw.Remove(task.Name())
}
}
}
// 添加任务,计算下一次任务执行的插槽与圈数
func (tw *TimeWheel) add(now time.Time, task *TimeWheelTask) {
if !task.initialized {
task.next = task.Next(now)
task.initialized = true
}
duration := task.next.Sub(now)
if duration <= 0 {
task.slot = tw.currentSlot + 1
task.circle = 0
} else {
mult := int(duration / tw.interval)
task.slot = (tw.currentSlot + mult) % tw.slots
task.circle = mult / tw.slots
}
tw.tasks[task.slot].PushBack(task)
tw.set.Insert(task.Name())
}
时间轮的主要逻辑如下:
- 将任务存在对应插槽的时间
- 通过定时间模拟时间轮转动
- 每次到期后遍历当前插槽的任务列表,若任务圈数为0则执行
- 如果任务未结束,计算下次执行的插槽与圈数
- 转入步骤2,依次执行
总结
本文主要总结了定时任务的两种实现方式,最小堆与时间轮,并分析其核心实现逻辑。
对于执行分布式定时任务,可以借助延时消息队列或者直接使用Kubernetes的CronJob。
自己开发的话可以借助Etcd:
- 中心节点Coordinator将任务按照一定算法(Hash、轮询、或者更复杂的分配算法)将任务与工作节点Worker绑定
- 每个Worker添加到有绑定到自己的任务则取出放到本地的Cron中
- 如果Worker挂掉,执行将其上任务重新绑定即可
本文所有代码见https://github.com/qingwave/gocorex/tree/main/x/cron
Explore more in https://qingwave.github.io
边栏推荐
- LeetCode brushing diary: 53, the largest sub-array and
- bool Frame::PosInGrid(const cv::KeyPoint &kp, int &posX, int &posY)
- oracle查询扫描全表和走索引
- LeetCode刷题日记:153、寻找旋转排序数组中的最小值
- kubernetes之服务发现
- R语言使用cph函数和rcs函数构建限制性立方样条cox回归模型、使用anova函数进行方差分析通过p值确认指定连续变量和风险值HR之间是否存在非线性关系
- 【图像融合】基于加权和金字塔实现图像融合附matlab代码
- Named parameter implementation of JDBC PreparedStatement
- 第一次写对牛客的编程面试题:输入一个字符串,返回该字符串出现最多的字母
- A full set of common interview questions for software testing functional testing [open thinking questions] interview summary 4-3
猜你喜欢
Multi-Party Threshold Private Set Intersection with Sublinear Communication-2021: Interpretation
typescript30 - any type
typescript33 - high-level overview of typescript
Reflex WMS中阶系列6:对一个装货重复run pick会有什么后果?
Entry name ‘org/apache/commons/codec/language/bm/gen_approx_greeklatin.txt’ collided
typescript30-any类型
Data transfer at the data link layer
大话西游无法登陆解决
安全(2)
The characteristics and principle of typescript29 - enumeration type
随机推荐
垃圾回收器CMS和G1
Record the pits where an error occurs when an array is converted to a collection, and try to use an array of packaging types for conversion
6-24 exploit-vnc password cracking
typescript37-class的构造函数实例方法继承(extends)
Multi-Party Threshold Private Set Intersection with Sublinear Communication-2021: Interpretation
Newton's theorem and related corollaries
Kubernetes之本地存储
HSDC is related to Independent Spanning Tree
Redis 底层的数据结构
MySQL8 下载、启动、配置、验证
Ask God to answer, how should this kind of sql be written?
信息化和数字化的本质区别是什么?
typescript30-any类型
字节给我狠狠上了一课:危机来的时候你连准备时间都没有...
Rust P2P网络应用实战-1 P2P网络核心概念及Ping程序
Reflex WMS中阶系列6:对一个装货重复run pick会有什么后果?
使用百度EasyDL实现厂区工人抽烟行为识别
Constructor instance method inheritance of typescript37-class (extends)
Constructor instance method of typescript36-class
Understand the big model in seconds | 3 steps to get AI to write a summary