Redisson high performance redis distributed lock source code analysis
2022-07-02 17:52:00 【Operation and maintenance development story】
WeChat official account: Operation and maintenance development story; author: Lao Zheng
Redisson implements its distributed lock as follows:
Principle description
Thread 1 acquires the lock first. If it succeeds, a background renewal task (the watchdog) is scheduled; with the default 30-second watchdog timeout it extends the lease every 10 seconds.
Under contention, thread 2 also tries to lock. If it cannot acquire the lock, it subscribes to the lock's unlock channel and then retries in a loop, blocking between attempts until an unlock message arrives or the lock's remaining TTL elapses.
When thread 1 releases the lock, an unlock message is published through Redis. The subscriber receives it and wakes the blocked thread, and thread 2 goes back to competing for the lock.
For reentrancy, Redisson stores the lock as a Redis hash. The field identifies the holder (the client's generated UUID plus the thread id) and the value is the reentry count.
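As a rough illustration of the reentrancy bookkeeping described above, the hash field can be pictured as a client id joined with a thread id. The sketch below is plain Java, not Redisson code: `HolderField` and `holderField` are hypothetical names, and the `clientId:threadId` layout is an assumption based on the `getLockName(threadId)` call that appears in the source later in this article.

```java
import java.util.UUID;

/** Illustrative only: models the hash field that names a lock holder. */
final class HolderField {
    // Assumption: one id per client instance, generated once at startup.
    static final String CLIENT_ID = UUID.randomUUID().toString();

    // Joins the client id and the thread id so that two threads, or two
    // processes whose thread ids happen to collide, never share a field.
    static String holderField(String clientId, long threadId) {
        return clientId + ":" + threadId;
    }
}
```

Because the thread id alone is only unique within one JVM, prefixing it with a per-client id is what makes the field safe to use across processes.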
Test code
The following flash-sale (seckill) example illustrates the lock:
Dependency version
implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'
The sample code below simulates a flash sale on a single product:
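import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;

public class RedissonTest {
    public static void main(String[] args) {
        // 1. Configuration
        Config config = new Config();
        String address = "redis://127.0.0.1:6379";
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress(address);
        serverConfig.setDatabase(0);
        // Watchdog timeout of 5 s, so the lease is renewed about every 5000 / 3 ms
        config.setLockWatchdogTimeout(5000);
        RedissonClient redisson = Redisson.create(config);
        RLock rLock = redisson.getLock("goods:1000:1");
        // 2. Lock
        rLock.lock();
        try {
            System.out.println("TODO: business logic 1000000.");
        } finally {
            if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                // 3. Unlock
                rLock.unlock();
            }
            // Release client resources so the JVM can exit
            redisson.shutdown();
        }
    }
}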
Lock design
rLock.lock();
is the entry point for locking. Following the call stack, the core locking method is:
org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
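In essence, locking executes a Lua script with three branches. If the key does not exist, the script creates the hash with the holder's field set to 1 and sets the expiry. If the hash already contains the current holder's field, it increments the count (reentry) and refreshes the expiry. Both of these branches return nil to signal success. Otherwise the lock is held by someone else, and the script returns the key's remaining TTL. Note that the data type used here is a hash: the advantage is that a single key records both which thread holds the lock and how many times it has reentered.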
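To make the script's branches easier to follow, here is a minimal in-memory model of the same logic in plain Java. It is a sketch, not Redisson code: `LockScriptModel`, `tryLock`, and `holdCount` are hypothetical names, time is passed in explicitly so the behaviour is deterministic, and two ordinary maps stand in for the Redis hash and its expiry.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the lock script's three branches (illustrative only). */
final class LockScriptModel {
    // lock key -> (holder field -> reentry count), standing in for the Redis hash
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    // lock key -> absolute expiry time in milliseconds
    private final Map<String, Long> expiresAt = new HashMap<>();

    /** Returns null when the lock was acquired, otherwise the remaining TTL in ms. */
    Long tryLock(String key, String holder, long leaseMillis, long nowMillis) {
        // Drop the key once its lease has run out, as Redis would.
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry <= nowMillis) {
            hashes.remove(key);
            expiresAt.remove(key);
        }
        Map<String, Integer> hash = hashes.get(key);
        // Branch 1: key absent, first acquisition. Branch 2: same holder, reentry.
        if (hash == null || hash.containsKey(holder)) {
            if (hash == null) {
                hash = new HashMap<>();
                hashes.put(key, hash);
            }
            hash.merge(holder, 1, Integer::sum);          // like hincrby
            expiresAt.put(key, nowMillis + leaseMillis);  // like pexpire
            return null;
        }
        // Branch 3: held by another holder, report the remaining TTL (like pttl).
        return expiresAt.get(key) - nowMillis;
    }

    /** Reentry count for a holder, or 0 if it does not hold the lock. */
    int holdCount(String key, String holder) {
        Map<String, Integer> hash = hashes.get(key);
        return hash == null ? 0 : hash.getOrDefault(holder, 0);
    }
}
```

A null result plays the role of the script's `nil`: the caller treats it as "lock acquired", and any non-null value as the time to wait before retrying.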
Lock renewal design
Lock renewal is implemented in org.redisson.RedissonLock#tryAcquireAsync, which calls scheduleExpirationRenewal.
Note that the watchdog does not run in a dedicated thread per lock: renewal is scheduled as a delayed task on a shared timer.
The advantage is that if one process holds 1000 locks at the same time, it does not need 1000 threads to renew them. It only creates 1000 renewal task objects, and the timer thread does no work for a lock until its renewal time arrives.
tryAcquireAsync
The code is as follows:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // No explicit lease time: schedule watchdog renewal
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}
Lock renewal scheduleExpirationRenewal
The code is as follows :
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}
scheduleExpirationRenewal then calls renewExpiration() to run the renewal logic. Renewal is implemented as a scheduled task that reschedules itself. The advantage of a timer task is that there is no need to start N threads; only the corresponding task objects are created.
Remark: in the extreme case where N locks are acquired at the same moment and therefore all come due for renewal together, consider adding a random offset, for example 100 to 500 ms, to the lock's validity period. This spreads the load to some extent (the idea is borrowed from the usual fix for cache avalanche / cache breakdown).
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // Create the delayed task
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // The actual renewal: run the Lua renewal script
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                // Renewal succeeded
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}
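The "timer task plus recursion" pattern in renewExpiration can be sketched with the JDK alone. The example below swaps in a `ScheduledExecutorService` for Redisson's own timer, so it shows the shape of the technique rather than the library's implementation; `RenewalSketch`, `schedule`, and `demo` are hypothetical names, and the `renew` supplier stands in for the Lua renewal call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Self-rescheduling renewal task on one shared timer thread (illustrative only). */
final class RenewalSketch {
    private final ScheduledExecutorService timer;
    private final long leaseMillis;
    private final BooleanSupplier renew;          // stand-in for the renewal script
    final AtomicInteger renewals = new AtomicInteger();
    final CountDownLatch stopped = new CountDownLatch(1);

    RenewalSketch(ScheduledExecutorService timer, long leaseMillis, BooleanSupplier renew) {
        this.timer = timer;
        this.leaseMillis = leaseMillis;
        this.renew = renew;
    }

    /** Schedules one delayed renewal; on success the task schedules the next one. */
    void schedule() {
        timer.schedule(() -> {
            if (renew.getAsBoolean()) {
                renewals.incrementAndGet();
                schedule();                       // reschedule itself
            } else {
                stopped.countDown();              // lock no longer held: stop renewing
            }
        }, leaseMillis / 3, TimeUnit.MILLISECONDS);
    }

    /** Renewal succeeds `successes` times and then fails; returns the number of renewals. */
    static int demo(int successes) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger remaining = new AtomicInteger(successes);
        RenewalSketch sketch = new RenewalSketch(timer, 30, () -> remaining.getAndDecrement() > 0);
        try {
            sketch.schedule();
            sketch.stopped.await(5, TimeUnit.SECONDS);
            return sketch.renewals.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
    }
}
```

Any number of `RenewalSketch` instances can share the same timer, which is the point made above: many locks cost many small task objects, not many threads.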
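One more detail: why is the renewal interval one third of the lease time? The goal is that the lock has not expired by the time the next renewal runs. Renewing at 1/3 leaves two thirds of the lease as a margin for timer delay or a slow round trip to Redis. With 1/2 the margin is thinner, so a delayed task is more likely to find the key already expired. With an interval shorter than 1/3, renewals become frequent and the cost-to-benefit ratio of the task is poor.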
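The arithmetic behind the one-third rule, together with the random offset suggested in the earlier remark, can be written out as a small helper. This is only a sketch: `RenewalInterval` and its methods are hypothetical names, and the 100 to 500 ms jitter range is the example value from the remark, not a Redisson setting.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Renewal timing arithmetic (illustrative only). */
final class RenewalInterval {
    // Delay before each renewal: one third of the lease, using integer division.
    static long renewalDelayMillis(long leaseMillis) {
        return leaseMillis / 3;
    }

    // Time left on the lease when the renewal fires; this is the safety margin.
    static long marginMillis(long leaseMillis) {
        return leaseMillis - renewalDelayMillis(leaseMillis);
    }

    // Lease with a random 100..500 ms offset so that locks taken at the
    // same instant do not all come due for renewal at the same instant.
    static long jitteredLeaseMillis(long baseLeaseMillis) {
        return baseLeaseMillis + ThreadLocalRandom.current().nextLong(100, 501);
    }
}
```

With the default 30000 ms lease this gives a renewal every 10000 ms and a 20000 ms margin; with the 5000 ms value from the test code the delay is 1666 ms.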
renewExpirationAsync
is again a Lua script. If the hash still contains the current holder's field, it resets the lock's expiration time and returns 1; otherwise it returns 0.
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}
Lock spin retry
org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)
When the first acquisition attempt fails, the thread enters the retry path, which is the while (true) loop in the code below.
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    // Subscribe to the lock's unlock messages
    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }
    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }
            // waiting for message
            if (ttl >= 0) {
                try {
                    // Block for at most the lock's remaining TTL, then try to lock again
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
    // get(lockAsync(leaseTime, unit));
}
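entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
This is effectively an intermittent spin. The thread blocks for at most the lock's remaining TTL, then wakes up and tries to grab the lock again. If the TTL is negative, it blocks on entry.getLatch().acquire() until an unlock message arrives.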
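The bounded wait can be reproduced with a plain `java.util.concurrent.Semaphore`. The sketch below assumes the lock entry's latch behaves like a semaphore that starts with zero permits; `LatchWaitSketch` and `waitForUnlock` are hypothetical names and the code is not taken from Redisson.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Bounded wait on a zero-permit semaphore (illustrative only). */
final class LatchWaitSketch {
    /**
     * Blocks until a permit is released or ttlMillis elapses.
     * Returns true if woken by a release, false on timeout or interrupt.
     * Either way the caller goes back and retries the lock.
     */
    static boolean waitForUnlock(Semaphore latch, long ttlMillis) {
        try {
            return latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt status
            return false;
        }
    }
}
```

The timeout matters because an unlock message can be missed, for example if the holder crashes and the key simply expires; waking after the TTL guarantees the waiter still gets another attempt.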
Subscribing to lock release
The other piece of logic is
CompletableFuture future = subscribe(threadId);
This subscribes to the lock's channel. When the lock is released, an unlock message is published, which wakes the threads that went to sleep after failing to acquire the lock.
Unlock design
rLock.unlock(); does three things: it releases the lock, cancels renewal, and wakes the threads waiting for the lock by publishing an unlock message.
The core unlock method: org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    // Publish the unlock message
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
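This is also executed as a Lua script. If the caller does not hold the lock, it returns nil. Otherwise it decrements the reentry count: while the count is still positive it only refreshes the expiry and returns 0; once the count reaches zero it deletes the key, publishes the unlock message, and returns 1.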
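The unlock script's three outcomes can be modelled in memory in the same spirit. This is a sketch under stated assumptions, not Redisson code: `UnlockScriptModel`, `hold`, and `unlock` are hypothetical names, a list stands in for the pub/sub channel, and the expiry refresh is reduced to a comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the unlock script's outcomes (illustrative only). */
final class UnlockScriptModel {
    // lock key -> (holder field -> reentry count), standing in for the Redis hash
    final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    // channels that received an unlock message, standing in for publish
    final List<String> published = new ArrayList<>();

    /** Test setup: record that `holder` holds `key` with the given reentry count. */
    void hold(String key, String holder, int count) {
        hashes.computeIfAbsent(key, k -> new HashMap<>()).put(holder, count);
    }

    /** null: caller is not the holder; false: still held after decrement; true: released. */
    Boolean unlock(String key, String channel, String holder) {
        Map<String, Integer> hash = hashes.get(key);
        if (hash == null || !hash.containsKey(holder)) {
            return null;                                   // script returns nil
        }
        int counter = hash.merge(holder, -1, Integer::sum); // like hincrby -1
        if (counter > 0) {
            // The real script also refreshes the expiry here (pexpire).
            return false;                                  // script returns 0
        }
        hashes.remove(key);                                // like del
        published.add(channel);                            // like publish
        return true;                                       // script returns 1
    }
}
```

The null case is what the calling code turns into an IllegalMonitorStateException: unlocking a lock held by a different thread is reported as an error rather than silently ignored.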
Cancelling lock renewal
The core method: org.redisson.RedissonBaseLock#unlockAsync(long)
@Override
public RFuture<Void> unlockAsync(long threadId) {
    // Unlock
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    // Cancel renewal
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);
        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }
        return null;
    });
    return new CompletableFutureWrapper<>(f);
}
Waking queued threads after a successful unlock
org.redisson.pubsub.LockPubSub#onMessage
wakes the blocked threads so that they resume the lock retry loop described earlier. The code is as follows:
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        value.getLatch().release();
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }
        value.getLatch().release(value.getLatch().getQueueLength());
    }
}
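The difference between the two message types in onMessage is that an ordinary unlock runs at most one queued listener, while a read unlock drains the whole queue. A minimal model of that dispatch, using hypothetical names (`WakeupSketch`, `dispatch`) and placeholder message values that are assumptions rather than values confirmed by the source, could look like this:

```java
import java.util.Queue;

/** Models "wake one" versus "wake all" listener dispatch (illustrative only). */
final class WakeupSketch {
    // Placeholder message values, chosen for the example.
    static final long UNLOCK = 0L;
    static final long READ_UNLOCK = 1L;

    /** Runs queued listeners according to the message type; returns how many ran. */
    static int dispatch(Queue<Runnable> listeners, long message) {
        int ran = 0;
        if (message == UNLOCK) {
            // Exclusive unlock: only one waiter can win, so wake a single listener.
            Runnable listener = listeners.poll();
            if (listener != null) {
                listener.run();
                ran++;
            }
        } else if (message == READ_UNLOCK) {
            // Read unlock: every waiting listener may proceed, so drain the queue.
            Runnable listener;
            while ((listener = listeners.poll()) != null) {
                listener.run();
                ran++;
            }
        }
        return ran;
    }
}
```

Waking a single waiter on an exclusive unlock avoids a thundering herd: the remaining waiters stay asleep until the next unlock message or until their TTL wait times out.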
github:https://github.com/orgs/sunsharing-note/dashboard