当前位置:网站首页>redis分布式锁和看门狗的实现
redis分布式锁和看门狗的实现
2022-08-02 14:01:00 【意大利面拌42号混凝土】
redis分布式锁和看门狗的实现
- 分布式应用进行逻辑处理时经常会遇到并发问题。
- 比如一个操作要修改用户的状态,修改状态需要先读出用户的状态,在内存里进行修 改,改完了再存回去。如果这样的操作同时进行了,就会出现并发问题,因为读取和保存状 态这两个操作不是原子的。(Wiki 解释:所谓原子操作是指不会被线程调度机制打断的操 作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch 线程切换。)
演示不使用分布式锁
- 以下均使用python代码进行演示
逻辑:开启多线程模拟并发存储数据,在最后阶段取出数据与redis中存储的数据进行比对
import threading
import redis as _redis
redis = _redis.Redis(host='localhost', port=6379, decode_responses=True, db=11)
def storage_redis(value):
redis.set('age', value)
if __name__ == '__main__':
for i in [1,2,3,4,5]:
t = threading.Thread(target=storage_redis, args=(i,))
t.start()
print('--------------------')
print(redis.get('age'))
- 启动脚本运行,结果如下图
- 看一看,是不是直接裂开,数据存的和取的结果根本不一样
像这种应该办,这个时候就需要用到分布式锁了,类似于MySQL中悲观锁与乐观锁
redis分布式锁实现
分布式锁本质上要实现的目标就是在 Redis 里面占一个“茅坑”,当别的进程也要来占 时,发现已经有人蹲在那里了,就只好放弃或者稍后再试。
占坑一般是使用 setnx(set if not exists) 指令,只允许被一个客户端占坑。先来先占, 用 完了,再调用 del 指令释放茅坑。
但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样 就会陷入死锁,锁永远得不到释放
于是我们在拿到锁之后,再给锁加上一个过期时间,比如 5s,这样即使中间出现异常也 可以保证 5 秒之后锁会自动释放。
但是以上逻辑还有问题。如果在 setnx 和 expire 之间服务器进程突然挂掉了,可能是因 为机器掉电或者是被人为杀掉的,就会导致 expire 得不到执行,也会造成死锁。
这种问题的根源就在于 setnx 和 expire 是两条指令而不是原子指令。如果这两条指令可 以一起执行就不会出现问题。也许你会想到用 Redis 事务来解决。但是这里不行,因为 expire 是依赖于 setnx 的执行结果的,如果 setnx 没抢到锁,expire 是不应该执行的。事务里没有 ifelse 分支逻辑,事务的特点是一口气执行,要么全部执行要么一个都不执行。
为了治理这个乱象,Redis 2.8 版本中作者加入了 set 指令的扩展参数,使得 setnx 和 expire 指令可以一起执行,彻底解决了分布式锁的乱象。从此以后所有的第三方分布式锁 library 可以休息了。 > set lock:codehole true ex 5 nx OK … do something critical … > del lock:codehole 上面这个指令就是 setnx 和 expire 组合在一起的原子指令,它就是分布式锁的奥义所在
import threading
import time
import uuid
import redis as _redis
from redis import WatchError
from threading import Timer
redis = _redis.Redis(host='localhost', port=6379, decode_responses=True, db=11)
class DistributedLocks(object):
""" Redis分布式锁实现类 """
lock_timeout = None
lock_name = None
def __init__(self,coon,lock_name=None , acquire_timeout=3 , lock_timeout=3, **kwargs):
""" coon --> redis的连接 lock_name --> 锁的名称 acquire_timeout --> 获取锁的超时时间(秒) lock_timeout --> 锁的超时时间(秒) """
self.coon = coon
if not lock_name:
lock_name = 'default'
self.lock_name = f'lock:{
lock_name}'
self.rem_ark = str(uuid.uuid4())
self.lock_timeout = lock_timeout
self.end = time.time() + acquire_timeout
DistributedLocks.lock_timeout = self.lock_timeout
DistributedLocks.lock_name = self.lock_name
def _lock(self) -> bool:
""" 加锁 """
while time.time() <= self.end:
# 如果锁不存在,设置锁并给锁加上超时时间
if self.coon.set(self.lock_name,self.rem_ark,ex=self.lock_timeout,nx=True):
return True
time.sleep(0.001)
return False
def _unlock(self) -> bool:
""" 释放锁 方法一事务 """
# 使用redis中的事务pipeline
pipe = self.coon.pipeline(True)
while True:
try:
# 通过watch监视锁名称,如果锁名称一旦改变,就抛出WatchError
pipe.watch(self.lock_name)
rem_rak = self.coon.get(self.lock_name)
if rem_rak and rem_rak == self.rem_ark:
pipe.multi()
pipe.delete(self.lock_name)
pipe.excute()
return True
pipe.unwatch()
break
except WatchError:
pass
return False
def release_lock(self):
""" 释放锁 方法二lua脚本 """
unlock_lua = """ if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end """
unlock = self.coon.register_script(unlock_lua)
result = unlock(key=[self.lock_name,],args=[self.rem_ark])
if result:
return True
return False
def __enter__(self):
self._lock()
def __exit__(self, exc_type, exc_val, exc_tb):
try:
# self._unlock()
self.release_lock()
except Exception:
return False
def storage_redis(value):
# 加锁与释放锁使用类DistributedLocks来实现
DL = DistributedLocks(redis,'test') # 实例化分布式锁
with DL:
redis.set('age', value)
print(redis.get('age'))
if __name__ == '__main__':
for i in range(5):
t = threading.Thread(target=storage_redis, args=(i,))
t.start()
print('--------------------')
print(redis.get('age'))
这样看下来就不会发生数据错乱的问题了。但是还有一个问题,超时问题
Redis 的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行的太长,以至 于超出了锁的超时限制,就会出现问题。因为这时候锁过期了,第二个线程重新持有了这把锁, 但是紧接着第一个线程执行完了业务逻辑,就把锁给释放了,第三个线程就会在第二个线程逻 辑执行完之间拿到了锁。
为了避免这个问题,Redis 分布式锁不要用于较长时间的任务。如果真的偶尔出现了,数据出现的小波错乱可能需要人工介入解决
这个时候就需要看门狗
来做了
redis看门狗的实现
import threading
import time
import uuid
import redis as _redis
from redis import WatchError
from threading import Timer
redis = _redis.Redis(host='localhost', port=6379, decode_responses=True, db=11)
class WatchDog(object):
""" Python实现redis的看门狗机制 """
def __init__(self,timeout,user_handler=None):
self.timeout = timeout
self.user_handler = user_handler if user_handler else self.default_handler
self.timer = Timer(self.timeout,self.user_handler)
self.timer.start()
def reset(self):
"""计时器重启"""
self.timer.cancel()
timer = Timer(self.timeout,self.user_handler)
timer.start()
def stop(self):
"""计时器停止"""
self.timer.cancel()
def default_handler(self):
pass
@classmethod
def error_handler(cls):
pass
class DistributedLocks(object):
""" Redis分布式锁实现类 """
lock_timeout = None
lock_name = None
def __init__(self,coon,lock_name=None , acquire_timeout=3 , lock_timeout=3, **kwargs):
""" coon --> redis的连接 lock_name --> 锁的名称 acquire_timeout --> 获取锁的超时时间(秒) lock_timeout --> 锁的超时时间(秒) """
self.coon = coon
if not lock_name:
lock_name = 'default'
self.lock_name = f'lock:{
lock_name}'
self.rem_ark = str(uuid.uuid4())
self.lock_timeout = lock_timeout
self.end = time.time() + acquire_timeout
DistributedLocks.lock_timeout = self.lock_timeout
DistributedLocks.lock_name = self.lock_name
def _lock(self) -> bool:
""" 加锁 """
while time.time() <= self.end:
# 如果锁不存在,设置锁并给锁加上超时时间
if self.coon.set(self.lock_name,self.rem_ark,ex=self.lock_timeout,nx=True):
return True
time.sleep(0.001)
return False
def _unlock(self) -> bool:
""" 释放锁 方法一事务 """
# 使用redis中的事务pipeline
pipe = self.coon.pipeline(True)
while True:
try:
# 通过watch监视锁名称,如果锁名称一旦改变,就抛出WatchError
pipe.watch(self.lock_name)
rem_rak = self.coon.get(self.lock_name)
if rem_rak and rem_rak == self.rem_ark:
pipe.multi()
pipe.delete(self.lock_name)
pipe.excute()
return True
pipe.unwatch()
break
except WatchError:
pass
return False
def release_lock(self):
""" 释放锁 方法二lua脚本 """
unlock_lua = """ if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end """
unlock = self.coon.register_script(unlock_lua)
result = unlock(key=[self.lock_name,],args=[self.rem_ark])
if result:
return True
return False
def extra_time(self):
""" 如果锁的超时时间小于redis存取的时间(锁已经超时了,但是任务还未执行完成) """
# 默认给锁再延长10秒
try:
self.coon.expire(DistributedLocks.lock_name, 5)
except Exception:
raise TimeoutError
def __enter__(self):
self._lock()
def __exit__(self, exc_type, exc_val, exc_tb):
try:
# self._unlock()
self.release_lock()
except Exception:
return False
def storage_redis(value):
# 加锁与释放锁使用类DistributedLocks来实现
DL = DistributedLocks(redis,'test') # 实例化分布式锁
with DL:
watchdog = WatchDog(DL.lock_timeout,DL.extra_time()) # 启动看门狗
try:
redis.set('age', value)
except Exception: # 捕获异常,进行异常操作
# watchdog.error_handler() # 捕获异常,自定义异常处理
# watchdog.reset() # 看门狗重启
watchdog.stop() # 停止看门狗
print(redis.get('age'))
if __name__ == '__main__':
for i in range(5):
t = threading.Thread(target=storage_redis, args=(i,))
t.start()
print('--------------------')
print(redis.get('age'))
# start = time.time()
# storage_redis(13)
# end = time.time()-start
# print(end)
如果锁已经超时了,但是任务还未执行完成那么便再给它加5秒,当然这种也会有问题,如果5秒后任务还没执行完成就会报错,当然了你可以在watchdog.error_handler()方法内自定义异常处理
redis可重入锁的实现
可重入性是指线程在持有锁的情况下再次请求加锁,如果一个锁支持同一个线程的多次加锁,那么这个锁就是可重入的。
import threading
import time
import uuid
import redis as _redis
from redis import WatchError
from threading import Timer
redis = _redis.Redis(host='localhost', port=6379, decode_responses=True, db=11)
class WatchDog(object):
""" Python实现redis的看门狗机制 """
def __init__(self,timeout,user_handler=None):
self.timeout = timeout
self.user_handler = user_handler if user_handler else self.default_handler
self.timer = Timer(self.timeout,self.user_handler)
self.timer.start()
def reset(self):
"""计时器重启"""
self.timer.cancel()
timer = Timer(self.timeout,self.user_handler)
timer.start()
def stop(self):
"""计时器停止"""
self.timer.cancel()
def default_handler(self):
pass
@classmethod
def error_handler(cls):
pass
class DistributedLocks(object):
""" Redis分布式锁实现类 """
lock_timeout = None
lock_name = None
def __init__(self,coon,lock_name=None , acquire_timeout=3 , lock_timeout=3, **kwargs):
""" coon --> redis的连接 lock_name --> 锁的名称 acquire_timeout --> 获取锁的超时时间(秒) lock_timeout --> 锁的超时时间(秒) """
self.coon = coon
if not lock_name:
lock_name = 'default'
self.lock_name = f'lock:{
lock_name}'
self.rem_ark = str(uuid.uuid4())
self.lock_timeout = lock_timeout
self.end = time.time() + acquire_timeout
DistributedLocks.lock_timeout = self.lock_timeout
DistributedLocks.lock_name = self.lock_name
self.locks = threading.local()
# 可重入锁栈
self.locks.redis = {
}
def _lock(self) -> bool:
""" 加锁 """
while time.time() <= self.end:
# 如果栈内存在此锁,把持有锁的数量加一
if self.lock_name in self.locks.redis:
self.locks.redis[self.lock_name] +=1
return True
# 如果栈内锁不存在,设置锁并给锁加上超时时间
if self.coon.set(self.lock_name,self.rem_ark,ex=self.lock_timeout,nx=True):
# 再在栈中加入一个键值对key为当前线程,value为线程持有锁的数量
self.locks.redis[self.lock_name] = 1
return True
return False
def _unlock(self) -> bool:
""" 释放锁 方法一事务 """
# 判断锁是否在栈中
if self.lock_name in self.locks.redis:
# 将锁的数量减一
self.locks.redis[self.lock_name] -= 1
# 锁如果小于等于零的话,直接将锁释放掉
if self.locks.redis[self.lock_name] <= 0:
# 使用redis中的事务pipeline
pipe = self.coon.pipeline(True)
while True:
try:
# 通过watch监视锁名称,如果锁名称一旦改变,就抛出WatchError
pipe.watch(self.lock_name)
rem_rak = self.coon.get(self.lock_name)
if rem_rak and rem_rak == self.rem_ark:
pipe.multi()
pipe.delete(self.lock_name)
pipe.excute()
return True
pipe.unwatch()
break
except WatchError:
pass
return False
return False
def release_lock(self):
""" 释放锁 方法二lua脚本 """
unlock_lua = """ if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end """
unlock = self.coon.register_script(unlock_lua)
result = unlock(key=[self.lock_name,],args=[self.rem_ark])
if result:
return True
return False
def extra_time(self):
""" 如果锁的超时时间小于redis存取的时间(锁已经超时了,但是任务还未执行完成) """
# 默认给锁再延长10秒
try:
self.coon.expire(DistributedLocks.lock_name, 5)
except Exception:
raise TimeoutError
def __enter__(self):
self._lock()
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self._unlock()
# self.release_lock()
except Exception:
return False
def storage_redis(value):
# 加锁与释放锁使用类DistributedLocks来实现
DL = DistributedLocks(redis,'test') # 实例化分布式锁
with DL:
watchdog = WatchDog(DL.lock_timeout,DL.extra_time()) # 启动看门狗
try:
redis.set('age', value)
except Exception: # 捕获异常,进行异常操作
# watchdog.error_handler() # 捕获异常,自定义异常处理
# watchdog.reset() # 看门狗重启
watchdog.stop() # 停止看门狗
print(redis.get('age'))
if __name__ == '__main__':
for i in range(5):
t = threading.Thread(target=storage_redis, args=(i,))
t.start()
print('--------------------')
print(redis.get('age'))
# start = time.time()
# storage_redis(13)
# end = time.time()-start
# print(end)
- 本人极其不推荐使用可重入锁,它加重了客户端的复杂性,在编写业务方法时注意在 逻辑结构上进行调整完全可以不使用可重入锁
select 100-mean(usage_idle) as usage_idle from (
select last(usage_idle) as usage_idle from (
select mean("usage_idle") as usage_idle from cpu where sys_name ='study' and time > now() - 10m group by host,cpu,time(10m)
) group by host,cp
)
边栏推荐
- els 长条碰撞变形判断
- 不精确微分/不完全微分(Inexact differential/Imperfect differential)
- 为什么四个字节的float表示的范围比八个字节的long要广
- Configure zabbix auto-discovery and auto-registration.
- FFmpeg AVPacket详解
- 动态刷新日志级别
- logback源码阅读(二)日志打印,自定义appender,encoder,pattern,converter
- hsql是什么_MQL语言
- Detailed explanation of ORACLE expdp/impdp
- VMM是什么?_兮是什么意思
猜你喜欢
Data Organization---Chapter 6 Diagram---Graph Traversal---Multiple Choice Questions
RKMPP库快速上手--(一)RKMPP功能及使用详解
网络安全第四次作业
Shell脚本完成pxe装机配置
The world's largest Apache open source foundation is how it works?
shell脚本“画画”
Supervision strikes again, what about the market outlook?2021-05-22
rhce第三天作业
C language improvement (3)
未来的金融服务永远不会停歇,牛市仍将继续 2021-05-28
随机推荐
C语言提高篇(三)
uview 2.x版本 tabbar在uniapp小程序里头点击两次才能选中图标
未来的金融服务永远不会停歇,牛市仍将继续 2021-05-28
vim复制粘贴_vim如何复制粘贴
专访|带着问题去学习,Apache DolphinScheduler 王福政
SQL函数 UPPER
GTK:Gdk-CRITICAL **: IA__gdk_draw_pixbuf: assertion ‘GDK_IS_DRAWABLE (drawable)’ failed
Audio processing: floating point data stream to PCM file
WiFi Association&Omnipeek抓包分析
Sentinel源码(六)ParamFlowSlot热点参数限流
世界上最大的开源基金会 Apache 是如何运作的?
Word | 关于删除分节符(下一页)前面的版式就乱了解决方案
腾讯安全游戏行业研讨会:生态共建,护航游戏产业健康发展
MySQL数据库语法格式
FFmpeg 的AVCodecContext结构体详解
存储过程详解
C language improvement (3)
标量替换、栈上分配、同步消除
FreeBSD bnxt以太网驱动源码阅读记录三:
Shell脚本完成pxe装机配置