当前位置:网站首页>How to implement timing tasks for distributed applications in Golang
How to implement timing tasks for distributed applications in Golang
2022-07-30 15:56:00 【Yisuyun】
GolangHow to implement scheduled tasks in distributed applications
这篇“GolangHow to implement scheduled tasks in distributed applications”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“GolangHow to implement scheduled tasks in distributed applications”文章吧.
最小堆
A min heap is a special kind of complete binary tree,The value of any non-leaf node is not greater than its child nodes,如图
via min heap,Key heap based on the most recent execution time of the task,Each time the top element of the heap is taken, that is, the task that needs to be executed recently,设置timer定时器,Trigger task execution after expiration.Due to the characteristics of the heap, the time complexity of each adjustment isO(lgN),Faster performance than normal queues.
在container/heap
The related functions that operate on the heap have been implemented in,We only need to implement the core logic of periodic tasks.
// 运行func (c *Cron) Run() error { // 设置cron已启动,atomic.Bool来保证并发安全 c.started.Store(true) // 主循环 for { // If the stop the exit if !c.started.Load() { break } c.runTask() } return nil}// 核心逻辑func (c *Cron) runTask() { now := time.Now() duration := infTime // 获取堆顶元素 task, ok := c.tasks.Peek() if ok { // If the deleted the popup if !c.set.Has(task.Name()) { c.tasks.Pop() return } // Calculated at current time lookup,设置定时器 if task.next.After(now) { duration = task.next.Sub(now) } else { duration = 0 } } timer := time.NewTimer(duration) defer timer.Stop() // Returns directly when a new element is inserted,Prevent new elements from executing less than the current top-of-heap element select { case <-c.new: return case <-timer.C: } // 弹出任务,执行 go task.Exec() // 计算下次执行时间,如果为0说明任务已结束,Otherwise, reload task.next = task.Next(time.Now()) if task.next.IsZero() { c.set.Delete(task.Name()) } else { c.tasks.Push(task) }}
The main logic can be summarized as:
Will be according to the next task execution time to build the minimum heap
Take the top-of-the-heap task each time,设置定时器
If there is a new task added in the middle,转入步骤2
Execute task after timer expires
Take the next task again,转入步骤2,依次执行
时间轮
另一种实现CronThe way is the time wheel,The time wheel goes through a circular queue,Each slot puts tasks that need to be executed due to,According to the fixed interval time wheel rotation,Take the task list in the slot and execute it,如图所示:
The time wheel can be seen as a dial,The time interval in the figure is1秒,总共60个格子,如果任务在3If executed in seconds, it will be placed as a slot3,Executes all tasks on the slot in revolutions per second.
If the execution time exceeds the maximum slot,For example, a task requires63秒后执行(Exceeded maximum grid scale),Generally, through the multi-layer time wheel,Or set an extra variable number of turns,Only the number of laps performed is0的任务.
The time complexity of time wheel insertion isO(1),The complexity of getting the task list isO(1),The worst execution list isO(n).Compare the min heap,Time wheel inserts and deletes elements faster.
核心代码如下:
// 定义type TimeWheel struct { interval time.Duration // 触发间隔 slots int // The total number of slots currentSlot int // Current number of slots tasks []*list.List // 环形列表,Each element is a task list for the corresponding slot set containerx.Set[string] // 记录所有任务key值,Used to check if the task has been deleted tricker *time.Ticker // 定时触发器 logger logr.Logger}func (tw *TimeWheel) Run() error { tw.tricker = time.NewTicker(tw.interval) for { // Simulate time wheel rotation by timer now, ok := <-tw.tricker.C if !ok { break } // 转动一次,执行任务列表 tw.RunTask(now, tw.currentSlot) tw.currentSlot = (tw.currentSlot + 1) % tw.slots } return nil}func (tw *TimeWheel) RunTask(now time.Time, slot int) { // One-time task list for item := taskList.Front(); item != nil; { task, ok := item.Value.(*TimeWheelTask) // The number of task laps is greater than0,不需要执行,reduce the number of turns by one if task.circle > 0 { task.circle-- item = item.Next() continue } // 运行任务 go task.Exec() // Calculate the next run time of the task next := item.Next() taskList.Remove(item) item = next task.next = task.Next(now) if !task.next.IsZero() { tw.add(now, task) } else { tw.Remove(task.Name()) } }}// 添加任务,Calculate slots and laps for next task executionfunc (tw *TimeWheel) add(now time.Time, task *TimeWheelTask) { if !task.initialized { task.next = task.Next(now) task.initialized = true } duration := task.next.Sub(now) if duration <= 0 { task.slot = tw.currentSlot + 1 task.circle = 0 } else { mult := int(duration / tw.interval) task.slot = (tw.currentSlot + mult) % tw.slots task.circle = mult / tw.slots } tw.tasks[task.slot].PushBack(task) tw.set.Insert(task.Name())}
The main logic of the time wheel is as follows:
The time to store the task in the corresponding slot
Simulate the rotation of the time wheel through a fixed time
Traverse the current slot's task list after each expiration,If the number of task laps is0则执行
如果任务未结束,Next time slot and the ring number of execution
转入步骤2,依次执行
以上就是关于“GolangHow to implement scheduled tasks in distributed applications”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注亿速云行业资讯频道.
边栏推荐
- 几种常见的存储器
- 481-82 (105, 24, 82, 34, 153),
- FME realizes the method of converting CAD with attribute to SHP data
- 武汉星起航:海外仓基础建设成为跨境电商企业的一大出海利器
- TiUP 术语及核心概念
- Overview of TiUP commands
- ISELED---the new choice of ambient lighting scheme
- SEATA distributed transaction
- EST综述:eDNA的多种状态以及在水环境中持久性的认知
- Manage components using TiUP commands
猜你喜欢
Placement Rules 使用文档
【重磅来袭】教你如何在RGBD三维重建中获取高质量模型纹理
Local Transactions vs Distributed Transactions
70 lines of code, a desktop automatic translation artifact
FME realizes the method of converting CAD with attribute to SHP data
【HMS core】【Media】【Video Editing Service】 The online material cannot be displayed, it is always in the loading state or the network is abnormal
Mysql database query is very slow. Besides the index, what else can be caused?
深度学习遇到报错Bug解决方法(不定时更新)
华为「天才少年」计划招募的博士们,迎来首秀!
【HMS core】【Media】【视频编辑服务】 在线素材无法展示,一直Loading状态或是网络异常
随机推荐
【开发者必看】【push kit】推送服务典型问题合集2
JHM:芳环羟化双加氧酶数据库DARHD建立及相关引物评价
Example of video switching playback (video switching example) code
tiup env
【HMS core】【Media】【Video Editing Service】 The online material cannot be displayed, it is always in the loading state or the network is abnormal
nodejs environment variable settings
TiDB 工具适用场景
Sleuth+Zipkin (visualization) service link tracking
Introduction to golang image processing library image
[flutter]什么是MaterialApp和Material design
Why is there no data reported when the application is connected to Huawei Analytics in the application debugging mode?
xxl-job源码解析(技术分享)
影像信息提取DEM
【嵌入式】适用于Cortex-M3(STM32F10x)的IQmath库
[AGC] Quality Service 1 - Example of Crash Service
TiDB 工具下载
websocket flv 客户端解封包
[HMS core] [FAQ] Collection of typical problems of push kit, AR Engine, advertising service, scanning service 2
2022最新 | 室外单目深度估计研究综述
(Popular Science) What is Fractional NFT (Fractional NFT)