A min-heap removes the cost of re-sorting after every task run
2022-07-28 15:40:00 【Zhang San admin】
Previously, I wrote a distributed task scheduling system. The tasks had to be re-sorted after every execution, and switching to a min-heap really did improve efficiency and reduce CPU usage.
The project needed a simple framework for scheduling timed tasks. At first I simply searched GitHub for one with many stars and picked https://github.com/robfig/cron, which has 8000+ stars. It worked without problems at first. However, as a single machine had to schedule more and more tasks, the peak approached 500 QPS, and with wider business adoption that number could be expected to grow fairly quickly. I then ran into high CPU utilization. Profiling with pprof showed that much of the time was spent sorting. Reading the project's code, the overall flow is as follows:
Sort all tasks by their next execution time.
Take the first task in the array, compute t as its next execution time minus the current time, then sleep for t.
After waking, traverse the tasks from the first element of the array. If a task's next execution time is not later than now, run it and then recalculate its next execution time.
Once every due task has been started, re-sort the whole array.
Loop back and pick the first task to execute from the sorted array again.
The code is as follows:
for {
	// Determine the next entry to run.
	sort.Sort(byTime(c.entries))
	var timer *time.Timer
	if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
		// If there are no entries yet, just sleep - it still handles new entries
		// and stop requests.
		timer = time.NewTimer(100000 * time.Hour)
	} else {
		timer = time.NewTimer(c.entries[0].Next.Sub(now))
	}
	for {
		select {
		case now = <-timer.C:
			now = now.In(c.location)
			c.logger.Info("wake", "now", now)
			// Run every entry whose next time was less than now
			for _, e := range c.entries {
				if e.Next.After(now) || e.Next.IsZero() {
					break
				}
				c.startJob(e.WrappedJob)
				e.Prev = e.Next
				e.Next = e.Schedule.Next(now)
				c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
			}
		case newEntry := <-c.add:
			timer.Stop()
			now = c.now()
			newEntry.Next = newEntry.Schedule.Next(now)
			c.entries = append(c.entries, newEntry)
			c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
		case replyChan := <-c.snapshot:
			replyChan <- c.entrySnapshot()
			continue
		case <-c.stop:
			timer.Stop()
			c.logger.Info("stop")
			return
		case id := <-c.remove:
			timer.Stop()
			now = c.now()
			c.removeEntry(id)
			c.logger.Info("removed", "entry", id)
		}
		break
	}
}
The problem is obvious: every time one task (or a few tasks) runs, its next execution time is recalculated and the whole array is re-sorted. In the worst case each wake-up runs exactly 1 task and then sorts again, so running k tasks costs O(k*nlogn), where n is the total number of tasks. That is undoubtedly very inefficient.
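To make the gap concrete, here is a rough sketch that counts comparisons for the two strategies. It is not taken from robfig/cron: the `counted` type and the functions `sortEachTime` and `heapEachTime` are hypothetical names, and next execution times are modeled as plain integers. The actual cost of `sort.Sort` depends on the Go version and on how ordered the input already is, so treat the numbers as an illustration only.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// counted holds "next run" ticks and counts every comparison made on them.
// It satisfies both sort.Interface and heap.Interface.
type counted struct {
	ticks []int
	cmps  int
}

func (c *counted) Len() int           { return len(c.ticks) }
func (c *counted) Less(i, j int) bool { c.cmps++; return c.ticks[i] < c.ticks[j] }
func (c *counted) Swap(i, j int)      { c.ticks[i], c.ticks[j] = c.ticks[j], c.ticks[i] }
func (c *counted) Push(x interface{}) { c.ticks = append(c.ticks, x.(int)) }
func (c *counted) Pop() interface{} {
	n := len(c.ticks)
	v := c.ticks[n-1]
	c.ticks = c.ticks[:n-1]
	return v
}

func newCounted(n int) *counted {
	c := &counted{ticks: make([]int, n)}
	for i := range c.ticks {
		c.ticks[i] = i
	}
	return c
}

// sortEachTime simulates k wake-ups that each run one task and then
// re-sort all n tasks, and returns the number of comparisons used.
func sortEachTime(n, k int) int {
	c := newCounted(n)
	for i := 0; i < k; i++ {
		c.ticks[0] += n // the task that just ran moves to the back of the schedule
		sort.Sort(c)
	}
	return c.cmps
}

// heapEachTime simulates the same k wake-ups with a pop followed by a push.
func heapEachTime(n, k int) int {
	c := newCounted(n)
	heap.Init(c)
	c.cmps = 0 // count only the per-task work, not the one-time build
	for i := 0; i < k; i++ {
		v := heap.Pop(c).(int)
		heap.Push(c, v+n)
	}
	return c.cmps
}

func main() {
	n, k := 1000, 1000
	fmt.Println("comparisons, sort after every run:", sortEachTime(n, k))
	fmt.Println("comparisons, heap pop + push:     ", heapEachTime(n, k))
}
```

Even when the sort handles nearly sorted input well, it still has to look at all n tasks on every wake-up, while the heap touches only about log n of them.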
So I thought about how to optimize the framework. Each wake-up only needs the task with the smallest schedule_time among all tasks (where schedule_time is the task's next execution time), so a min-heap is the natural choice:
When initializing the task list, build a min-heap directly.
On each wake-up, peek at the top element to see whether it needs to run.
If it does, pop the top element, calculate its next execution time, and push it back into the heap.
If it does not, break out to the outer loop, take the top element, compute need_sleep_time = next_time - now(), then select on the sleep timer and on the add, remove, and other operations.
After I switched to a min-heap, each added task is placed by the heap's up and down adjustments, so adding a task costs O(logn) and running k tasks costs O(klogn). Verified in production, CPU usage dropped by a factor of 4 to 5, from 50% to about 10%.
The optimized code is as follows; this is only part of it.
All of the code is on GitHub: I created a fork and pushed the changes, and all unit test cases pass. If you are interested, take a look: https://github.com/tovenja/cron
for {
	// Determine the next entry to run.
	// With a min-heap there is no need to sort here any more,
	// because add adjusts the heap directly.
	//sort.Sort(byTime(c.entries))
	var timer *time.Timer
	if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
		// If there are no entries yet, just sleep - it still handles new entries
		// and stop requests.
		timer = time.NewTimer(100000 * time.Hour)
	} else {
		timer = time.NewTimer(c.entries[0].Next.Sub(now))
		//fmt.Printf(" %v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID)
	}
	for {
		select {
		case now = <-timer.C:
			now = now.In(c.location)
			c.logger.Info("wake", "now", now)
			// Run every entry whose next time was less than now
			for {
				e := c.entries.Peek()
				if e.Next.After(now) || e.Next.IsZero() {
					break
				}
				// Pop the due entry, reschedule it, and push it back.
				e = heap.Pop(&c.entries).(*Entry)
				c.startJob(e.WrappedJob)
				e.Prev = e.Next
				e.Next = e.Schedule.Next(now)
				heap.Push(&c.entries, e)
				c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
			}
		case newEntry := <-c.add:
			timer.Stop()
			now = c.now()
			newEntry.Next = newEntry.Schedule.Next(now)
			heap.Push(&c.entries, newEntry)
			c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
		case replyChan := <-c.snapshot:
			replyChan <- c.entrySnapshot()
			continue
		case <-c.stop:
			timer.Stop()
			c.logger.Info("stop")
			return
		case id := <-c.remove:
			timer.Stop()
			now = c.now()
			c.removeEntry(id)
			c.logger.Info("removed", "entry", id)
		}
		break
	}
}
Source: cnblogs.com/aboutblank/p/14860571.html