当前位置:网站首页>关于分布式锁的续命问题——基于Redis实现的分布式锁
关于分布式锁的续命问题——基于Redis实现的分布式锁
2022-06-11 12:51:00 【熟透的蜗牛】
目录
一、背景
关于分布式锁就不多说了,现在出现了一种场景,如果在分布式锁中,业务代码没有执行完,然后锁的键值过期了,那么其他的JVM就可能会获取到锁而发生幂等的问题。那么这种情况怎么解决呢?

1、如果线程1获取到了锁,那么在业务代码还没执行完时,redis键值过期了,那么就会发生幂等问题。解决的思路是,当线程1获取到锁之后,开启一个线程去监听线程1是否执行完成,如果没有执行完成,就去延长键值的过期时间。
2、如果因为某些原因(比如代码bug,网络不稳定),锁一直不能释放,然后就要一直进行延长过期时间么?答案是否定的,我们可以设置延长时间的次数,如果超过设定的次数还是失败,就自动释放锁,然后回滚业务。
3、释放锁时需要考虑的问题,谁加的锁,就让谁来释放锁。
二、自定义实现
锁信息实体类
package com.xiaojie.lock;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author xiaojie
* @version 1.0
* @description: 记录锁信息
* @date 2022/6/9 17:44
*/
public class RedisLockInfo {
/**
* 锁的状态 state为true 则表示获取锁成功
*/
private boolean state;
/**
* 锁的id
*/
private String lockId;
/**
* 锁的持有线程
*/
private Thread lockThread;
/**
* 锁的过期时间
*/
private Long expire;
/**
* 续命次数
*/
private AtomicInteger lifeCount;
/**
* 获取锁次数
*/
private AtomicInteger lockCount;
// 锁的可重入次数
public RedisLockInfo(String lockId, Thread lockThread, Long expire) {
state = true;
this.lockId = lockId;
this.lockThread = lockThread;
this.expire = expire;
lifeCount = new AtomicInteger(0);
lockCount = new AtomicInteger(0);
}
public RedisLockInfo(Thread lockThread, Long expire) {
this.lockThread = lockThread;
this.expire = expire;
lifeCount = new AtomicInteger(0);
lockCount = new AtomicInteger(0);
state = true;
}
public RedisLockInfo() {
state = true;
}
public String getLockId() {
return lockId;
}
public boolean isState() {
return state;
}
public Thread getLockThread() {
return lockThread;
}
public Long getExpire() {
return expire;
}
//重试的次数
public Integer getLifeCount() {
return lifeCount.incrementAndGet();
}
//锁获取的次数
public Integer incrLockCount() {
return lockCount.incrementAndGet();
}
//释放锁次数
public Integer decreLockCount() {
return lockCount.decrementAndGet();
}
}
获取锁的方法和续命方法
package com.xiaojie.lock.impl;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.lock.RedisLockInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author xiaojie
* @version 1.0
* @description: 实现分布式锁的实现类
* @date 2022/6/9 18:05
*/
@Component
@Slf4j
public class RedisLockImpl implements RedisLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String redisLockKey = "redisLockKey";
/**
* 缓存redis锁
*/
private static Map<Thread, RedisLockInfo> lockCacheMap = new ConcurrentHashMap<>();
/**
* 重试时间
*/
private Long timeout = 3000L;
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@Override
public boolean tryLock() {
Thread currentThread = Thread.currentThread();
RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
//判断是不是当前线程获取到了锁
if (null != redisLockInfo && redisLockInfo.isState()) {
//证明已经获取到了锁,锁重入
log.info(">>>>>>>>>>>>>>>>>已经获取到了锁");
redisLockInfo.incrLockCount(); //获取锁的次数加1
return true;
}
Long startTime = System.currentTimeMillis();
Long lockExpire = 30l; //键值的过期时间
//重试获取锁
for (; ; ) {
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, "1", lockExpire, TimeUnit.SECONDS);
if (lock) {
//获取锁成功
log.info(">>>>>>>>>>>>>>获取锁成功");
lockCacheMap.put(currentThread, new RedisLockInfo(currentThread, lockExpire));
return true;
}
// 控制一个超时的时间
Long endTime = System.currentTimeMillis();
if (endTime - startTime > timeout) {
log.info("在3秒内已经过了重试时间了>>>>>>>>>>>");
return false;
}
// 继续循环
try {
Thread.sleep(10); //休眠避免CPU飙高
} catch (Exception e) {
}
}
}
@Override
public boolean releaseLock() {
Thread currentThread = Thread.currentThread();
RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
if (null != redisLockInfo && redisLockInfo.isState()) {
if (redisLockInfo.decreLockCount() <= 0) {
lockCacheMap.remove(currentThread);
//删除键值
stringRedisTemplate.delete(redisLockKey);
return true;
}
}
return false;
}
public RedisLockImpl() {
//开始定时任务实现续命,每3秒续命一次
this.scheduledExecutorService.scheduleAtFixedRate(new LifeExtensionThread(), 0, 3, TimeUnit.SECONDS);
}
//定义监听线程
class LifeExtensionThread implements Runnable {
@Override
public void run() {
log.info("开始续命线程>>>>>>>>>>>>>>>>>>>>>>>>>");
if (lockCacheMap.size() > 0) {
lockCacheMap.forEach((k, lockInfo) -> {
try {
Thread lockServiceThread = lockInfo.getLockThread();
if (lockServiceThread.isInterrupted()) {
//判断线程是否执行完毕
log.info("当前线程已经被停止>>>>>>>>>>>>>>>");
lockCacheMap.remove(k); //从缓存中移除线程
return;
}
// 续命键值过期时间
Integer lifeCount = lockInfo.getLifeCount();
if (lifeCount > 5) {
log.info(">>>>>>>>>>>>>>>>>您已经续命了多次当前线程还没有释放锁,现在主动将该锁释放 避免死锁的问题");
// 1.回滚业务(例如事务回滚)
// 2.释放锁
releaseLock();
// 3.将该线程主动停止
lockServiceThread.interrupt();
// 4.移除监听
lockCacheMap.remove(k);
return;
}
//提前实现续命 延长过期key的时间
stringRedisTemplate.expire(redisLockKey, lockInfo.getExpire(), TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
}
模拟效果使用 Jemeter模拟秒杀的业务
package com.xiaojie.controller;
import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author xiaojie
* @version 1.0
* @description: 秒杀的接口
* @date 2022/6/9 22:55
*/
@RestController
public class SeckillController {
@Autowired
private GoodsMapper goodsMapper;
@Autowired
private RedisLock redisLock;
@GetMapping("/secKill")
public String secKill(Long id) {
try {
if (!redisLock.tryLock()) {
//获取锁失败
return "活动太火了,请稍后重试";
}
Goods goods = goodsMapper.selectById(id);
if (null == goods) {
return "该商品没有秒杀活动";
}
if (goods.getStock() <= 0) {
return "商品库存为空了。。。。。";
}
//减库存
Integer result = goodsMapper.deceGoods(id);
return result > 0 ? "秒杀成功" : "秒杀失败";
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
redisLock.releaseLock();
}
return "fail";
}
}
三、Redisson框架实现

图片来源 https://blog.csdn.net/menxu_work/article/details/123827526
1、Redisson向redis写入键值的时候不再使用setNx的命令,而是使用Lua脚本的形式写入
2、写入键值成功之后会设置一个后台线程(看门狗线程)来对键值进行续命,默认值是每10秒续命一次,续命时间为30秒
3、键值的类型为hash类型,如下图,其中1是锁入的次数

4、释放锁的时候,也是通过Lua脚本的方式,当锁入的次数小于0时,就会删除键值。
源码分析
加锁代码
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
//如果没有过期时间,证明键值不存在,直接返回
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
//键值存在,自旋方式去获取锁
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
//取消订阅
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + //判断键值是否存在,==0不存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //创建哈希类型键值,并设置值为1
"redis.call('pexpire', KEYS[1], ARGV[1]); " +//设置过期时间 ,默认是30s
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//键值存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//将键值的value值加1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + //设置过期时间为30s
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", //键值存在,并且不是重入锁,则返回键值剩余存活时间
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}释放锁的代码
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + //键值不存在则表示锁已经释放了
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //简直存在,则键值减1
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " + //如果减1后键值还是存在,则重新设置过期时间
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " + //删除键值
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}续命的代码

模拟秒杀代码实现
package com.xiaojie.redisson;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author xiaojie
* @version 1.0
* @description: reddisson配置
* @date 2022/6/10 2:05
*/
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
//设置看门狗时间
// config.setLockWatchdogTimeout(30000L);
//设置单机版本redis
config.useSingleServer().setAddress("redis://" + host + ":" + port);
//设置密码
config.useSingleServer().setPassword(password);
//设置集群的方式
// config.useClusterServers().addNodeAddress("redis://" + host + ":" + port);
// config.useClusterServers().addNodeAddress("redis://" + host2 + ":" + port2);
//添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
秒杀的接口
package com.xiaojie.controller;
import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author xiaojie
* @version 1.0
* @description: 秒杀的接口
* @date 2022/6/9 22:55
*/
@RestController
public class RedissonSeckillController {
@Autowired
private GoodsMapper goodsMapper;
@Autowired
private RedissonClient redissonClient;
@GetMapping("/secKillRedisson")
public String secKillRedisson(Long id) {
RLock rLock = null;
try {
rLock = redissonClient.getLock(id + "");
rLock.lock(); //加锁,加几次锁,finally释放锁的时候就要释放几次锁
Goods goods = goodsMapper.selectById(id);
if (null == goods) {
return "该商品没有秒杀活动";
}
if (goods.getStock() <= 0) {
return "商品库存为空了。。。。。";
}
//减库存
Integer result = goodsMapper.deceGoods(id);
return result > 0 ? "秒杀成功" : "秒杀失败";
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return "fail";
}
}
边栏推荐
- [ArcGIS]城市关联度分析
- [arcgis] City relevance analysis
- What scenarios can the member management system of the multi guest swimming pool achieve?
- 火山引擎云数据库 veDB 在字节内部的业务实践
- [untitled]
- 机械设备制造企业,如何借助ERP系统做好委外加工管理?
- live share使用体验
- Which 4K projector is the most cost-effective? When the Bay X3 Pro highlights the 128G storage 618, it is worth seeing
- Which brand of bone conduction Bluetooth headset is good? Five most popular bone conduction Bluetooth headsets
- 求你了,不要再在对外接口中使用枚举类型了!
猜你喜欢

PADS使用之繪制原理圖

PADS使用之绘制原理图

馆客多游泳馆会员管理系统可以实现哪些场景?

Luo Jing: connection Efficiency Optimization Practice

CS structure and BS structure

4K投影儀哪款性價比最高,當貝X3 Pro高亮128G存儲618值得看

1. Thread Basics

美容院管理系统如何解决门店运营的三大难题?

综合场馆的优势有哪些?

Tawang food industry insight | China's dairy market analysis, competition pattern, development trend and thinking
随机推荐
Another way to achieve family reunion, 2022 flagship projection nut j10s is planted with grass
Mctalk's entrepreneurial voice - erudition and discernment: be interested in socializing, and provide a "small and beautiful" space for old friends before and after retirement
SQL的语法
Tawang food industry insight | China's dairy market analysis, competition pattern, development trend and thinking
After Oracle deletes a user, it can still use the user to log in
经营养生理疗馆要注意什么问题?
启封easy QF PDA帮助企业提升ERP的管理水平
【backtrader源码解析46】cerebro.py代码注释(枯燥,backtrader核心代码之一,推荐阅读,注释仅供参考)
Condition debug of pycharm
Research on DB2 Database Reconstruction and table data migration
求你了,不要再在对外接口中使用枚举类型了!
tf.data(二) —— 并行化 tf.data.Dataset 生成器
Why are the current membership warehouse stores bursting out collectively?
两件小事,感受到了和大神的差距
Imx6ul development board porting EMMC startup process of mainline u-boot
科海融生&正航,以信息化驱动管理升级,携手共迎数智未来
馆客多游泳馆会员管理系统可以实现哪些场景?
综合场馆的优势有哪些?
常用字体介绍
Number selection (greed)