当前位置:网站首页>How to implement timing tasks for distributed applications in Golang
How to implement timing tasks for distributed applications in Golang
2022-07-30 15:56:00 【Yisuyun】
GolangHow to implement scheduled tasks in distributed applications
这篇“GolangHow to implement scheduled tasks in distributed applications”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“GolangHow to implement scheduled tasks in distributed applications”文章吧.
最小堆
A min heap is a special kind of complete binary tree,The value of any non-leaf node is not greater than its child nodes,如图

via min heap,Key heap based on the most recent execution time of the task,Each time the top element of the heap is taken, that is, the task that needs to be executed recently,设置timer定时器,Trigger task execution after expiration.Due to the characteristics of the heap, the time complexity of each adjustment isO(lgN),Faster performance than normal queues.
在container/heapThe related functions that operate on the heap have been implemented in,We only need to implement the core logic of periodic tasks.
// 运行func (c *Cron) Run() error { // 设置cron已启动,atomic.Bool来保证并发安全 c.started.Store(true) // 主循环 for { // If the stop the exit if !c.started.Load() { break } c.runTask() } return nil}// 核心逻辑func (c *Cron) runTask() { now := time.Now() duration := infTime // 获取堆顶元素 task, ok := c.tasks.Peek() if ok { // If the deleted the popup if !c.set.Has(task.Name()) { c.tasks.Pop() return } // Calculated at current time lookup,设置定时器 if task.next.After(now) { duration = task.next.Sub(now) } else { duration = 0 } } timer := time.NewTimer(duration) defer timer.Stop() // Returns directly when a new element is inserted,Prevent new elements from executing less than the current top-of-heap element select { case <-c.new: return case <-timer.C: } // 弹出任务,执行 go task.Exec() // 计算下次执行时间,如果为0说明任务已结束,Otherwise, reload task.next = task.Next(time.Now()) if task.next.IsZero() { c.set.Delete(task.Name()) } else { c.tasks.Push(task) }}The main logic can be summarized as:
Will be according to the next task execution time to build the minimum heap
Take the top-of-the-heap task each time,设置定时器
If there is a new task added in the middle,转入步骤2
Execute task after timer expires
Take the next task again,转入步骤2,依次执行
时间轮
另一种实现CronThe way is the time wheel,The time wheel goes through a circular queue,Each slot puts tasks that need to be executed due to,According to the fixed interval time wheel rotation,Take the task list in the slot and execute it,如图所示:

The time wheel can be seen as a dial,The time interval in the figure is1秒,总共60个格子,如果任务在3If executed in seconds, it will be placed as a slot3,Executes all tasks on the slot in revolutions per second.
If the execution time exceeds the maximum slot,For example, a task requires63秒后执行(Exceeded maximum grid scale),Generally, through the multi-layer time wheel,Or set an extra variable number of turns,Only the number of laps performed is0的任务.
The time complexity of time wheel insertion isO(1),The complexity of getting the task list isO(1),The worst execution list isO(n).Compare the min heap,Time wheel inserts and deletes elements faster.
核心代码如下:
// 定义type TimeWheel struct { interval time.Duration // 触发间隔 slots int // The total number of slots currentSlot int // Current number of slots tasks []*list.List // 环形列表,Each element is a task list for the corresponding slot set containerx.Set[string] // 记录所有任务key值,Used to check if the task has been deleted tricker *time.Ticker // 定时触发器 logger logr.Logger}func (tw *TimeWheel) Run() error { tw.tricker = time.NewTicker(tw.interval) for { // Simulate time wheel rotation by timer now, ok := <-tw.tricker.C if !ok { break } // 转动一次,执行任务列表 tw.RunTask(now, tw.currentSlot) tw.currentSlot = (tw.currentSlot + 1) % tw.slots } return nil}func (tw *TimeWheel) RunTask(now time.Time, slot int) { // One-time task list for item := taskList.Front(); item != nil; { task, ok := item.Value.(*TimeWheelTask) // The number of task laps is greater than0,不需要执行,reduce the number of turns by one if task.circle > 0 { task.circle-- item = item.Next() continue } // 运行任务 go task.Exec() // Calculate the next run time of the task next := item.Next() taskList.Remove(item) item = next task.next = task.Next(now) if !task.next.IsZero() { tw.add(now, task) } else { tw.Remove(task.Name()) } }}// 添加任务,Calculate slots and laps for next task executionfunc (tw *TimeWheel) add(now time.Time, task *TimeWheelTask) { if !task.initialized { task.next = task.Next(now) task.initialized = true } duration := task.next.Sub(now) if duration <= 0 { task.slot = tw.currentSlot + 1 task.circle = 0 } else { mult := int(duration / tw.interval) task.slot = (tw.currentSlot + mult) % tw.slots task.circle = mult / tw.slots } tw.tasks[task.slot].PushBack(task) tw.set.Insert(task.Name())}The main logic of the time wheel is as follows:
The time to store the task in the corresponding slot
Simulate the rotation of the time wheel through a fixed time
Traverse the current slot's task list after each expiration,If the number of task laps is0则执行
如果任务未结束,Next time slot and the ring number of execution
转入步骤2,依次执行
以上就是关于“GolangHow to implement scheduled tasks in distributed applications”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注亿速云行业资讯频道.
边栏推荐
- 【HMS core】【Media】【视频编辑服务】 在线素材无法展示,一直Loading状态或是网络异常
- 科研中一些常用软件清单
- TiUP 命令概览
- 解析字符串拼接的两种情况
- TiUP terms and core concepts
- 武汉星起航:海外仓基础建设成为跨境电商企业的一大出海利器
- SEATA distributed transaction
- Google engineer "code completion" tool; "Transformers NLP" accompanying book code; FastAPI development template; PyTorch model acceleration tool; cutting-edge papers | ShowMeAI News Daily
- JHM:芳环羟化双加氧酶数据库DARHD建立及相关引物评价
- timed task corn
猜你喜欢

Placement Rules 使用文档

481-82(105、24、82、34、153)

GeoServer

yarn安装详细教程说明、升级教程、修改yarn的全局和缓存目录、yarn基本命令

Google engineer "code completion" tool; "Transformers NLP" accompanying book code; FastAPI development template; PyTorch model acceleration tool; cutting-edge papers | ShowMeAI News Daily

yarn的安装及使用教程

Placement Rules usage documentation

【开发者必看】【push kit】推送服务典型问题合集2

Excel uses Visual Basic Editor to modify macros

Flask之路由(app.route)详解
随机推荐
vite 多页面应用刷新页面时,不会在当前路由中,会返回到根路由
Recent learning defragmentation (24)
Flask入门学习教程
AL遮天传 DL-深度学习模型的训练技巧
FME读写cass数据的方案及操作流程
Databases - create databases, tables, functions, etc.
php如何去除字符串最后一位字符
L2-007 Family property (use of vector, set, map)
php如何截取字符串的前几位
golang modules initialization project
[AGC] Quality Service 1 - Example of Crash Service
RISC-V calling conventions
CAD几个优化设置
nodejs环境变量设置
开源WebGIS架构
GeoServer + openlayers
Sparse-PointNet: See Further in Autonomous Vehicles 论文笔记
【AGC】Open Test Example
[flutter]什么是MaterialApp和Material design
MySql 和 PostgreSQL 数据库 根据一张表update另一张表数据