Golang + Redis distributed mutex

2022-06-24 15:47:00 lestat

Introduction

Suppose one of our services updates data, and the real workload is highly concurrent. The flow is: read -> modify -> save. If concurrency is not handled at the DB layer, concurrent requests can overwrite one another and leave some data in an unexpected state. A distributed lock is one way to solve this problem.
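To make the problem concrete, here is a minimal sketch of a lost update. It assumes a plain in-memory value in place of the database, and the interleaving is scripted by hand rather than produced by real concurrency. The names store and lostUpdate are hypothetical and are not part of the article's code.

```go
package main

import "fmt"

// store stands in for a database row; it is a hypothetical in-memory value.
type store struct{ balance int }

func (s *store) read() int   { return s.balance }
func (s *store) write(v int) { s.balance = v }

// lostUpdate scripts the interleaving by hand: both clients read
// before either one saves, so the second save overwrites the first.
func lostUpdate() int {
	s := &store{balance: 10}
	a := s.read()  // client A reads 10
	b := s.read()  // client B reads 10
	s.write(a + 1) // client A saves 11
	s.write(b + 1) // client B also saves 11; A's update is lost
	return s.read()
}

func main() {
	// Two increments were requested, but only one survives.
	fmt.Println(lostUpdate()) // prints 11, not 12
}
```

Holding a lock around the whole read -> modify -> save sequence forces client B to read only after client A has saved.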

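Problems to be solved

  1. Accidental release of a lock that is held by another client
  2. Business execution that outlives the lock's expiration, which lets a second client in concurrently
  3. A retry mechanism for failed lock attempts
  4. Non-atomicity of GET followed by DEL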
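Problems 1 and 4 can be illustrated without a Redis server. The sketch below assumes a hypothetical in-memory type, memStore, standing in for Redis. Its delIfEquals method does the compare and the delete inside one critical section, which is the same guarantee the article's code later gets from a Lua script. None of these names come from go-redis.

```go
package main

import (
	"fmt"
	"sync"
)

// memStore is a hypothetical in-memory stand-in for Redis.
type memStore struct {
	mu   sync.Mutex
	data map[string]string
}

// setNX stores value only if key is absent, like SET with the NX option.
func (m *memStore) setNX(key, value string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.data[key]; ok {
		return false
	}
	m.data[key] = value
	return true
}

// del removes key unconditionally, like DEL.
func (m *memStore) del(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.data, key)
}

// delIfEquals deletes key only if it still holds value. The check and the
// delete happen under one mutex, so nothing can slip in between them.
func (m *memStore) delIfEquals(key, value string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.data[key] != value {
		return false
	}
	delete(m.data, key)
	return true
}

func main() {
	m := &memStore{data: map[string]string{}}
	m.setNX("lock", "A") // client A acquires the lock
	m.del("lock")        // simulate A's lock expiring
	m.setNX("lock", "B") // client B acquires the lock

	fmt.Println(m.delIfEquals("lock", "A")) // false: A no longer owns the lock
	fmt.Println(m.delIfEquals("lock", "B")) // true: B releases its own lock
}
```

Had client A called del instead of delIfEquals, it would have removed client B's lock, which is the false release described in problem 1.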

Code

Directory structure:

│  main.go
│
└─demo
        lock.go

lock.go:

package demo

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"math/rand"
	"time"
)

// Maximum number of retries when the lock is already held
var retryTimes = 5

// Interval between retries
var retryInterval = time.Millisecond * 50

var rdb = redis.NewClient(&redis.Options{
	Addr:     "localhost:6379",
	Password: "", // no password set
	DB:       0,  // use default DB
})

//  The default expiration time of the lock 
var expiration time.Duration

// MockTest simulates one service instance acquiring the lock,
// doing its work, and releasing the lock
func MockTest(tag string) {
	ctx, cancel := context.WithCancel(context.Background())

	// Cancelling the context stops the watchdog goroutine
	defer cancel()

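	// Random value that identifies this holder of the lock
	lockV := getRandValue()

	lockK := "EXAMPLE_LOCK"

	// Lock expiration time (TTL). A local variable is used here because
	// several goroutines run MockTest at once, and writing the
	// package-level variable from each of them would be a data race
	expiration := time.Millisecond * 200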

	fmt.Println(tag + " Try to lock ")

	set, err := rdb.SetNX(ctx, lockK, lockV, expiration).Result()

	if err != nil {
		panic(err.Error())
	}

	// Locking failed: retry
	if !set && !retry(ctx, rdb, lockK, lockV, expiration, tag) {
		fmt.Println(tag + " server unavailable, try again later")
		return
	}

	fmt.Println(tag + " Lock successfully ")

	// Lock acquired: start a watchdog goroutine that keeps renewing it
	go watchDog(ctx, rdb, lockK, expiration, tag)

	// Do the business work (simulated with a random delay)
	fmt.Println(tag + " Wait for the business processing to complete ...")
	time.Sleep(getRandDuration())

	// Business work finished: release the lock
	val := delByKeyWhenValueEquals(ctx, rdb, lockK, lockV)
	fmt.Println(tag+" Release results :", val)
}

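// delByKeyWhenValueEquals releases the lock only if it still holds our value.
// The Lua script runs atomically on the Redis server, so no other command
// can run between the GET and the DEL
func delByKeyWhenValueEquals(ctx context.Context, rdb *redis.Client, key string, value interface{}) bool {
	lua := `
-- Delete the key only if its current value matches the caller's lock value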
if redis.call('GET', KEYS[1]) == ARGV[1] then
	return redis.call('DEL', KEYS[1])
else
	return 0
end
`
	scriptKeys := []string{key}

	val, err := rdb.Eval(ctx, lua, scriptKeys, value).Result()
	if err != nil {
		panic(err.Error())
	}

	return val == int64(1)
}

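// Seed the shared generator once per process. Seeding on every call lets
// goroutines that start within the same clock tick produce identical
// "random" lock values, which would defeat the ownership check
func init() {
	rand.Seed(time.Now().UnixNano())
}

// getRandDuration returns a random duration in [50ms, 100ms)
func getRandDuration() time.Duration {
	minMs, maxMs := 50, 100
	return time.Duration(rand.Intn(maxMs-minMs)+minMs) * time.Millisecond
}

// getRandValue returns a random lock value
// (production code would more likely use crypto/rand or a UUID)
func getRandValue() int {
	return rand.Int()
}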

// watchDog keeps extending the lock's TTL until ctx is cancelled
func watchDog(ctx context.Context, rdb *redis.Client, key string, expiration time.Duration, tag string) {
	for {
		select {
		// Business work finished
		case <-ctx.Done():
			fmt.Printf("%s Task to complete , close %s Automatic renewal of \n", tag, key)
			return
		// Business work still running
		default:
			// Renew the lock automatically
			rdb.PExpire(ctx, key, expiration)
			// Wait half the TTL before renewing again
			time.Sleep(expiration / 2)
		}
	}
}

// retry attempts to acquire the lock up to retryTimes more times
func retry(ctx context.Context, rdb *redis.Client, key string, value interface{}, expiration time.Duration, tag string) bool {
	for i := 1; i <= retryTimes; i++ {
		fmt.Printf("%s The first %d Attempts to lock ...\n", tag, i)
		set, err := rdb.SetNX(ctx, key, value, expiration).Result()

		if err != nil {
			panic(err.Error())
		}

		if set {
			return true
		}

		time.Sleep(retryInterval)
	}
	return false
}

Process description

Assume MockTest is the business processing method.

  1. Initialize a context, which is used to control the exit of the watchdog goroutine
  2. Generate a random value and try to acquire the lock with it (the random value prevents releasing someone else's lock later)
  3. If locking fails, retry; both the retry policy and the handling of a final failure depend on the business
  4. After locking succeeds, start a watchdog goroutine (watchDog) that keeps refreshing the lock's expiration time, so the lock does not expire while the business is running
  5. Simulate business processing that takes a random amount of time
  6. Release the lock after the business processing finishes (the Lua script makes the operation atomic, and comparing the value avoids accidental release)
  7. Call cancel to stop the watchdog goroutine (watchDog), avoiding deadlock

Scenarios handled

  1. If the holder terminates abnormally after acquiring the lock, the lock is released automatically once its expire time elapses
  2. If execution takes longer than the lock's default expire time, watchDog renews the lock automatically, so it does not expire mid-execution

Test

main.go:

package main

import (
	"play/demo"
	"time"
)

func main() {
	go demo.MockTest("A")
	go demo.MockTest("B")
	go demo.MockTest("C")
	go demo.MockTest("D")
	go demo.MockTest("E")
	// Keep main alive long enough for each goroutine to receive the
	// ctx.Done() signal and print its message
	time.Sleep(time.Second * 2)
}

Result:

$ go run main.go
A Try to lock 
D Try to lock 
E Try to lock 
B Try to lock 
C Try to lock 
D Lock successfully 
D Wait for the business processing to complete ...
B The first 1 Attempts to lock ...
E The first 1 Attempts to lock ...
A The first 1 Attempts to lock ...
C The first 1 Attempts to lock ...
B The first 2 Attempts to lock ...
D Release results : true
B Lock successfully 
E The first 2 Attempts to lock ...
B Wait for the business processing to complete ...
C The first 2 Attempts to lock ...
A The first 2 Attempts to lock ...
D Task to complete , close EXAMPLE_LOCK Automatic renewal of 
A The first 3 Attempts to lock ...
C The first 3 Attempts to lock ...
E The first 3 Attempts to lock ...
B Release results : true
A Lock successfully 
A Wait for the business processing to complete ...
B Task to complete , close EXAMPLE_LOCK Automatic renewal of 
E The first 4 Attempts to lock ...
C The first 4 Attempts to lock ...
A Release results : true
A Task to complete , close EXAMPLE_LOCK Automatic renewal of 
C The first 5 Attempts to lock ...
E The first 5 Attempts to lock ...
C Lock successfully 
C Wait for the business processing to complete ...
E server unavailable, try again later
C Release results : true
C Task to complete , close EXAMPLE_LOCK Automatic renewal of 

(Out of laziness, no unit tests were written.)

Copyright notice
This article was written by [lestat]. When reposting, please include a link to the original. Thank you.
https://yzsam.com/2021/05/20210505100004916b.html