当前位置:网站首页>關於分布式鎖的續命問題——基於Redis實現的分布式鎖
關於分布式鎖的續命問題——基於Redis實現的分布式鎖
2022-06-11 13:03:00 【熟透的蝸牛】
目錄
一、背景
關於分布式鎖就不多說了,現在出現了一種場景,如果在分布式鎖中,業務代碼沒有執行完,然後鎖的鍵值過期了,那麼其他的JVM就可能會獲取到鎖而發生幂等的問題。那麼這種情况怎麼解决呢?

1、如果線程1獲取到了鎖,那麼在業務代碼還沒執行完時,redis鍵值過期了,那麼就會發生幂等問題。解决的思路是,當線程1獲取到鎖之後,開啟一個線程去監聽線程1是否執行完成,如果沒有執行完成,就去延長鍵值的過期時間。
2、如果因為某些原因(比如代碼bug,網絡不穩定),鎖一直不能釋放,然後就要一直進行延長過期時間麼?答案是否定的,我們可以設置延長時間的次數,如果超過設定的次數還是失敗,就自動釋放鎖,然後回滾業務。
3、釋放鎖時需要考慮的問題,誰加的鎖,就讓誰來釋放鎖。
二、自定義實現
鎖信息實體類
package com.xiaojie.lock;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author xiaojie
* @version 1.0
* @description: 記錄鎖信息
* @date 2022/6/9 17:44
*/
public class RedisLockInfo {
/**
* 鎖的狀態 state為true 則錶示獲取鎖成功
*/
private boolean state;
/**
* 鎖的id
*/
private String lockId;
/**
* 鎖的持有線程
*/
private Thread lockThread;
/**
* 鎖的過期時間
*/
private Long expire;
/**
* 續命次數
*/
private AtomicInteger lifeCount;
/**
* 獲取鎖次數
*/
private AtomicInteger lockCount;
// 鎖的可重入次數
public RedisLockInfo(String lockId, Thread lockThread, Long expire) {
state = true;
this.lockId = lockId;
this.lockThread = lockThread;
this.expire = expire;
lifeCount = new AtomicInteger(0);
lockCount = new AtomicInteger(0);
}
public RedisLockInfo(Thread lockThread, Long expire) {
this.lockThread = lockThread;
this.expire = expire;
lifeCount = new AtomicInteger(0);
lockCount = new AtomicInteger(0);
state = true;
}
public RedisLockInfo() {
state = true;
}
public String getLockId() {
return lockId;
}
public boolean isState() {
return state;
}
public Thread getLockThread() {
return lockThread;
}
public Long getExpire() {
return expire;
}
//重試的次數
public Integer getLifeCount() {
return lifeCount.incrementAndGet();
}
//鎖獲取的次數
public Integer incrLockCount() {
return lockCount.incrementAndGet();
}
//釋放鎖次數
public Integer decreLockCount() {
return lockCount.decrementAndGet();
}
}
獲取鎖的方法和續命方法
package com.xiaojie.lock.impl;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.lock.RedisLockInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author xiaojie
* @version 1.0
* @description: 實現分布式鎖的實現類
* @date 2022/6/9 18:05
*/
@Component
@Slf4j
public class RedisLockImpl implements RedisLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String redisLockKey = "redisLockKey";
/**
* 緩存redis鎖
*/
private static Map<Thread, RedisLockInfo> lockCacheMap = new ConcurrentHashMap<>();
/**
* 重試時間
*/
private Long timeout = 3000L;
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@Override
public boolean tryLock() {
Thread currentThread = Thread.currentThread();
RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
//判斷是不是當前線程獲取到了鎖
if (null != redisLockInfo && redisLockInfo.isState()) {
//證明已經獲取到了鎖,鎖重入
log.info(">>>>>>>>>>>>>>>>>已經獲取到了鎖");
redisLockInfo.incrLockCount(); //獲取鎖的次數加1
return true;
}
Long startTime = System.currentTimeMillis();
Long lockExpire = 30l; //鍵值的過期時間
//重試獲取鎖
for (; ; ) {
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, "1", lockExpire, TimeUnit.SECONDS);
if (lock) {
//獲取鎖成功
log.info(">>>>>>>>>>>>>>獲取鎖成功");
lockCacheMap.put(currentThread, new RedisLockInfo(currentThread, lockExpire));
return true;
}
// 控制一個超時的時間
Long endTime = System.currentTimeMillis();
if (endTime - startTime > timeout) {
log.info("在3秒內已經過了重試時間了>>>>>>>>>>>");
return false;
}
// 繼續循環
try {
Thread.sleep(10); //休眠避免CPU飆高
} catch (Exception e) {
}
}
}
@Override
public boolean releaseLock() {
Thread currentThread = Thread.currentThread();
RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
if (null != redisLockInfo && redisLockInfo.isState()) {
if (redisLockInfo.decreLockCount() <= 0) {
lockCacheMap.remove(currentThread);
//删除鍵值
stringRedisTemplate.delete(redisLockKey);
return true;
}
}
return false;
}
public RedisLockImpl() {
//開始定時任務實現續命,每3秒續命一次
this.scheduledExecutorService.scheduleAtFixedRate(new LifeExtensionThread(), 0, 3, TimeUnit.SECONDS);
}
//定義監聽線程
class LifeExtensionThread implements Runnable {
@Override
public void run() {
log.info("開始續命線程>>>>>>>>>>>>>>>>>>>>>>>>>");
if (lockCacheMap.size() > 0) {
lockCacheMap.forEach((k, lockInfo) -> {
try {
Thread lockServiceThread = lockInfo.getLockThread();
if (lockServiceThread.isInterrupted()) {
//判斷線程是否執行完畢
log.info("當前線程已經被停止>>>>>>>>>>>>>>>");
lockCacheMap.remove(k); //從緩存中移除線程
return;
}
// 續命鍵值過期時間
Integer lifeCount = lockInfo.getLifeCount();
if (lifeCount > 5) {
log.info(">>>>>>>>>>>>>>>>>您已經續命了多次當前線程還沒有釋放鎖,現在主動將該鎖釋放 避免死鎖的問題");
// 1.回滾業務(例如事務回滾)
// 2.釋放鎖
releaseLock();
// 3.將該線程主動停止
lockServiceThread.interrupt();
// 4.移除監聽
lockCacheMap.remove(k);
return;
}
//提前實現續命 延長過期key的時間
stringRedisTemplate.expire(redisLockKey, lockInfo.getExpire(), TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
}
模擬效果使用 Jemeter模擬秒殺的業務
package com.xiaojie.controller;
import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author xiaojie
* @version 1.0
* @description: 秒殺的接口
* @date 2022/6/9 22:55
*/
@RestController
public class SeckillController {
@Autowired
private GoodsMapper goodsMapper;
@Autowired
private RedisLock redisLock;
@GetMapping("/secKill")
public String secKill(Long id) {
try {
if (!redisLock.tryLock()) {
//獲取鎖失敗
return "活動太火了,請稍後重試";
}
Goods goods = goodsMapper.selectById(id);
if (null == goods) {
return "該商品沒有秒殺活動";
}
if (goods.getStock() <= 0) {
return "商品庫存為空了。。。。。";
}
//减庫存
Integer result = goodsMapper.deceGoods(id);
return result > 0 ? "秒殺成功" : "秒殺失敗";
} catch (Exception e) {
e.printStackTrace();
} finally {
//釋放鎖
redisLock.releaseLock();
}
return "fail";
}
}
三、Redisson框架實現

圖片來源 https://blog.csdn.net/menxu_work/article/details/123827526
1、Redisson向redis寫入鍵值的時候不再使用setNx的命令,而是使用Lua脚本的形式寫入
2、寫入鍵值成功之後會設置一個後臺線程(看門狗線程)來對鍵值進行續命,默認值是每10秒續命一次,續命時間為30秒
3、鍵值的類型為hash類型,如下圖,其中1是鎖入的次數

4、釋放鎖的時候,也是通過Lua脚本的方式,當鎖入的次數小於0時,就會删除鍵值。
源碼分析
加鎖代碼
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
//如果沒有過期時間,證明鍵值不存在,直接返回
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
//鍵值存在,自旋方式去獲取鎖
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
//取消訂閱
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + //判斷鍵值是否存在,==0不存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //創建哈希類型鍵值,並設置值為1
"redis.call('pexpire', KEYS[1], ARGV[1]); " +//設置過期時間 ,默認是30s
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//鍵值存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//將鍵值的value值加1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + //設置過期時間為30s
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", //鍵值存在,並且不是重入鎖,則返回鍵值剩餘存活時間
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}釋放鎖的代碼
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + //鍵值不存在則錶示鎖已經釋放了
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //簡直存在,則鍵值减1
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " + //如果减1後鍵值還是存在,則重新設置過期時間
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " + //删除鍵值
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}續命的代碼

模擬秒殺代碼實現
package com.xiaojie.redisson;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author xiaojie
* @version 1.0
* @description: reddisson配置
* @date 2022/6/10 2:05
*/
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
//設置看門狗時間
// config.setLockWatchdogTimeout(30000L);
//設置單機版本redis
config.useSingleServer().setAddress("redis://" + host + ":" + port);
//設置密碼
config.useSingleServer().setPassword(password);
//設置集群的方式
// config.useClusterServers().addNodeAddress("redis://" + host + ":" + port);
// config.useClusterServers().addNodeAddress("redis://" + host2 + ":" + port2);
//添加主從配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
秒殺的接口
package com.xiaojie.controller;
import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author xiaojie
* @version 1.0
* @description: 秒殺的接口
* @date 2022/6/9 22:55
*/
@RestController
public class RedissonSeckillController {
@Autowired
private GoodsMapper goodsMapper;
@Autowired
private RedissonClient redissonClient;
@GetMapping("/secKillRedisson")
public String secKillRedisson(Long id) {
RLock rLock = null;
try {
rLock = redissonClient.getLock(id + "");
rLock.lock(); //加鎖,加幾次鎖,finally釋放鎖的時候就要釋放幾次鎖
Goods goods = goodsMapper.selectById(id);
if (null == goods) {
return "該商品沒有秒殺活動";
}
if (goods.getStock() <= 0) {
return "商品庫存為空了。。。。。";
}
//减庫存
Integer result = goodsMapper.deceGoods(id);
return result > 0 ? "秒殺成功" : "秒殺失敗";
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return "fail";
}
}
边栏推荐
- 游泳馆暑期业绩翻倍的方法
- 综合场馆的优势有哪些?
- C language - data storage
- Quic resistance
- 逆向学习入门-优秀的汇编调试工具OllyDbg
- In the list of 618 projector hedging brands in 2022, dangbei projection ranked top 1 in the hedging rate of idle fish
- ProblemB. Phoenix and Beauty
- 4K投影儀哪款性價比最高,當貝X3 Pro高亮128G存儲618值得看
- C event bus
- What are the advantages of comprehensive venues?
猜你喜欢

What are the elements of running a gymnasium?

How Oracle exports data to CSV (Excel) files

Audio adaptation of openharmony Standard System Porting

【后台交互】select 绑定后台传递的数据

苹果将造搜索引擎?

机械设备制造企业,如何借助ERP系统做好委外加工管理?

openharmony标准系统移植之音频适配

综合场馆的优势有哪些?

Adobe Premiere基础-批量素材导入序列-变速和倒放(回忆)-连续动作镜头切换-字幕要求(十三)

刚高考完有些迷茫不知道做些什么?谈一谈我的看法
随机推荐
场馆坪效这么低?关键在这两方面
Technical difficulties of secsha
非标自动化设备制造企业,如何借助ERP系统实现快速精准报价?
[interface] view the interface path and check the interface
openharmony标准系统之app手动签名
[acwing 11. solution number of knapsack problem] 01 knapsack + 01 knapsack + understand the specific meaning of 01 knapsack
Audio adaptation of openharmony Standard System Porting
C language - data storage
Syntax of SQL
求你了,不要再在对外接口中使用枚举类型了!
经营养生理疗馆要注意什么问题?
馆客多游泳馆会员管理系统可以实现哪些场景?
Simple score statistics
2022年618投影仪保值品牌榜,当贝投影闲鱼保值率排行TOP1
1. Thread Basics
How Oracle exports data to CSV (Excel) files
【滤波器】基于matlab时变维纳滤波器设计【含Matlab源码 1870期】
母婴店的利润来源有哪些?
31w赛题奖金!当 AI for Science 撞上“先导杯”,会擦出什么样的火花?
Matrix elimination game