Implementing a reliable delay queue in Golang based on Redis
2022-06-22 18:49:00 【1024 Q】
Preface
The principle
pending2ReadyScript
ready2UnackScript
unack2RetryScript
ack
consume
Preface
In the previous article on delay queues, we mentioned that redisson's delayqueue implements a delay queue on top of Redis's sorted set structure. Unfortunately, the Go community has no comparable library. That is not a big problem: we can build our own wheel.
The complete code for this article is implemented in hdt3213/delayqueue and can be installed directly with go get.
Implementing a delay queue with a sorted set is a well-known technique: each message is stored as a member of the sorted set with its delivery timestamp as the score, and the zrangebyscore command is used to find the messages that have reached their delivery time and send them to consumers.
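The sorted-set idea can be sketched without Redis at all. The following is a minimal in-memory model, not the library's code: `delaySet` and `popDue` are hypothetical names, and the map only imitates what a scan with zrangebyscore followed by a removal of the same score range would do.

```go
package main

import (
	"fmt"
	"sort"
)

// delaySet is an in-memory stand-in for a Redis sorted set:
// member = message, score = delivery time (Unix seconds).
type delaySet map[string]int64

// popDue returns every message whose delivery time is <= now, ordered by
// delivery time, and removes those messages from the set.
func (s delaySet) popDue(now int64) []string {
	due := make([]string, 0)
	for msg, deliverAt := range s {
		if deliverAt <= now {
			due = append(due, msg)
		}
	}
	sort.Slice(due, func(i, j int) bool {
		if s[due[i]] != s[due[j]] {
			return s[due[i]] < s[due[j]]
		}
		return due[i] < due[j] // tie-break so the order is deterministic
	})
	for _, msg := range due {
		delete(s, msg)
	}
	return due
}

func main() {
	s := delaySet{"a": 100, "b": 200, "c": 300}
	fmt.Println(s.popDue(250)) // [a b]
	fmt.Println(len(s))        // 1
}
```

Against a real Redis instance the scan and the removal have to happen atomically, which is why the scripts later in the article do both steps inside one Lua script.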
However, a message queue does more than hand messages to consumers; it also has the important responsibility of guaranteeing delivery and consumption. The usual implementation has the consumer return an acknowledgement (ack) to the message queue after it receives a message. If the consumer returns a negative acknowledgement (nack) or does not respond before a timeout, the message queue resends the message according to predetermined rules until the maximum number of retries is reached. How to implement the ack and retry mechanism is the key problem we need to consider.
Our message queue allows multiple producers and consumers to be deployed in a distributed fashion. Consumer instances periodically execute Lua scripts to drive messages through the queue, so no additional components need to be deployed. Because Redis guarantees that Lua scripts execute atomically, no locking is needed anywhere in the process.
Consumers fetch messages in pull mode, and each message is guaranteed to be delivered at least once. The queue retries messages that time out or are negatively acknowledged (nack) until the maximum number of retries is reached. At most one consumer processes a given message at a time, which reduces the concurrency issues to consider.
Note: if consumption takes longer than MaxConsumeDuration, the queue treats it as timed out and redelivers the message. At that point multiple consumers may be processing the same message at the same time.
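Because a message can be delivered more than once, it can help to make the callback tolerate duplicates. The wrapper below is only a sketch of that idea and is not part of hdt3213/delayqueue: `idempotent` is a hypothetical helper, it assumes every payload is unique (for example because it carries an ID), and its in-process map only protects a single consumer process. Several processes would need a shared store instead.

```go
package main

import (
	"fmt"
	"sync"
)

// idempotent wraps a handler so that a payload which was already processed
// successfully is acknowledged without running the handler again.
// Assumption: payloads are unique per message.
func idempotent(handle func(payload string) bool) func(payload string) bool {
	var mu sync.Mutex
	done := make(map[string]bool)
	return func(payload string) bool {
		// Holding the lock for the whole call serialises handling, which
		// keeps the sketch simple and avoids two concurrent runs of the
		// same payload.
		mu.Lock()
		defer mu.Unlock()
		if done[payload] {
			return true // duplicate delivery: ack without side effects
		}
		ok := handle(payload)
		if ok {
			done[payload] = true
		}
		return ok
	}
}

func main() {
	calls := 0
	handler := idempotent(func(payload string) bool {
		calls++
		return true
	})
	first := handler("order-1")
	second := handler("order-1") // redelivery of the same payload
	fmt.Println(first, second, calls)
}
```

A failed attempt is not recorded, so returning false still lets the queue retry the message as usual.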
Usage is simple: register a callback function that processes messages and call StartConsume():
    package main

    import (
        "strconv"
        "time"

        "github.com/go-redis/redis/v8"
        "github.com/hdt3213/delayqueue"
    )

    func main() {
        redisCli := redis.NewClient(&redis.Options{
            Addr: "127.0.0.1:6379",
        })
        queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
            // Callback that processes a message.
            // Return true if the message was consumed successfully;
            // if it returns false, the queue redelivers the message.
            return true
        })
        // Send delayed messages.
        for i := 0; i < 10; i++ {
            err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
            if err != nil {
                panic(err)
            }
        }
        // Start consuming.
        done := queue.StartConsume()
        <-done
    }

Because the data is stored in Redis, the best we can guarantee is that messages will not be lost as long as Redis does not fail and the queue's keys are not tampered with from outside.
The principle
The message queue involves several key Redis data structures:
msgKey: To avoid unexpected effects when two messages have exactly the same content, each message is stored in its own key of type string and is assigned a UUID as its unique identifier. The other data structures store only the UUID, not the full message content. Each message gets its own key, rather than all messages going into one hash table, in order to take advantage of the TTL mechanism.
pendingKey: Sorted set. The member is the message ID and the score is the Unix timestamp of the delivery time.
readyKey: List. Holds the IDs of messages waiting to be delivered.
unAckKey: Sorted set. The member is the message ID and the score is the Unix timestamp of the retry time.
retryKey: List. Holds the IDs of messages whose retry time has arrived.
garbageKey: Set. Temporarily stores the IDs of messages that have reached the retry limit.
retryCountKey: Hash table. The key is the message ID and the value is the number of retries remaining.
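One way to keep these keys together in Go is a small struct that derives every key from the queue name. This is a sketch only: the `dq:` prefix, the suffixes and the names `queueKeys`, `newQueueKeys` and `msgKey` are hypothetical, and the real library may name its keys differently.

```go
package main

import "fmt"

// queueKeys groups the Redis key names used by one queue.
// The "dq:" prefix and all suffixes are hypothetical naming choices.
type queueKeys struct {
	pending    string // sorted set: message ID -> delivery time
	ready      string // list: IDs waiting to be delivered
	unack      string // sorted set: message ID -> retry time
	retry      string // list: IDs whose retry time has arrived
	garbage    string // set: IDs that exhausted their retries
	retryCount string // hash: message ID -> retries remaining
	msgPrefix  string // prefix of the per-message string keys
}

func newQueueKeys(name string) queueKeys {
	base := "dq:" + name
	return queueKeys{
		pending:    base + ":pending",
		ready:      base + ":ready",
		unack:      base + ":unack",
		retry:      base + ":retry",
		garbage:    base + ":garbage",
		retryCount: base + ":retry:cnt",
		msgPrefix:  base + ":msg:",
	}
}

// msgKey returns the key of the string that holds one message's payload.
func (k queueKeys) msgKey(id string) string {
	return k.msgPrefix + id
}

func main() {
	keys := newQueueKeys("example-queue")
	fmt.Println(keys.pending)
	fmt.Println(keys.msgKey("42"))
}
```

Deriving everything from one name keeps two queues on the same Redis instance from touching each other's keys.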
The flow is shown in the following figure :
Because we allow multiple consumers to be deployed in a distributed fashion, and every consumer periodically executes the Lua scripts, several consumers may be at different stages of the flow above at the same time. We can neither predict nor control the order in which the five operations in the figure occur, nor how many instances are performing the same operation simultaneously.
Therefore, the five operations in the figure must satisfy three conditions:
Every operation is atomic
The same message is never processed repeatedly
The message queue is in a correct state both before and after each operation
As long as these three conditions are met, we can deploy multiple instances without distributed locks or other techniques for state synchronization.
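To see why an atomic move is enough to keep one message from reaching two consumers, consider a sequential Go model of the step that hands a message to a consumer. It is an illustration under assumptions, not the library's code: `queueState` and `ready2Unack` are hypothetical names, and a plain slice and map stand in for the Redis list and sorted set.

```go
package main

import "fmt"

// queueState is an in-memory stand-in for the ready list and the unack sorted set.
type queueState struct {
	ready []string         // index 0 is the head; consumers pop from the tail
	unack map[string]int64 // message ID -> retry time (Unix seconds)
}

// ready2Unack pops one message ID from the tail of ready and records it in
// unack with the given retry time. Because the pop and the record happen as
// one step, an ID is never in both structures and never handed out twice.
func (q *queueState) ready2Unack(retryAt int64) (string, bool) {
	if len(q.ready) == 0 {
		return "", false
	}
	last := len(q.ready) - 1
	id := q.ready[last]
	q.ready = q.ready[:last]
	q.unack[id] = retryAt
	return id, true
}

func main() {
	q := &queueState{
		ready: []string{"c", "b", "a"},
		unack: make(map[string]int64),
	}
	// Two consumers take turns; each call is one atomic step.
	first, _ := q.ready2Unack(100)
	second, _ := q.ready2Unack(100)
	fmt.Println(first, second, len(q.ready), len(q.unack)) // a b 1 2
}
```

In Redis the same guarantee comes from running the pop and the record inside a single Lua script, so no other command can run between them.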
Does that sound a little scary? In fact it is very simple. Let's take a closer look.
pending2ReadyScript
pending2ReadyScript uses zrangebyscore to scan for the IDs of messages that have reached their delivery time and moves them to ready:
    -- keys: pendingKey, readyKey
    -- argv: currentTime
    local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- find the messages in the pending key that have reached their delivery time
    if (#msgs == 0) then return end
    local args2 = {'LPush', KEYS[2]}  -- push them onto the ready key
    for _,v in ipairs(msgs) do
        table.insert(args2, v)
    end
    redis.call(unpack(args2))
    redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove the delivered messages from the pending key

ready2UnackScript
ready2UnackScript takes a message from ready or retry, sends it to the consumer and puts it in unack, similar to RPopLPush:
    -- keys: readyKey/retryKey, unackKey
    -- argv: retryTime
    local msg = redis.call('RPop', KEYS[1])
    if (not msg) then return end
    redis.call('ZAdd', KEYS[2], ARGV[1], msg)
    return msg

unack2RetryScript
unack2RetryScript finds all messages in unack that have reached their retry time and moves them to retry:
    -- keys: unackKey, retryCountKey, retryKey, garbageKey
    -- argv: currentTime
    local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- find the messages whose retry time has arrived
    if (#msgs == 0) then return end
    local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- query the remaining retry counts
    for i,v in ipairs(retryCounts) do
        local k = msgs[i]
        if tonumber(v) > 0 then -- retries remain
            redis.call("HIncrBy", KEYS[2], k, -1) -- decrement the remaining retry count
            redis.call("LPush", KEYS[3], k) -- add to the retry key
        else -- no retries remain
            redis.call("HDel", KEYS[2], k) -- delete the retry count record
            redis.call("SAdd", KEYS[4], k) -- add to the garbage set, to be deleted later
        end
    end
    redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove the processed messages from the unack key

Redis requires a Lua script to declare, in its KEYS parameters, every key it will access before it runs. Since each message has its own key, we do not know which message keys need to be deleted before executing unack2RetryScript. The Lua script therefore only records the messages to delete in the garbage key, and after the script finishes they are deleted with the del command:
    func (q *DelayQueue) garbageCollect() error {
        ctx := context.Background()
        msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
        if err != nil {
            return fmt.Errorf("smembers failed: %v", err)
        }
        if len(msgIds) == 0 {
            return nil
        }
        // allow concurrent clean
        msgKeys := make([]string, 0, len(msgIds))
        for _, idStr := range msgIds {
            msgKeys = append(msgKeys, q.genMsgKey(idStr))
        }
        err = q.redisCli.Del(ctx, msgKeys...).Err()
        if err != nil && err != redis.Nil {
            return fmt.Errorf("del msgs failed: %v", err)
        }
        err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
        if err != nil && err != redis.Nil {
            return fmt.Errorf("remove from garbage key failed: %v", err)
        }
        return nil
    }

As mentioned earlier, Lua scripts execute atomically and no other command can be interleaved with them. The gc function consists of 3 Redis commands, so other commands may run in between them. However, since a message can never be revived once it enters the garbage collection process, the 3 commands do not need to be atomic.
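The decision that unack2RetryScript makes for each expired message can be modelled in plain Go. This is a sketch under assumptions rather than the library's implementation: `retryState` and its `unack2Retry` method are hypothetical, and maps and a slice stand in for the Redis sorted set, hash, list and set.

```go
package main

import "fmt"

// retryState is an in-memory stand-in for the unack sorted set, the retry
// count hash, the retry list and the garbage set.
type retryState struct {
	unack      map[string]int64 // message ID -> retry time (Unix seconds)
	retryCount map[string]int   // message ID -> retries remaining
	retry      []string
	garbage    map[string]bool
}

// unack2Retry handles every message whose retry time is <= now: if retries
// remain it decrements the counter and queues the ID for retry, otherwise it
// drops the counter and marks the ID as garbage. Either way the ID leaves unack.
func (s *retryState) unack2Retry(now int64) {
	for id, retryAt := range s.unack {
		if retryAt > now {
			continue
		}
		if s.retryCount[id] > 0 {
			s.retryCount[id]--
			s.retry = append(s.retry, id)
		} else {
			delete(s.retryCount, id)
			s.garbage[id] = true
		}
		delete(s.unack, id) // deleting during range is safe in Go
	}
}

func main() {
	s := &retryState{
		unack:      map[string]int64{"a": 10, "b": 10, "c": 99},
		retryCount: map[string]int{"a": 2, "b": 0, "c": 1},
		garbage:    make(map[string]bool),
	}
	s.unack2Retry(20)
	fmt.Println(s.retry, s.garbage, s.unack)
}
```

With a retry count of 3 and a callback that always fails, this logic yields four deliveries in total: the original one plus three retries, after which the ID lands in the garbage set.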
ack
ack simply deletes the message completely:
    func (q *DelayQueue) ack(idStr string) error {
        ctx := context.Background()
        err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
        if err != nil {
            return fmt.Errorf("remove from unack failed: %v", err)
        }
        // msg key has ttl, ignore result of delete
        _ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
        q.redisCli.HDel(ctx, q.retryCountKey, idStr)
        return nil
    }

A negative acknowledgement only needs to change the retry time of the message in the unack key to now; the next run of unack2RetryScript will then immediately move it to the retry key:
    func (q *DelayQueue) nack(idStr string) error {
        ctx := context.Background()
        // update retry time as now, unack2Retry will move it to retry immediately
        err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
            Member: idStr,
            Score:  float64(time.Now().Unix()),
        }).Err()
        if err != nil {
            return fmt.Errorf("negative ack failed: %v", err)
        }
        return nil
    }

consume
The core logic of the message queue is the consume function, which runs once per second. It calls the scripts above to move messages into the correct collections and invokes the consumer callback to consume them:
    func (q *DelayQueue) consume() error {
        // run pending2Ready to move messages whose delivery time has arrived into ready
        err := q.pending2Ready()
        if err != nil {
            return err
        }
        // call ready2Unack in a loop to pull messages for consumption
        var fetchCount uint
        for {
            idStr, err := q.ready2Unack()
            if err == redis.Nil { // consumed all
                break
            }
            if err != nil {
                return err
            }
            fetchCount++
            ack, err := q.callback(idStr)
            if err != nil {
                return err
            }
            if ack {
                err = q.ack(idStr)
            } else {
                err = q.nack(idStr)
            }
            if err != nil {
                return err
            }
            if fetchCount >= q.fetchLimit {
                break
            }
        }
        // move nacked or timed-out messages into the retry queue
        err = q.unack2Retry()
        if err != nil {
            return err
        }
        // clean up messages that have reached the maximum number of retries
        err = q.garbageCollect()
        if err != nil {
            return err
        }
        // consume the retry queue
        fetchCount = 0
        for {
            idStr, err := q.retry2Unack()
            if err == redis.Nil { // consumed all
                break
            }
            if err != nil {
                return err
            }
            fetchCount++
            ack, err := q.callback(idStr)
            if err != nil {
                return err
            }
            if ack {
                err = q.ack(idStr)
            } else {
                err = q.nack(idStr)
            }
            if err != nil {
                return err
            }
            if fetchCount >= q.fetchLimit {
                break
            }
        }
        return nil
    }

That completes a simple and reliable delay queue. Why not give it a try?