Using a min-heap to avoid re-sorting tasks on every run
2022-07-28 15:40:00 【Zhang San admin】
Previously, I wrote a distributed task scheduling system in which the task list had to be re-sorted after every execution. Switching to a min-heap noticeably improved efficiency and reduced CPU usage.
The project needed a simple framework for scheduling timed tasks. I initially searched GitHub and picked one with a lot of stars, https://github.com/robfig/cron, which has 8000+ stars. At first it worked without problems. However, as more and more tasks had to be scheduled on a single machine, the peak approached 500 QPS, and with the business being rolled out more widely, growth was expected to be fairly fast. I then ran into high CPU utilization. Profiling with pprof showed that much of the time was spent sorting. Reading the project's code, the overall flow is as follows:
Sort all tasks by their next execution time.
Take the first task in the array, subtract the current time from its next execution time to get a duration t, then sleep for t.
On waking, traverse the tasks from the first element of the array. If a task's next execution time is not later than now, run it and then recalculate its next execution time.
After all due tasks have been run, re-sort the array.
Loop back and take the first task to run from the sorted array.
The code is as follows:
for {
	// Determine the next entry to run.
	sort.Sort(byTime(c.entries))
	var timer *time.Timer
	if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
		// If there are no entries yet, just sleep - it still handles new entries
		// and stop requests.
		timer = time.NewTimer(100000 * time.Hour)
	} else {
		timer = time.NewTimer(c.entries[0].Next.Sub(now))
	}
	for {
		select {
		case now = <-timer.C:
			now = now.In(c.location)
			c.logger.Info("wake", "now", now)
			// Run every entry whose next time was less than now
			for _, e := range c.entries {
				if e.Next.After(now) || e.Next.IsZero() {
					break
				}
				c.startJob(e.WrappedJob)
				e.Prev = e.Next
				e.Next = e.Schedule.Next(now)
				c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
			}
		case newEntry := <-c.add:
			timer.Stop()
			now = c.now()
			newEntry.Next = newEntry.Schedule.Next(now)
			c.entries = append(c.entries, newEntry)
			c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
		case replyChan := <-c.snapshot:
			replyChan <- c.entrySnapshot()
			continue
		case <-c.stop:
			timer.Stop()
			c.logger.Info("stop")
			return
		case id := <-c.remove:
			timer.Stop()
			now = c.now()
			c.removeEntry(id)
			c.logger.Info("removed", "entry", id)
		}
		break
	}
}
The problem is obvious: every time a task (or a few tasks) runs, its next execution time is recalculated and the whole array is re-sorted. In the worst case each wake-up runs just one task and then sorts again, so running k tasks costs O(k * n log n), which is undoubtedly very inefficient.
So I thought about how to optimize the framework. Each wake-up really only needs to find, among all tasks, the one with the smallest schedule_time (where schedule_time is the task's execution time), which naturally suggests a min-heap:
When initializing the task list, build a min-heap directly.
On each wake-up, peek at the top element to see whether it needs to run.
If it does, pop the top element, calculate its next execution time, and push it back into the heap.
If it does not, break out to the outer loop, take the top element, calculate next_time - now() = need_sleep_time, and then select on the sleep timer, add, remove, and the other operations.
After I switched to a min-heap, each added task is placed by the heap's sift-up and sift-down adjustments, so adding a task costs O(log n) and running k tasks costs O(k log n). Verified in production, CPU usage dropped by 4 to 5 times, from about 50% to about 10%.
The optimized code is as follows; this is only part of it.
The full code is on GitHub: I created a fork and pushed the changes, and all unit test cases pass. If you are interested, take a look: https://github.com/tovenja/cron
for {
	// Determine the next entry to run.
	// With a min-heap no sort is needed here, because add adjusts the heap directly.
	//sort.Sort(byTime(c.entries))
	var timer *time.Timer
	if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
		// If there are no entries yet, just sleep - it still handles new entries
		// and stop requests.
		timer = time.NewTimer(100000 * time.Hour)
	} else {
		timer = time.NewTimer(c.entries[0].Next.Sub(now))
		//fmt.Printf(" %v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID)
	}
	for {
		select {
		case now = <-timer.C:
			now = now.In(c.location)
			c.logger.Info("wake", "now", now)
			// Run every entry whose next time was less than now
			for {
				// Peek at the heap top without removing it.
				e := c.entries.Peek()
				if e.Next.After(now) || e.Next.IsZero() {
					break
				}
				// The top entry is due: pop it, run it, reschedule it, push it back.
				e = heap.Pop(&c.entries).(*Entry)
				c.startJob(e.WrappedJob)
				e.Prev = e.Next
				e.Next = e.Schedule.Next(now)
				heap.Push(&c.entries, e)
				c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
			}
		case newEntry := <-c.add:
			timer.Stop()
			now = c.now()
			newEntry.Next = newEntry.Schedule.Next(now)
			heap.Push(&c.entries, newEntry)
			c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
		case replyChan := <-c.snapshot:
			replyChan <- c.entrySnapshot()
			continue
		case <-c.stop:
			timer.Stop()
			c.logger.Info("stop")
			return
		case id := <-c.remove:
			timer.Stop()
			now = c.now()
			c.removeEntry(id)
			c.logger.Info("removed", "entry", id)
		}
		break
	}
}
Source: cnblogs.com/aboutblank/p/14860571.html