当前位置:网站首页>How to use Redis for distributed applications in Golang
How to use Redis for distributed applications in Golang
2022-07-30 15:58:00 【Yisuyun】
Golangdistributed applicationRedis怎么使用
这篇文章主要介绍了Golangdistributed applicationRedis怎么使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Golangdistributed applicationRedis怎么使用文章都会有所收获,下面我们一起来看看吧.
正文
RedisIt is a high-performance in-memory database,Often used in distributed systems,In addition to being a distributed cache or a simple in-memory database, there are some special application scenarios,本文结合Golangto write the corresponding middleware.
分布式锁
We can use it in a stand-alone systemsync.Mutex来保护临界资源,There is also such a need in distributed systems,When multiple hosts preempt the same resource,需要加对应的“分布式锁”.
在Redis中我们可以通过setnx命令来实现
如果keyThere is no corresponding value that can be set,If the setting is successful, the locking is successful,key不存在返回失败
释放锁可以通过
del实现.
主要逻辑如下:
type RedisLock struct { client *redis.Client key string expiration time.Duration // 过期时间,Prevent downtime or exceptions}func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock { return &RedisLock{ client: client, key: key, expiration: expiration, }}// The lock will succeed the callerid保存到redis中func (l *RedisLock) Lock(id string) (bool, error) { return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()}const unLockScript = `if (redis.call("get", KEYS[1]) == KEYS[2]) then redis.call("del", KEYS[1]) return trueendreturn false`// 解锁通过lua脚本来保证原子性,Only locks added by the current caller can be unlockedfunc (l *RedisLock) UnLock(id string) error { _, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result() if err != nil && err != redis.Nil { return err } return nil}An additional timeout period is required to prevent system downtime or deadlock caused by abnormal requests,The timeout period is the maximum estimated running time2倍.
解锁时通过lua脚本来保证原子性,The caller will only unlock the lock it has added.Avoid confusion due to timeouts,例如:进程A在时间t1获取了锁,But due to slow execution,在时间t2锁超时失效,进程B在t3获取了锁,This is if processAUnlocking after execution will cancel the processB的锁.
运行测试
func main() { client := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "123456", DB: 0, // use default DB }) lock := NewLock(client, "counter", 30*time.Second) counter := 0 worker := func(i int) { for { id := fmt.Sprintf("worker%d", i) ok, err := lock.Lock(id) log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err) if !ok { time.Sleep(100 * time.Millisecond) continue } defer lock.UnLock(id) counter++ log.Printf("worker %d, add counter %d", i, counter) break } } wg := sync.WaitGroup{} for i := 1; i <= 5; i++ { wg.Add(1) id := i go func() { defer wg.Done() worker(id) }() } wg.Wait()}运行结果,可以看到与sync.Mutex使用效果类似
2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:09 worker 5, add counter 1
2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 4, add counter 2
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 1, add counter 3
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 2, add counter 4
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 3, add counter 5
特别注意的是,在分布式Redis集群中,如果发生异常时(主节点宕机),May reduce the availability of distributed locks,It is possible to pass strongly consistent componentsetcd、ZooKeeper等实现.
分布式过滤器
Suppose you want to develop a crawler service,Crawl millions of web pages,How to tell if a web page has been crawled,In addition to relying on databases and HashMap,We can do it with the help of a Bloom filter.Bloom filters take up extremely low space compared to other methods,And the insert query time is very fast.
Bloom filters are used to determine whether an element is in a collection,利用BitSet
Multiply values when inserting dataHash,将BitSet对应位置1
The same is done multiple times when queryingHashCompare whether it is on all bits1,If so, it exists.
布隆过滤器有一定的误判率,Not suitable for precise query scenarios.Also, removing elements is not supported.通常适用于URL去重、垃圾邮件过滤、Prevent cache breakdown and other scenarios.
在Redis中,我们可以使用自带的BitSet实现,Also useluaThe atomicity of the script avoids inconsistency of data in multiple queries.
const ( // 插入数据,调用setbitSet the corresponding bit setScript = `for _, offset in ipairs(ARGV) do redis.call("setbit", KEYS[1], offset, 1)end` // 查询数据,如果所有位都为1返回true getScript = `for _, offset in ipairs(ARGV) do if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then return false endendreturn true`)type BloomFilter struct { client *redis.Client key string // 存在redis中的key bits uint // BitSet的大小 maps uint // Hash的次数}func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter { client.Del(context.TODO(), key) if maps == 0 { maps = 14 } return &BloomFilter{ key: key, client: client, bits: bits, maps: maps, }}// 进行多次Hash, Get a list of locationsfunc (f *BloomFilter) getLocations(data []byte) []uint { locations := make([]uint, f.maps) for i := 0; i < int(f.maps); i++ { val := murmur3.Sum64(append(data, byte(i))) locations[i] = uint(val) % f.bits } return locations}func (f *BloomFilter) Add(data []byte) error { args := getArgs(f.getLocations(data)) _, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result() if err != nil && err != redis.Nil { return err } return nil}func (f *BloomFilter) Exists(data []byte) (bool, error) { args := getArgs(f.getLocations(data)) resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result() if err != nil { if err == redis.Nil { return false, nil } return false, err } exists, ok := resp.(int64) if !ok { return false, nil } return exists == 1, nil}func getArgs(locations []uint) []string { args := make([]string, 0) for _, l := range locations { args = append(args, strconv.FormatUint(uint64(l), 10)) } return args}运行测试
func main() { bf := NewBloomFilter(client,"bf-test", 2^16, 14) exists, err := bf.Exists([]byte("test1")) log.Printf("exist %t, err %v", exists, err) if err := bf.Add([]byte("test1")); err != nil { log.Printf("add err: %v", err) } exists, err = bf.Exists([]byte("test1")) log.Printf("exist %t, err %v", exists, err) exists, err = bf.Exists([]byte("test2")) log.Printf("exist %t, err %v", exists, err)// output// 2022/07/22 10:05:58 exist false, err <nil>// 2022/07/22 10:05:58 exist true, err <nil>// 2022/07/22 10:05:58 exist false, err <nil>}分布式限流器
在golang.org/x/time/rateA token bucket based current limiter is provided in the package,If you want to implement a current limit in a distributed environment, it can be based onRedis Lua脚本实现.
The main principle of the token bucket is as follows:
Suppose a token bucket capacity is burst,per secondqpsrate to put tokens into it
Initially filled with tokens,Token overflow is directly discarded,when requesting a token,Allowed if there are enough tokens in the bucket,否则拒绝
当burst==qps时,严格按照qps限流;当burst>qps时,A certain burst of traffic can be allowed
The main reference here is the official onerate包的实现,Change the core logic to Lua实现.
--- 相关Key--- limit rate key值,对应valueis the current number of tokenslocal limit_key = KEYS[1]--- 输入参数--[[qps: 每秒请求数;burst: 令牌桶容量;now: 当前Timestamp;cost: 请求令牌数;max_wait: 最大等待时间--]]local qps = tonumber(ARGV[1])local burst = tonumber(ARGV[2])local now = ARGV[3]local cost = tonumber(ARGV[4])local max_wait = tonumber(ARGV[5])--- 获取redisThe number of tokens in local tokens = redis.call("hget", limit_key, "token")if not tokens then tokens = burstend--- 上次修改时间local last_time = redis.call("hget", limit_key, "last_time")if not last_time then last_time = 0end--- Latest wait timelocal last_event = redis.call("hget", limit_key, "last_event")if not last_event then last_event = 0end--- Pass the difference between the current time and the last modified time,qpsCalculate the number of tokens obtained at the current timelocal delta = math.max(0, now-last_time)local new_tokens = math.min(burst, delta * qps + tokens)new_tokens = new_tokens - cost --- The latest number of tokens,Reduce request tokens--- If the latest token count is less than0,计算需要等待的时间local wait_period = 0if new_tokens < 0 and qps > 0 then wait_period = wait_period - new_tokens / qpsendwait_period = math.ceil(wait_period)local time_act = now + wait_period --- Timestamp that satisfies the wait interval--- There are two situations in which a request is allowed--- When the number of request tokens is less than burst, The waiting time does not exceed the maximum waiting time,Requests can be fulfilled with supplemental tokens--- qps为0时,As long as the latest token count is not less than0即可local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)--- 设置对应值if ok then redis.call("set", limit_key, new_tokens) redis.call("set", last_time_key, now) redis.call("set", last_event_key, time_act)end--- 返回列表,{是否允许, 等待时间}return {ok, wait_period}在Golang中的相关接口Allow、AllowN、Waitetc. are all by callingreserveN实现
// 调用lua脚本func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) { // ... res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result() if err != nil && err != redis.Nil { return nil, err } //... return &Reservation{ ok: allow == 1, lim: lim, tokens: n, timeToAct: now.Add(time.Duration(wait) * time.Second), }, nil}运行测试
func main() { rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "123456", DB: 0, // use default DB }) r, err := NewRedisLimiter(rdb, 1, 2, "testrate") if err != nil { log.Fatal(err) } r.Reset() for i := 0; i < 5; i++ { err := r.Wait(context.TODO()) log.Printf("worker %d allowed: %v", i, err) }}// output// 2022/07/22 12:50:31 worker 0 allowed: <nil>// 2022/07/22 12:50:31 worker 1 allowed: <nil>// 2022/07/22 12:50:32 worker 2 allowed: <nil>// 2022/07/22 12:50:33 worker 3 allowed: <nil>// 2022/07/22 12:50:34 worker 4 allowed: <nil>The first two requests are inburst内,直接可以获得,Follow the request laterqpsrate of generation.
其他
除此之外,RedisCan also be used as a global count、去重(set)、Publish and subscribe scenarios.RedisThe official also provides some general modules,Filtering can also be achieved by loading these modules、限流等特性,参考modules.
关于“Golangdistributed applicationRedis怎么使用”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Golangdistributed applicationRedis怎么使用”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注亿速云行业资讯频道.
边栏推荐
- 被捧上天的Scrum敏捷管理为何不受大厂欢迎了?
- How to intercept the first few digits of a string in php
- Placement Rules 使用文档
- MySql 和 PostgreSQL 数据库 根据一张表update另一张表数据
- 华为ADS获取转化跟踪参数报错:getInstallReferrer IOException: getInstallReferrer not found installreferrer
- 应用接入华为分析在应用调试模式下为何没有数据上报?
- 几种常见的存储器
- Store Limit 使用文档
- 谷歌工程师『代码补全』工具;『Transformers NLP』随书代码;FastAPI开发模板;PyTorch模型加速工具;前沿论文 | ShowMeAI资讯日报
- FME读写cass数据的方案及操作流程
猜你喜欢

(Popular Science) What is Fractional NFT (Fractional NFT)

【AGC】质量服务2-性能管理示例

应用OPC解决方案实现控制系统数据的安全交换

ISELED---the new choice of ambient lighting scheme

HTTP缓存小结

AI遮天传 DL-CNN

配置Path环境变量

服务器装好系统的电脑怎么分区

Why is there no data reported when the application is connected to Huawei Analytics in the application debugging mode?
![[flutter] What is MaterialApp and Material design](/img/72/d0845467b33b2291f47e7f54171088.jpg)
[flutter] What is MaterialApp and Material design
随机推荐
应用OPC解决方案实现控制系统数据的安全交换
R中按照数字大小进行排序
L2-007 家庭房产(vector、set、map的使用)
FME实现CAD带属性转SHP数据方法
一文读懂Elephant Swap,为何为ePLATO带来如此高的溢价?
Overview of TiDB Tool Functions
TiUP 术语及核心概念
【重磅来袭】教你如何在RGBD三维重建中获取高质量模型纹理
tiup completion
Shell脚本的概念
动态规划 --- 状态压缩DP 详细解释
TiUP FAQ
ECCV2022 | FPN错位对齐,实现高效半监督目标检测 (PseCo)
使用 TiUP 命令管理组件
服务器装好系统的电脑怎么分区
配置Path环境变量
数据库-SQL
Introduction to TiUP
Mysql database query is very slow. Besides the index, what else can be caused?
Sparse-PointNet: See Further in Autonomous Vehicles 论文笔记