Golang+redis reentrant lock
2022-06-24 15:52:00 【lestat】
Concept
In computer science, a reentrant mutex (English: reentrant mutex) is a kind of mutex that the same thread can lock multiple times without causing deadlock. Reentrant mutexes are also called recursive mutexes (English: recursive mutex) or recursive locks (English: recursive lock).
If a thread tries to lock an ordinary mutex that is already locked, the operation either fails or blocks until the mutex is unlocked. With a reentrant mutex, the lock operation succeeds if and only if the thread trying to lock is the thread that already holds the lock. A reentrant mutex typically records how many times it has been locked, and is only truly released after the same number of unlock operations.
A recursive mutex solves the non-reentrancy problem of an ordinary mutex: if a function first acquires the lock and then executes a callback whose body calls the function itself, a deadlock occurs.
See Wikipedia: Reentrant mutex
Personal view
Such scenarios should be rare in Go. Taken literally, a mutex should not accept reentry, and a scenario that requires reentry should probably not use a mutex at all. In my opinion, the better solution is to avoid such scenarios at the design level. So, unlike the Redis-based mutex, this article is only an exercise in technical implementation; in practice, such scenarios should be avoided whenever possible.
Features
Adds reentrancy on top of the Redis-based mutex (with automatic renewal and automatic retry).
Key points implemented:
- Locking: when the same thread locks multiple times, an ID identifies it as the current lock holder, and the lock count is incremented by 1
- Unlocking: each unlock decrements the lock count by 1; only when the count reaches 0 is the lock actually released (DEL)
Hash lock structure
| Thread | KEY | FIELD | VALUE |
|---|---|---|---|
| A | EXAMPLE_LOCK | 304597349587439 (random number for the thread; identifies the lock holder and prevents accidental unlocking) | 1 (number of times the current thread has locked) |
The basic flow
A non-reentrant lock implementation only needs to handle mutual exclusion, protection against accidental release, and automatic renewal, so the string type together with SETNX, PEXPIRE, and DEL is enough for locking, unlocking, and renewal.
A reentrant lock, however, needs a lock that records both the identity of the current thread and how many times that thread has locked, so Redis's hash type is used instead of string. Because the structure changes, the locking and unlocking flows change accordingly.
| Time | ThreadA | ThreadB |
|---|---|---|
| T1 | Tries to lock | Tries to lock |
| T2 | Locks successfully (key:EXAMPLE_LOCK, field:304597349587439, value:1) | Fails to lock |
| T3 | Executes the current method's business code | Retries locking, waiting for ThreadA to unlock (per the configured interval and maximum retry count) |
| T4 | Executes another method's business code (possibly a recursive call) and tries to lock again | |
| T5 | Locks successfully (key:EXAMPLE_LOCK, field:304597349587439, value:2) | |
| T6 | Executes the newly called method's business code, until all nested calls complete | |
| T7 | Unlocks from the innermost layer (key:EXAMPLE_LOCK, field:304597349587439, value:1) | |
| T8 | Returns to where the outermost lock was first taken and unlocks (key:EXAMPLE_LOCK, field:304597349587439, value:0) | |
| T9 | Releases the lock once the lock count reaches 0 | |
| T10 | | Locks successfully |
Locking script:

```lua
-- KEYS[1]: the lock key
-- ARGV[1]: the lock expiration (milliseconds)
-- ARGV[2]: the lock field (a random value that prevents accidental unlocking);
--          its value records how many times the current thread has locked
-- Is the lock free?
if (redis.call('EXISTS', KEYS[1]) == 0) then
    -- first lock by this thread (initialize the lock's value and expiration)
    redis.call('HINCRBY', KEYS[1], ARGV[2], 1);
    redis.call('PEXPIRE', KEYS[1], ARGV[1]);
    return 1;
end;
-- Does the current thread already hold the lock?
-- (the lock is held; this branch runs on the holder's Nth call, N > 1)
if (redis.call('HEXISTS', KEYS[1], ARGV[2]) == 1) then
    -- increment the lock count
    redis.call('HINCRBY', KEYS[1], ARGV[2], 1);
    -- renewal is not handled here; the daemon goroutine renews
    return 1;
end;
-- the lock is held by another thread: locking failed
return 0;
```

Unlocking script:

```lua
-- KEYS[1]: the lock key
-- ARGV[1]: the lock field (a random value that prevents accidental unlocking);
--          its value records how many times the current thread has locked
-- Does the hash field exist?
if (redis.call('HEXISTS', KEYS[1], ARGV[1]) == 0) then
    -- the caller does not hold the lock; the client sees err = redis.Nil
    return nil;
end;
-- decrement the lock count
local counter = redis.call('HINCRBY', KEYS[1], ARGV[1], -1);
if (counter > 0) then
    -- a nested call in the same thread released the lock: keep holding it
    return 0;
else
    -- the outermost (first) call in the same thread released the lock;
    -- counter <= 0 means all nested calls have completed, so release
    redis.call('DEL', KEYS[1]);
    return 1;
end;
```

Code implementation
The following code only implements the reentrant lock, automatic renewal, automatic retry, and a local test; encapsulation and reuse are not considered!
Directory structure:

```
├── main.go
└── reentrant_mutex
    └── lock.go
```

lock.go:
```go
package reentrant_mutex

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
)

const KEY = "EXAMPLE_LOCK"

// Lock is the lock used for testing
type Lock struct {
	// redis client (connection pool)
	Rdb *redis.Client
	// hash lock key
	Key string
	// hash lock field (random number, unique per lock instance)
	Field int
	// lock validity period
	Expiration time.Duration
	// initial recursion level, used for testing
	RecursionLevel int
	// maximum recursion level, used for testing
	MaxRecursionLevel int
	// minimum simulated task duration (ms), used for testing
	Min int
	// maximum simulated task duration (ms), used for testing
	Max int
	// retry interval after a failed lock attempt
	RetryInterval time.Duration
	// number of retries after a failed lock attempt
	RetryTimes int
	// embedded *sync.Once
	*sync.Once
	// thread tag used in test output
	Tag string
}

func init() {
	fmt.Println("initializing rand seed for rand testing...")
	rand.Seed(time.Now().UnixNano())
}

// getRandTag generates a random tag
func getRandTag(n int) string {
	var runes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890")
	tag := make([]rune, n)
	for i := range tag {
		tag[i] = runes[rand.Intn(len(runes))]
	}
	return string(tag)
}

// NewLock initializes a lock
func NewLock(rdb *redis.Client) *Lock {
	l := Lock{
		Rdb:               rdb,
		Key:               KEY, // fixed value
		Field:             rand.Int(),
		Expiration:        time.Millisecond * 200,
		RecursionLevel:    1,
		MaxRecursionLevel: 1,
		Min:               50,
		Max:               100,
		RetryInterval:     time.Millisecond * 50,
		RetryTimes:        5,
		Once:              new(sync.Once),
		Tag:               getRandTag(2),
	}
	return &l
}

// MockBusiness simulates the locking scenario of a distributed service
func (l *Lock) MockBusiness() {
	fmt.Printf("%s call #%d, Field:%d\n", l.Tag, l.RecursionLevel, l.Field)
	// a fresh ctx per invocation, so that cancel() after a reentrant call
	// completes does not surface a "context canceled" error in the outer call
	var ctx, cancel = context.WithCancel(context.Background())
	defer func() {
		// stop the daemon goroutine
		cancel()
	}()
	set, err := l.lock(ctx)
	if err != nil {
		fmt.Println(l.Tag + " locking failed: " + err.Error())
		return
	}
	// locking failed: retry
	if !set {
		res, err := l.retry(ctx)
		if err != nil {
			fmt.Println(l.Tag + " lock retry failed: " + err.Error())
			return
		}
		// maximum number of retries reached
		if !res {
			fmt.Println(l.Tag + " server unavailable, try again later")
			return
		}
	}
	fmt.Println(l.Tag + " locked successfully")
	// locked: renew automatically via a daemon goroutine (asynchronous; the flow
	// is unaffected even if renewal has not started before the business code runs)
	go l.watchDog(ctx)
	fmt.Println(l.Tag + " waiting for business processing to complete...")
	// simulate business processing (random delay)
	time.Sleep(time.Duration(rand.Intn(l.Max-l.Min)+l.Min) * time.Millisecond)
	// simulate a reentrant call (tests the lock's reentrancy)
	if l.RecursionLevel <= l.MaxRecursionLevel {
		l.RecursionLevel += 1
		l.MockBusiness()
	}
	// business processing complete: release the lock
	val, err := l.unlock(ctx)
	if err != nil {
		fmt.Println(l.Tag + " lock release failed: " + err.Error())
		return
	}
	// a nested call's release returns false, because the Lua script's
	// counter > 0 branch does not release the lock
	fmt.Println(l.Tag+" release result:", val)
}

// watchDog is the daemon goroutine (sync.Once.Do ensures automatic renewal
// starts only on the thread's first invocation)
func (l *Lock) watchDog(ctx context.Context) {
	l.Once.Do(func() {
		fmt.Printf("starting daemon goroutine for %s\n", l.Tag)
		for {
			select {
			// business done
			case <-ctx.Done():
				fmt.Printf("%s task complete, stopping automatic renewal of %s\n", l.Tag, l.Key)
				return
			// business not yet done
			default:
				// renew automatically
				l.Rdb.PExpire(ctx, l.Key, l.Expiration)
				// wait before the next renewal
				time.Sleep(l.Expiration / 2)
			}
		}
	})
}

// lock tries to acquire (or reenter) the lock
func (l *Lock) lock(ctx context.Context) (res bool, err error) {
	lua := `
-- KEYS[1]: the lock key
-- ARGV[1]: the lock expiration (milliseconds)
-- ARGV[2]: the lock field (a random value that prevents accidental unlocking);
--          its value records how many times the current thread has locked
-- Is the lock free?
if (redis.call('EXISTS', KEYS[1]) == 0) then
    -- first lock by this thread (initialize the lock's value and expiration)
    redis.call('HINCRBY', KEYS[1], ARGV[2], 1);
    redis.call('PEXPIRE', KEYS[1], ARGV[1]);
    return 1;
end;
-- Does the current thread already hold the lock?
-- (this branch runs on the holder's Nth call, N > 1)
if (redis.call('HEXISTS', KEYS[1], ARGV[2]) == 1) then
    -- increment the lock count
    redis.call('HINCRBY', KEYS[1], ARGV[2], 1);
    -- renewal is not handled here; the daemon goroutine renews
    return 1;
end;
-- the lock is held by another thread: locking failed
return 0;
`
	scriptKeys := []string{l.Key}
	// PEXPIRE expects milliseconds, so convert the Duration explicitly
	// (int(l.Expiration) would pass nanoseconds)
	val, err := l.Rdb.Eval(ctx, lua, scriptKeys, l.Expiration.Milliseconds(), l.Field).Result()
	if err != nil {
		return
	}
	res = val == int64(1)
	return
}

// unlock releases one level of the lock
func (l *Lock) unlock(ctx context.Context) (res bool, err error) {
	lua := `
-- KEYS[1]: the lock key
-- ARGV[1]: the lock field (a random value that prevents accidental unlocking);
--          its value records how many times the current thread has locked
-- Does the hash field exist?
if (redis.call('HEXISTS', KEYS[1], ARGV[1]) == 0) then
    -- the caller does not hold the lock; the client sees err = redis.Nil
    return nil;
end;
-- decrement the lock count
local counter = redis.call('HINCRBY', KEYS[1], ARGV[1], -1);
if (counter > 0) then
    -- a nested call in the same thread released the lock: keep holding it
    return 0;
else
    -- the outermost (first) call in the same thread released the lock;
    -- counter <= 0 means all nested calls have completed, so release
    redis.call('DEL', KEYS[1]);
    return 1;
end;
`
	scriptKeys := []string{l.Key}
	val, err := l.Rdb.Eval(ctx, lua, scriptKeys, l.Field).Result()
	if err != nil {
		return
	}
	res = val == int64(1)
	return
}

// retry keeps trying to lock, up to RetryTimes attempts
func (l *Lock) retry(ctx context.Context) (res bool, err error) {
	i := 1
	for i <= l.RetryTimes {
		fmt.Printf(l.Tag+" lock retry #%d, Field:%d\n", i, l.Field)
		res, err = l.lock(ctx)
		if err != nil {
			return
		}
		if res {
			return
		}
		time.Sleep(l.RetryInterval)
		i++
	}
	return
}
```

main.go (test locking):
```go
package main

import (
	"time"

	"example/reentrant_mutex"

	"github.com/go-redis/redis/v8"
)

func main() {
	// initialize the client (connection pool)
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})
	max := 2
	for i := 0; i < max; i++ {
		go reentrant_mutex.NewLock(rdb).MockBusiness()
	}
	// crude wait for the goroutines to finish
	time.Sleep(time.Second * time.Duration(max/2))
}
```

Test
Test environment:
- Redis: Redis server v=6.2.3
- Go: go version go1.14.6 darwin/amd64
Test configuration:
- Each thread reenters 1 time (2 locks in total)
- Each thread starts 1 daemon goroutine for automatic renewal (sync.Once.Do ensures it starts only once)
- Each simulated business delay is randomly generated in the 50~100ms range
- The hash lock's field is generated when the thread initializes and does not change during execution; the field is the sole criterion for deciding whether the lock belongs to the current thread
- 5 retries after a failed lock attempt, with a 50ms retry interval
- A randomly generated Tag identifies each thread in the printed output
- The mutex KEY is EXAMPLE_LOCK
Test result:

```
$ go run main.go
initializing rand seed for rand testing...
oH call #1, Field:3502865528850892548
8U call #1, Field:4832526999886838931
oH locked successfully
oH waiting for business processing to complete...
starting daemon goroutine for oH
8U lock retry #1, Field:4832526999886838931
8U lock retry #2, Field:4832526999886838931
oH call #2, Field:3502865528850892548
oH locked successfully
oH waiting for business processing to complete...
8U lock retry #3, Field:4832526999886838931
8U lock retry #4, Field:4832526999886838931
oH release result: false
oH release result: true
oH task complete, stopping automatic renewal of EXAMPLE_LOCK
8U lock retry #5, Field:4832526999886838931
8U locked successfully
8U waiting for business processing to complete...
starting daemon goroutine for 8U
8U call #2, Field:4832526999886838931
8U locked successfully
8U waiting for business processing to complete...
8U release result: false
8U release result: true
8U task complete, stopping automatic renewal of EXAMPLE_LOCK
```