当前位置:网站首页>Redisson 高性能 Redis 分布式锁源码分析
Redisson 高性能 Redis 分布式锁源码分析
2022-07-02 15:56:00 【运维开发故事】
微信公众号:运维开发故事,作者:老郑
Redisson 实现分布式锁的机制如下:
原理描述
先线程 1 获取锁,如果获取锁成功,那么会开启一个后台线程,每次间隔 10 秒进行续期。
并发情况,线程 2 会进行加锁,如果无法获取锁,那么就会进行自旋等待,等待到达一定次数过后,就会进行线程阻塞,并且订阅解锁消息。
当线程 1 释放锁之后,会触发 redis 的解锁消息,消息的观察者会观察到然后去唤醒解锁的逻辑,线程 2 继续竞争锁。
对于锁的重入,Redisson 是通过 hash 为数据类型的,会存储当前线程的 tid (本质是生成的 uuid 唯一id).
测试代码
下面我们将以一个秒杀的例子来说明:
依赖版本
implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'
测试代码
下面是模拟一个商品秒杀的场景,示例代码如下:
public class RedissonTest {
public static void main(String[] args) {
//1. 配置部分
Config config = new Config();
String address = "redis://127.0.0.1:6379";
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress(address);
serverConfig.setDatabase(0);
config.setLockWatchdogTimeout(5000);
Redisson redisson = (Redisson) Redisson.create(config);
RLock rLock = redisson.getLock("goods:1000:1");
//2. 加锁
rLock.lock();
try {
System.out.println("todo 逻辑处理 1000000.");
} finally {
if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
//3. 解锁
rLock.unlock();
}
}
}
}
加锁设计
rLock.lock();
是加锁的核心代码,我们一起来看看调用栈加锁的核心方法是:
org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
其实它的本质是调用一段 LUA 脚本进行加锁, 需要注意的是这个地方使用的数据类型是 hash。这里是用 hash 的好处就是可以通过同一个 key 来存储重入的 tid
锁续期设计
锁的续期是在 org.redisson.RedissonLock#tryAcquireAsync
方法中调用 scheduleExpirationRenewal
实现的。
续期需要注意的是,看门狗是设置在主线程的延迟队列的线程中。
这里的好处就是如果我在一个进程中,同时加了 1000 把锁,我们不需要启动 1000 个子线程去续期,只需要创建 1000 个续期任务对象即可,在到达续期时间才会唤醒续期线程。
tryAcquireAsync
代码如下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 锁过期时间续期
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
锁续期 scheduleExpirationRenewal
代码如下:
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
然后在调用 renewExpiration();
执行续期逻辑, 其实这里是一个定时任务 + 递归的方式实现续期的,用定时任务的好处就是不用去开 N 个字线程,只需要创建对应的任务对象即可。
备注:如果超级极端的情况下 N 把锁,同时加锁,同时需求。我们可以考虑在锁的有效期上,给它加一个浮动时间比如 100 - 500ms. 这样就能一定程度上避免 (参考的是缓存失效/击穿的解决方案)
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 创建延迟任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 真正的续期,调用 LUA 脚本续期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 如果续期成功
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
这里还有一个小的点,就是续期的时间是 1/3 为什么呢?保证在下次续期的时候锁不过期,如果是 1/2 可能在下次定时任务执行的时候 key 已经过期,如果小于 1/3 会导致频繁续期,任务代价/收益比不高。
renewExpirationAsync
方法, 里面还是一段 LUA 脚本,进行重新设置锁的过期时间。
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
锁的自旋重试
org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)
在执行获取锁失败的时候,会进入重试。其实这里就会执行 18 行以后的 while (true)
逻辑
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 订阅锁过期的消息
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
// 阻塞锁的超时时间,等锁过期后再尝试加锁
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
其实这里就是一个间歇性自旋。等到上次锁过期的时间,在唤醒进行抢锁 entry.getLatch().acquire();
订阅锁失效
还有一个逻辑就是
CompletableFuture future = subscribe(threadId);
这里其实是会订阅一个消息,如果解锁过后,会发布解锁的消息。然后再唤醒当前多次竞争锁进入休眠的线程。
解锁设计
rLock.unlock(); 的核心就是释放锁,撤销续期和唤醒在等待加锁的线程(发布解锁成功消息)。
核心方法(解锁): org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
// 发布解锁成功消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
还是 LUA 的执行方式。
撤销锁续期
核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)
@Override
public RFuture<Void> unlockAsync(long threadId) {
// 解锁
RFuture<Boolean> future = unlockInnerAsync(threadId);
// 撤销续期
CompletionStage<Void> f = future.handle((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
解锁成功唤排队线程
在 org.redisson.pubsub.LockPubSub#onMessage
中回去唤醒阻塞的线程,让执行前面的锁自旋逻辑,具体代码如下:
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
value.getLatch().release(value.getLatch().getQueueLength());
}
}
公众号:运维开发故事
github:https://github.com/orgs/sunsharing-note/dashboard
爱生活,爱运维
边栏推荐
- Problems needing attention in the development and debugging of Yingguang single chip microcomputer
- Edgenext hit a mixed punch: a lightweight architecture integrating CNN and transformer
- Yingguang single chip microcomputer development specification pmc131 with AD chip to detect battery voltage single chip microcomputer sop8/14
- Map集合详细讲解
- 【Zuul】com.netflix.zuul.exception.ZuulException: Hystrix Readed time out
- Development and application case of pms134 scheme of Yingguang single chip microcomputer with original packaging
- uva1169
- Pfc232-sop8/14/16 should be wide-ranging and can be tape programmed with burning program
- Songhan sn8p2511 sop8 single chip microcomputer can be used for burning, providing single chip microcomputer scheme development and single chip microcomputer decryption
- Simple linear programming problem
猜你喜欢
Easyswoole3.2 restart failed
智能水电表能耗监测云平台
This "architect growth note" made 300 people successfully change jobs and enter the big factory, with an annual salary of 50W
What is the experience of maintaining Wanxing open source vector database
Income and risk of linear programming example investment
ASEMI整流桥UMB10F参数,UMB10F规格,UMB10F封装
我的创作纪念日
MB10M-ASEMI整流桥MB10M
【目标跟踪】|数据集汇总
售价仅40元,树莓派Pico开发板加入WiFi模块,刚上市就脱销
随机推荐
维护万星开源向量数据库是什么体验
Keras深度学习实战——基于VGG19模型实现性别分类
应广单片机开发 工规 PMC131 带AD芯片检测电池电压单片机SOP8/14
把xshell连接服务器关掉,运行的jar包就自动停止的解决方案
Longest non repeating subarray
Virtual lab basic experiment tutorial -7 Polarization (1)
应广单片机(MCU单片机科普)
应广单片机PMS150/PMC150/PMS150C消费类单片机
Use of nexttile function in MATLAB
AtCoder Beginner Contest 237 VP补题
[target tracking] | data set summary
EdgeNeXt打出了一套混合拳:集CNN与Transformer于一体的轻量级架构
Example nonlinear integer programming
Modbus协议通信异常
微信小程序 —— 上下浮动的箭头
Huimang micro IO MCU ft60f11f-mrb
阿里云子账户 - 权限策略 - 授权给某个账户某个 OSS Bucket 的完全控制权限
Uniapp H5 page calls wechat payment
freemarker+poi实现动态生成excel文件及解析excel文件
Linux中,mysql设置job任务自动启动