当前位置:网站首页>Redis(十) - Redission原理与实践
Redis(十) - Redission原理与实践
2022-07-30 02:45:00 【Super_Leng】
一、Redission分布式锁原理
基于setnx实现的分布式锁存在下面的问题:
1. Redission介绍
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisson
2. Redission基本使用
(1)引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
(2)配置Redisson客户端
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建客户端
return Redisson.create(config);
}
}
(3)使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}
改造上一章中的VoucherOrderServiceImpl,使用Redisson分布式锁:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
// 使用Redis分布式锁
// 创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 使用Redisson分布式锁
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 获取锁对象
// 无参的tryLock(),默认等待时间是-1表示一直等待,过期时间默认是30s
boolean isLock = lock.tryLock();
// 加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
lock.unlock();
}
}
}
经测试后,Redisson分布式锁也可以防止一人多单场景
3. Redission可重入锁原理
(1)可重入锁流程分析

注意:这里的存放锁的数据结构是Hash结构,因为多了一个字段value存重入次数
为了保证获取锁和释放锁的原子性,需要分别通过Lua脚本进行获取锁和释放锁:
- 获取锁的Lua脚本

- 释放锁的Lua脚本

(2)演示可重入锁
编写测试方法:
@Slf4j
@SpringBootTest
class RedissonTest {
@Resource
private RedissonClient redissonClient;
private RLock lock;
@BeforeEach
void setUp() {
lock = redissonClient.getLock("order");
}
@Test
void method1() throws InterruptedException {
// 尝试获取锁
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("获取锁失败 .... 1");
return;
}
try {
log.info("获取锁成功 .... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 .... 1");
lock.unlock();
}
}
void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 2");
return;
}
try {
log.info("获取锁成功 .... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 .... 2");
lock.unlock();
}
}
}
2022-07-29 11:04:38.030 INFO 3244 --- [ main] com.hmdp.RedissonTest : 获取锁成功 .... 1
2022-07-29 11:04:44.859 INFO 3244 --- [ main] com.hmdp.RedissonTest : 获取锁成功 .... 2
2022-07-29 11:04:45.343 INFO 3244 --- [ main] com.hmdp.RedissonTest : 开始执行业务 ... 2
2022-07-29 11:04:47.307 WARN 3244 --- [ main] com.hmdp.RedissonTest : 准备释放锁 .... 2
2022-07-29 11:04:52.796 INFO 3244 --- [ main] com.hmdp.RedissonTest : 开始执行业务 ... 1
2022-07-29 11:04:53.309 WARN 3244 --- [ main] com.hmdp.RedissonTest : 准备释放锁 .... 1
(3)Redisson源码分析
Redisson锁的接口:
public interface RLock extends Lock, RLockAsync {
String getName();
void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
void lock(long leaseTime, TimeUnit unit);
boolean forceUnlock();
boolean isLocked();
boolean isHeldByThread(long threadId);
boolean isHeldByCurrentThread();
int getHoldCount();
long remainTimeToLive();
}
调用Redisson锁的尝试加锁、释放锁的方法:
private RLock lock;
// 尝试获取锁
boolean isLock = lock.tryLock();
// 释放锁
lock.unlock();
RedissonLock尝试加锁源码:
public class RedissonLock extends RedissonExpirable implements RLock {
// 1
@Override
public boolean tryLock() {
return get(tryLockAsync());
}
// 2
@Override
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
// 3
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
// 由此可知,调用无参的tryLock()方法时,waitTime默认等待时间是-1
return tryAcquireOnceAsync(-1, -1, null, threadId);
}
// 4
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// 调用无参的tryLock()方法时,leaseTime会被赋值为-1,所以执行到这里
// 最终将leaseTime默认设置为30s(long lockWatchdogTimeout = 30 * 1000;)
// 该方法是异步Future方法
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
// 5 此处就是底层源码尝试加锁的lua脚本
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " + // 获取锁成功返回nil
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " + // 获取锁成功返回nil
"end; " +
"return redis.call('pttl', KEYS[1]);", // 获取锁失败返回锁的剩余有效时间,pttl表示以毫秒为单位
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
}
RedissonLock释放锁源码:
// 1
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
// 2
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
// 3 此处就是底层源码释放锁的lua脚本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " + // 释放锁后,会publish发送通知
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
4. Redisson可重试原理
调用Redisson锁的带等待时间的尝试加锁方法:
private RLock lock;
// 尝试获取锁,传入等待时间
boolean isLock = lock.tryLock(1L, TimeUtil.SECONDS);
Redisson带等待时间的尝试加锁源码:
// 1
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
// 2
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 将等待时间转为毫秒
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 尝试获取锁,最终会调到尝试获取锁的lua脚本
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
// 返回nil表示获取锁成功
return true;
}
// System.currentTimeMillis() - current 表示前面代码执行的时间
// time -= System.currentTimeMillis() - current 表示剩余等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 没有剩余等待时间,则直接返回获取锁失败
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 上一步获取锁失败后,则会在此等待并订阅其他线程释放锁的通知
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 在剩余等待时间内还没收到释放锁的通知,则会取消订阅并返回获取锁失败
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
// 取消订阅
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
// 返回失败
return false;
}
try {
// 继续计算剩余等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
// 再次重试获取锁
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// 通过信号量getLatch(),再次等待其他线程释放锁
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
5. Redisson的WatchDog机制
- WatchDog机制(看门狗机制),主要是给锁续期,防止业务没执行完而将锁错误释放
- 注意:当自己不设置有效期时,leaseTime会被默认设为-1时,此时才有WatchDog机制;如果自己设置了有效期,则没有WatchDog机制
WatchDog机制源码:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 异步Future方法
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 当tryLockInnerAsync回调完成后,执行该方法
// ttlRemaining表示剩余有效期,e表示异常
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 有异常时,直接返回
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
// 执行加锁lua脚本返回nil时,表示加锁成功
// 自动给锁续期
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
// EXPIRATION_RENEWAL_MAP是ConcurrentHashMap(private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();)
// getEntryName()是获取当前锁的名称
// 所有的Redisson分布式锁都会存到EXPIRATION_RENEWAL_MAP中
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
// addThreadId(threadId);记录该线程获取锁的次数
// 注意,只是第一次加锁时,将定时任务存到ExpirationEntry中,后面再重入时该任务仍然存在,所以不需要再设置,即不用调用renewExpiration()方法
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 第一次加锁时,会更新有效期,即续期
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 创建延时任务
// internalLockLeaseTime / 3 表示三分之一的有效时间之后,再执行续期任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 更新有效期,底层通过lua脚本,pexpire命令进行重新赋值有效时间
RFuture<Boolean> future = renewExpirationAsync(threadId);
// 回调完成后,递归调用renewExpiration()方法
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
// 递归调用renewExpiration()方法,这样就实现了不断续期机制
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// 将定时任务存到ExpirationEntry中,后面将会不断续期
// 注意,只是第一次加锁时,将定时任务存到ExpirationEntry中,后面再重入时该任务仍然存在,所以不需要再设置,即不用调用renewExpiration()方法
ee.setTimeout(task);
}
// 底层通过lua脚本,pexpire命令进行重新赋值有效时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
释放锁时,取消定时续期任务:
lock.unlock() -> unlockAsync(Thread.currentThread().getId()) -> cancelExpirationRenewal(threadId)
void cancelExpirationRenewal(Long threadId) {
// 通过锁的名称获取该锁的定时任务并移除掉
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
6. 小结:Redisson分布式锁原理
Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制(使用等待唤醒机制,避免过多占用CPU,提高性能)
- 超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间

二、Redission锁的MutiLock原理
1. Redisson分布式锁主从一致性问题
- 如果只有一台redis服务器,当redis挂掉后,依赖redis的业务都会受影响,所以为了提高redis的可用性,则需要搭建redis的主从模式
- 一般会通过一主多从,实现读写分离;主节点处理所有写操作,从节点处理所有读操作;
- 从节点需要通过主从同步后,才会有数据,但是主从同步时,存在延时,有延时就会导致主从一致性问题

- 当主节点挂掉时,会选一个从节点作为新的主节点
- 当新主节点没来得及同步之前挂掉的旧主节点的锁数据时,此时新主节点没有锁数据,所以相当于之前获取的锁都将失效
- 这时,其他线程就会获取锁成功,造成并发安全问题

2. Redisson使用MutiLock解决主从一致性问题
- MutiLock表示联锁,将多个节点的锁联合在一起
- 由于主从模式会导致一致性问题,所以Redisson将每个节点都视为主节点,所有节点都是独立的,每个节点都能进行读写操作
- 此时获取锁的方式是:依次从每个节点获取锁,都获取成功才算成功
- 如果想提高可用性,每个主节点可以再创建各自的从节点

- 就算有一个主节点挂掉,造成锁失效,但是只要其他任意一个主节点的锁没失效,新来的线程就会加锁失败,这样就避免了并发安全问题

(1)演示MutiLock
配置3个节点6379、6380、6381,演示MutiLock:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123321");
// 创建客户端
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6380").setPassword("123321");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6381").setPassword("123321");
return Redisson.create(config);
}
}
编写测试方法:
@Slf4j
@SpringBootTest
class RedissonTest {
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;
private RLock lock;
@BeforeEach
void setUp() {
RLock lock1 = redissonClient.getLock("order");
RLock lock2 = redissonClient2.getLock("order");
RLock lock3 = redissonClient3.getLock("order");
// 创建联锁,MultiLock
lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}
@Test
void method1() throws InterruptedException {
// 尝试获取锁
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("获取锁失败 .... 1");
return;
}
try {
log.info("获取锁成功 .... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 .... 1");
lock.unlock();
}
}
void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 2");
return;
}
try {
log.info("获取锁成功 .... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 .... 2");
lock.unlock();
}
}
}
测试结果:

- 联锁就是将每个独立的锁联合在一起,每个独立的锁加锁原理跟Redisson分布式锁原理相同,只是每个独立的锁都加锁成功后,才表示加锁成功
(2)RedissonMultiLock 原理
RedissonMultiLock源码:
public class RedissonMultiLock implements RLock {
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
// 如果自己设置了有效期leaseTime,则执行这里;如果没设置有效期,则leaseTime默认为-1,不执行这里
if (leaseTime != -1) {
// 等待时间waitTime为-1表示只会等待锁释放,不会去重试获取锁
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
// 等待时间waitTime不为-1表示锁获取失败后,想要继续重试获取锁
// 并且将有效期重新设置为waitTime的2倍,防止等待重试期间,锁到达有效期而被释放,所以有效时间需要比等待时间长才行
newLeaseTime = unit.toMillis(waitTime)*2;
}
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
// 当传入了自己设置的等待时间时,将会使用自己设置的等待时间作为剩余等待时间
remainTime = unit.toMillis(waitTime);
}
// 锁等待的时间lockWaitTime 就是剩余等待时间remainTime
long lockWaitTime = calcLockWaitTime(remainTime);
// failedLocksLimit()这个值是0,即failedLocksLimit = 0
int failedLocksLimit = failedLocksLimit();
// 获取锁成功的集合,locks是所有独立锁的集合(final List<RLock> locks = new ArrayList<>();)
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 遍历locks联锁中每个独立的锁
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// lock.tryLock()就是RedissonLock中尝试获取锁的方法
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
// 每个独立的锁获取成功后,都会加入到获取锁成功的集合中
acquiredLocks.add(lock);
} else {
// 获取锁失败后,执行else里面逻辑
// 这里是跳出for循环的条件
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
// failedLocksLimit初始值就是0,所以满足该条件
if (failedLocksLimit == 0) {
// 先释放成功获取锁的集合
unlockInner(acquiredLocks);
// waitTime为-1表示不会进行重试,则第一次失败,就直接返回失败
if (waitTime == -1) {
return false;
}
// waitTime不为-1表示获取锁失败后,想继续重试获取锁
failedLocksLimit = failedLocksLimit();
// 先清空成功获取锁的集合
acquiredLocks.clear();
// reset iterator
// 重新从独立锁集合的第一个锁开始遍历
while (iterator.hasPrevious()) {
iterator.previous();
}
// 继续进入for循环重试获取锁,直到获取锁成功或者没有剩余等待时间后,退出循环
} else {
failedLocksLimit--;
}
}
// 判断剩余时间
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
// 获取锁超时后,先将之前获取锁成功的集合释放,然后再返回失败
unlockInner(acquiredLocks);
return false;
}
}
// 如果还有剩余时间,则继续执行for循环,遍历下一个独立锁
}
// 如果自己设置了锁的有效期leaseTime,则会执行这里;
// 否则leaseTime默认为-1,不会执行这里,而是通过WatchDog机制续期
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
// 所有锁获取成功后,遍历每个成功获取的锁,重新设置有效期
// 因为每个独立锁是依次获取的,
// 例如:第一个锁获取成功后,还需要等最后一个锁也获取成功,等待的过程中,前面的锁的有效期提前倒计时消耗了一部分
// 所以,当所有锁获取成功后,需要重新设置有效期,保证所有锁的有效期从头开始倒计时
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
}
三、分布式锁总结
1. 不可重入Redis分布式锁:
- 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标识
- 缺陷:不可重入、无法重试、锁超时失效
2. 可重入的Redis分布式锁:
- 原理:利用hash结构,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷:redis宕机引起锁失效问题
3. Redisson的multiLock:
- 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
- 缺陷:运维成本高、实现复杂
边栏推荐
- 绘图问题记录
- 【C语言刷LeetCode】592. 分数加减运算(M)
- 新手入门上位机开发 C#语言:PC串口发送数据
- 华宝新能通过注册:拟募资近7亿 营收增加利润反而下降
- 解决:npm ERR code ELIFECYCLE npm ERR errno 1(安装脚手架过程中,在npm run dev 时发生错误)
- HCIP 第十四天
- uni-app实现跨端开发手机蓝牙接收和发送数据
- Is it difficult for AI to land?Cloud native helps enterprises quickly apply machine learning MLOps
- 中级-面试题目整理
- 共享内存-内存映射-共享文件对象
猜你喜欢

HCIP 第十四天

The speed of life and death, every second counts
![[Notes] Stuttering word segmentation to draw word cloud map](/img/a1/05504ad82d4670386d1cc233291c6a.png)
[Notes] Stuttering word segmentation to draw word cloud map

解决:npm ERR code ELIFECYCLE npm ERR errno 1(安装脚手架过程中,在npm run dev 时发生错误)

FL Studio官方20.9中文版无需汉化补丁,正确安装并设置切换

1050的显卡,为何在Steam上的显卡使用率排行榜一直都是前五

The display and hiding of widgets for flutter learning

Tibetan Mapping

基于数据驱动故障预测的多台电力设备预测性维护调度

一文读懂Elephant Swap,为何为ePLATO带来如此高的溢价?
随机推荐
超详细的MySQL基本操作
绘制热度图、频谱图、地形图、colormap
表达式计算器 ExpressionRunner
单片机没有随机数发生器如何生成随机数——2022.07.26
go bidirectional streaming mode
go 双向流模式
音视频开发的正确(学习思路+技术指导)
【C语言刷LeetCode】2295. 替换数组中的元素(M)
matlab洗碗机节水模型的优化设计-这是个课题名称,不是买洗碗机,审核的人仔细看下,谢谢
一文读懂Elephant Swap,为何为ePLATO带来如此高的溢价?
fluttter学习之ButtonStyle 、MaterialStateProperty
un7.29:Linux——centos中如何安装与配置redis?
[3D检测系列-PointRCNN]复现PointRCNN代码,并实现PointRCNN3D目标检测可视化,包含预训练权重下载链接(从0开始以及各种报错的解决方法)
新手入门上位机开发 C#语言:PC串口发送数据
五种绑定彻底弄懂this,默认绑定、隐式绑定、显式绑定、new绑定、箭头函数绑定详解
RAII技术学习
1050的显卡,为何在Steam上的显卡使用率排行榜一直都是前五
FL Studio官方20.9中文版无需汉化补丁,正确安装并设置切换
HCIP 第十五天
乖宝宠物IPO过会:年营收25.75亿 KKR与君联是股东