Redisson high performance redis distributed lock source code analysis
2022-07-02 17:52:00 【Operation and maintenance development story】
WeChat official account: Operation and Maintenance Development Story. Author: Lao Zheng
Redisson implements its distributed lock as follows:
Principle description
Thread 1 acquires the lock first. If the acquisition succeeds, a background renewal task is started that renews the lock every 10 seconds (one third of the default 30-second watchdog timeout).
Under concurrency, thread 2 also tries to lock. If it cannot acquire the lock, it retries in a loop; between attempts it blocks, having subscribed to the lock's unlock message.
When thread 1 releases the lock, an unlock message is published through Redis. The subscriber receives the message and wakes the waiting thread, and thread 2 resumes competing for the lock.
For lock reentrancy, Redisson uses the hash data type. The hash field stores the id of the holding thread (in essence a generated unique UUID combined with the thread id), and the value is the reentry count.
Test code
Next, we use a flash-sale example to illustrate:
Dependency version
implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'
Test code
The following simulates a flash sale of a product. The sample code is as follows:
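import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;

public class RedissonTest {
    public static void main(String[] args) {
        // 1. Configuration
        Config config = new Config();
        // The host and port are elided in the source; supply your own Redis address.
        String address = "redis://";
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress(address);
        Redisson redisson = (Redisson) Redisson.create(config);
        RLock rLock = redisson.getLock("goods:1000:1");
        // 2. Lock
        rLock.lock();
        try {
            System.out.println("todo: business logic 1000000.");
        } finally {
            if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                // 3. Unlock
                rLock.unlock();
            }
        }
    }
}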
Lock design
rLock.lock(); is the entry point for locking. Following the call stack, the core locking method is:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            // The key does not exist yet: create the hash with count 1 and set the TTL
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    // The caller already holds the lock: increment the reentry count and refresh the TTL
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    // Held by someone else: return the remaining TTL
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
In essence, locking is done by calling a Lua script. Note that the data type used here is a hash. The advantage of a hash is that the reentrant holder id and its reentry count can be stored under the same key.
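The return contract of the script (nil when the lock is acquired, otherwise the remaining TTL) can be illustrated with a small in-memory model. This is only a sketch: the class LockScriptModel, its method names, and the explicit nowMillis clock are hypothetical and are not part of Redisson. A HashMap stands in for the Redis hash.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the locking script; illustrative only, not Redisson code. */
class LockScriptModel {
    // Stand-in for the Redis hash at KEYS[1]: field = holder id, value = reentry count.
    private final Map<String, Integer> hash = new HashMap<>();
    // Absolute expiry of the key in milliseconds; only meaningful while the hash is non-empty.
    private long expiresAt;

    /**
     * Mirrors the script's return contract: null means the lock was acquired,
     * otherwise the remaining time to live of the lock held by someone else.
     */
    Long tryLock(String holder, long leaseMillis, long nowMillis) {
        if (!hash.isEmpty() && nowMillis >= expiresAt) {
            hash.clear(); // the key expired, which Redis would handle on its own
        }
        if (hash.isEmpty() || hash.containsKey(holder)) {
            hash.merge(holder, 1, Integer::sum); // hincrby KEYS[1] ARGV[2] 1
            expiresAt = nowMillis + leaseMillis; // pexpire KEYS[1] ARGV[1]
            return null;
        }
        return expiresAt - nowMillis;            // pttl KEYS[1]
    }

    int holdCount(String holder) {
        return hash.getOrDefault(holder, 0);
    }

    public static void main(String[] args) {
        LockScriptModel lock = new LockScriptModel();
        System.out.println(lock.tryLock("uuid-1:1", 30_000, 0));     // null: acquired
        System.out.println(lock.tryLock("uuid-1:1", 30_000, 1_000)); // null: re-entered
        System.out.println(lock.tryLock("uuid-2:7", 30_000, 2_000)); // 29000: held by another holder
        System.out.println(lock.holdCount("uuid-1:1"));              // 2
    }
}
```

Because the holder id is a field inside one hash, re-entry is just an increment on the same key, which is the point made above.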
Lock renewal design
Lock renewal is implemented by the call to scheduleExpirationRenewal inside the org.redisson.RedissonLock#tryAcquireAsync method.
What deserves attention in renewal is that the watchdog runs as a task on a shared delayed-task (timer) queue rather than on a dedicated thread per lock.
The advantage is that if a single process holds 1000 locks at the same time, we do not need to start 1000 threads to renew them. We only create 1000 renewal task objects, and the renewal thread wakes up only when a renewal is due.
The code is as follows :
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // Lock expiration renewal
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}
Lock renewal scheduleExpirationRenewal
The code is as follows :
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}
It then calls renewExpiration(); to execute the renewal logic.
Renewal is implemented as a scheduled task plus recursion: the task re-schedules itself. The advantage of scheduled tasks is that there is no need to start N threads; only the corresponding task objects are created.
Note: in the extreme case where N locks are acquired at the same time and therefore come due for renewal at the same time, consider adding a random offset, for example 100 to 500 ms, to the lock's validity period. This avoids the problem to some extent (the idea is borrowed from the solutions for cache expiration and cache breakdown).
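The random offset mentioned in the note could be computed roughly as follows. This is a minimal sketch: the class LeaseJitter and the method withJitter are hypothetical helpers, not Redisson API, and how the resulting value is fed into a lock is left open. Keep in mind that, per tryAcquireAsync, passing an explicit leaseTime skips the renewal branch.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper that spreads lease times so renewals do not all fall due together. */
class LeaseJitter {
    /** Adds a random offset in [minJitterMs, maxJitterMs] to the base lease time. */
    static long withJitter(long baseLeaseMs, long minJitterMs, long maxJitterMs) {
        // nextLong(origin, bound) excludes the bound, hence the + 1
        return baseLeaseMs + ThreadLocalRandom.current().nextLong(minJitterMs, maxJitterMs + 1);
    }

    public static void main(String[] args) {
        // Each call yields a value between 30100 and 30500 inclusive.
        for (int i = 0; i < 3; i++) {
            System.out.println(withJitter(30_000, 100, 500));
        }
    }
}
```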
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // Create the delayed task
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // The actual renewal: call the Lua renewal script
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                // If the renewal succeeded
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}
One more detail: why is the renewal interval one third of the lease time? It ensures that the lock has not expired by the time of the next renewal. With one half, the key might already have expired when the next scheduled task runs (for example, if the task or the network is delayed). With less than one third, renewals become frequent and the cost-to-benefit ratio of the task is poor.
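The "scheduled task plus recursion" pattern with a one-third interval can be sketched with the JDK alone. The sketch below assumes a ScheduledExecutorService in place of the timer Redisson uses, and the names RenewalSketch, scheduleRenewal, and runDemo are hypothetical. The renew callback stands in for the Lua renewal script and returns false once the lock is no longer held.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Illustrative self-rescheduling renewal task; not Redisson code. */
class RenewalSketch {
    private final ScheduledExecutorService timer;
    private final long leaseMs;
    final AtomicInteger renewals = new AtomicInteger();

    RenewalSketch(ScheduledExecutorService timer, long leaseMs) {
        this.timer = timer;
        this.leaseMs = leaseMs;
    }

    /** Schedules one delayed task; it re-schedules itself only while renew succeeds. */
    void scheduleRenewal(BooleanSupplier renew) {
        timer.schedule(() -> {
            if (renew.getAsBoolean()) {
                renewals.incrementAndGet();
                scheduleRenewal(renew); // "recursion": queue the next renewal
            }
        }, leaseMs / 3, TimeUnit.MILLISECONDS);
    }

    /** Runs until the simulated lock stops being held and returns the number of renewals. */
    static int runDemo(long leaseMs, int heldForRenewals) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        RenewalSketch sketch = new RenewalSketch(timer, leaseMs);
        AtomicInteger remaining = new AtomicInteger(heldForRenewals);
        CountDownLatch stopped = new CountDownLatch(1);
        sketch.scheduleRenewal(() -> {
            boolean stillHeld = remaining.getAndDecrement() > 0;
            if (!stillHeld) {
                stopped.countDown(); // the chain of tasks ends here
            }
            return stillHeld;
        });
        try {
            stopped.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return sketch.renewals.get();
    }

    public static void main(String[] args) {
        // 300 ms lease, so a renewal attempt every 100 ms; the lock is "held" for 3 renewals.
        System.out.println(runDemo(300, 3)); // prints 3
    }
}
```

A single timer thread serves any number of such tasks, which is why many held locks do not require many threads.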
The renewExpirationAsync method is again a Lua script; it resets the expiration time of the lock.
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // Only the current holder may extend the TTL
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}
Lock spin retry
org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)
When lock acquisition fails, the thread enters the retry path. In practice this means executing the while (true) loop that follows the subscription in the method below.
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    // Subscribe to the lock's release messages
    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }
    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }
            // waiting for message
            if (ttl >= 0) {
                try {
                    // Block for at most the lock's remaining TTL, then try to lock again
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
    // get(lockAsync(leaseTime, unit));
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); is in effect an intermittent spin: the thread waits for at most the remaining TTL of the current lock and tries to grab the lock again when it wakes. entry.getLatch().acquire(); by contrast blocks with no timeout until the thread is woken.
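The timed wait can be tried in isolation with a plain java.util.concurrent.Semaphore, which is how the latch is modeled here. This is a sketch only: the class TimedWaitSketch and the method awaitUnlock are hypothetical names, and release() stands in for the arrival of an unlock message.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Illustrative timed wait on a latch; not Redisson code. */
class TimedWaitSketch {
    /**
     * Waits for an unlock notification for at most ttlMs.
     * Returns true if woken by a notification, false if the wait timed out.
     * In both cases the caller would go back and try to lock again.
     */
    static boolean awaitUnlock(Semaphore latch, long ttlMs) {
        try {
            return latch.tryAcquire(ttlMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Semaphore latch = new Semaphore(0);         // no permits: waiters block
        System.out.println(awaitUnlock(latch, 50)); // false: nobody unlocked, woke on timeout
        latch.release();                            // simulates an unlock message arriving
        System.out.println(awaitUnlock(latch, 50)); // true: permit available, returns at once
    }
}
```

Bounding the wait by the TTL means a waiter still retries after the lock expires, even if no unlock message ever arrives.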
Subscribing to lock release
The other piece of logic is
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
This subscribes to a message channel. When the lock is released, an unlock message is published, which wakes the threads that went to sleep while competing for the lock.
Unlock design
rLock.unlock(); does three things at its core: it releases the lock, cancels the renewal, and wakes the threads waiting to lock (by publishing the unlock-success message).
The core method (unlock): org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    // Publish the unlock-success message
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
Again, this is executed as a Lua script.
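The three outcomes of the unlock script (nil, 0, 1) can be modeled in memory as follows. This is a simplified sketch with hypothetical names (UnlockScriptModel, published); the lock method here does not check for other holders, the TTL refresh is omitted, and a counter stands in for the Redis publish call.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the unlock script's outcomes; illustrative only, not Redisson code. */
class UnlockScriptModel {
    // Stand-in for the Redis hash at KEYS[1]: field = holder id, value = reentry count.
    private final Map<String, Integer> hash = new HashMap<>();
    // Counts simulated "publish" calls on the unlock channel.
    int published;

    /** Simplified acquire: only bumps the reentry count, without checking other holders. */
    void lock(String holder) {
        hash.merge(holder, 1, Integer::sum);
    }

    /**
     * Returns null if the caller does not hold the lock, false if the lock is
     * still held after decrementing the count, true if it was fully released.
     */
    Boolean unlock(String holder) {
        if (!hash.containsKey(holder)) {
            return null;                                    // hexists == 0: return nil
        }
        int counter = hash.merge(holder, -1, Integer::sum); // hincrby ... -1
        if (counter > 0) {
            return false;                                   // still re-entered (TTL refresh omitted)
        }
        hash.clear();                                       // del KEYS[1]
        published++;                                        // publish KEYS[2] ARGV[1]
        return true;
    }

    public static void main(String[] args) {
        UnlockScriptModel lock = new UnlockScriptModel();
        lock.lock("uuid-1:1");
        lock.lock("uuid-1:1");                       // re-entered once
        System.out.println(lock.unlock("uuid-2:7")); // null: not the holder
        System.out.println(lock.unlock("uuid-1:1")); // false: count dropped to 1
        System.out.println(lock.unlock("uuid-1:1")); // true: key deleted, message published
        System.out.println(lock.published);          // 1
    }
}
```

The null result is the case that unlockAsync turns into an IllegalMonitorStateException.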
Revoke lock renewal
The core method: org.redisson.RedissonBaseLock#unlockAsync(long)
public RFuture<Void> unlockAsync(long threadId) {
    // Unlock
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        // Revoke renewal
        cancelExpirationRenewal(threadId);
        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }
        return null;
    });
    return new CompletableFutureWrapper<>(f);
}
Waking queued threads after a successful unlock
org.redisson.pubsub.LockPubSub#onMessage wakes the blocked threads so that they run the lock retry logic described earlier. The code is as follows:
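protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        // Wake one thread blocked on the latch
        value.getLatch().release();
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }
        value.getLatch().release(value.getLatch().getQueueLength());
    }
}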