The strongest distributed locking tool: redisson
2022-07-28 13:47:00 【Java notes shrimp】
I. Redisson overview
What is Redisson?
Redisson is a Java in-memory data grid (In-Memory Data Grid) implemented on top of Redis. It provides not only a series of distributed versions of common Java objects but also many distributed services.
These include BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish/Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, and Scheduler service. Redisson offers the simplest and most convenient way to use Redis.
Redisson's aim is to promote separation of concerns (Separation of Concern) between users and Redis, so that users can focus on business logic.
In short, it is a distributed toolkit built on Redis, with basic distributed objects and higher-level, more abstract distributed services. For every programmer tempted to reinvent the distributed wheel, it offers solutions to most distributed problems.
How does Redisson differ from Jedis and Lettuce? They are not quite as unrelated as Lei Feng and Leifeng Pagoda, which share little beyond a similar name.
The difference is like that between operating files through a mouse-driven graphical interface and through the command line. Redisson is a higher level of abstraction, whereas Jedis and Lettuce are wrappers around Redis commands.
Jedis is the Java client toolkit officially promoted by Redis for connecting to Redis from Java; it supports the various Redis commands.
Lettuce is a scalable, thread-safe Redis client whose communication layer is based on Netty. It supports advanced Redis features such as Sentinel, Cluster, pipelining, automatic reconnection, and the Redis data models. Starting with Spring Boot 2.x, Lettuce replaced Jedis as the default Redis client.
Redisson is a comprehensive, newer middleware built on Redis with Netty-based communication, and a strong example of using Redis in enterprise development.
Jedis wraps the Redis commands, and Lettuce offers a richer API and also supports cluster and other modes. But both stop there: they only give you the scaffolding for operating a Redis database. Redisson, by contrast, builds a mature distributed solution on Redis, Lua, and Netty, and is even a toolkit recommended by the Redis project itself.
II. Distributed locks
How to implement a distributed lock?
Distributed locks are a hard requirement for concurrent business logic. There are many ways to implement them: ZooKeeper has sequential znodes, databases have table-level locks and optimistic/pessimistic locking, and Redis has SETNX. All roads lead to the same place, though: in the end it comes down to mutual exclusion. Since this article introduces Redisson, we take Redis as the example.
How to write a simple Redis distributed lock?
Taking Spring Data Redis as an example, we operate Redis through RedisTemplate (setIfAbsent with a timeout is already the merged SETNX + EXPIRE command), as follows:
// Lock
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
    return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
// Unlock. To avoid deleting someone else's lock by mistake, use a uuid as the value and check that the lock is ours
public void unlock(String lockName, String uuid) {
    if (uuid.equals(redisTemplate.opsForValue().get(lockName))) {
        redisTemplate.delete(lockName);
    }
}
// Usage structure
if (tryLock(lockName, uuid, timeout, unit)) {
    try {
        // todo
    } finally {
        unlock(lockName, uuid);
    }
}
Version 1.0 is done. The sharp-eyed Xiao Zhang saw at a glance that this is indeed a lock, but get and del are not atomic: once concurrency grows, safety between processes can no longer be guaranteed. So Xiao Zhang proposed using a Lua script.
What is a Lua script?
Lua is a lightweight, compact scripting language built into Redis. Scripts run through the Redis EVAL/EVALSHA commands, and wrapping several operations in one Lua script makes them execute as a single atomic operation.
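Why the check and the delete must be one atomic step can be seen in a small in-memory analogy. This is only a sketch: a ConcurrentHashMap stands in for Redis, and the class and method names (CompareAndDeleteDemo, unlockNonAtomic, unlockAtomic) are made up for illustration. ConcurrentHashMap.remove(key, value) plays the role that a compare-and-delete Lua script plays on the Redis side.

```java
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for a Redis key space; illustrative only, not Redis itself. */
final class CompareAndDeleteDemo {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    /** Analogue of SET key value NX: succeeds only if the key is absent. */
    boolean tryLock(String key, String owner) {
        return store.putIfAbsent(key, owner) == null;
    }

    /** Racy version: the check and the delete are two separate steps. */
    boolean unlockNonAtomic(String key, String owner) {
        if (owner.equals(store.get(key))) {
            // Another caller could replace the value right here,
            // and the next line would then delete somebody else's lock.
            store.remove(key);
            return true;
        }
        return false;
    }

    /** Atomic version: remove(key, value) compares and deletes in one step. */
    boolean unlockAtomic(String key, String owner) {
        return store.remove(key, owner);
    }
}
```

With the atomic variant, an unlock by anyone other than the current owner simply returns false and leaves the lock untouched.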
So version 2.0 deletes the lock through a Lua script.
lockDel.lua is as follows:
if redis.call('get', KEYS[1]) == ARGV[1]
then
    -- The value matches: delete the lock
    return redis.call('del', KEYS[1])
else
    -- Not our lock: return 0
    return 0
end
Run the Lua script when deleting:
// Unlock script
DefaultRedisScript<Long> unlockScript = new DefaultRedisScript<>();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));
unlockScript.setResultType(Long.class);
// Execute the Lua script to unlock
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);
Version 2.0 looks more like a lock, but something still seems to be missing. Xiao Zhang slapped his forehead: synchronized and ReentrantLock are so smooth because they are reentrant, so a thread that takes the lock several times will not deadlock. We need reentrancy.
How to ensure reentrancy?
Reentrancy means the same thread may acquire the same lock multiple times without deadlocking. The biased lock of synchronized offers a good idea here. synchronized implements reentrancy at the JVM level: the Mark Word in the Java object header holds a thread ID, which together with a counter is used to make the reentrancy check for the current thread and avoid a CAS on every entry.
When a thread enters a synchronized block and acquires the lock, the biased thread ID is stored in the object header and in the lock record of the stack frame. From then on, the thread does not need CAS operations to lock and unlock when entering and leaving the block; it simply tests whether the Mark Word in the object header holds a biased lock pointing to the current thread. If the test succeeds, the thread already holds the lock. If it fails, it tests whether the biased-lock flag in the Mark Word is set to 1: if not, it competes for the lock with CAS; if so, it tries to use CAS to point the object header's biased lock at the current thread.
In addition, a counter is maintained: it is incremented by 1 each time the same thread enters and decremented by 1 each time it leaves, and the lock is released only when it reaches 0.
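The counter idea is easy to observe with JUC's ReentrantLock, whose getHoldCount() exposes how many times the current thread holds the lock. The sketch below is only an illustration of the counting rule; the class and method names (HoldCountDemo, nestedAcquire) are invented for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

final class HoldCountDemo {
    /**
     * Acquires the lock `depth` times on the calling thread and returns the
     * hold count observed at the deepest level, then releases every hold.
     */
    static int nestedAcquire(ReentrantLock lock, int depth) {
        for (int i = 0; i < depth; i++) {
            lock.lock();            // each re-entry increments the counter
        }
        try {
            return lock.getHoldCount();
        } finally {
            for (int i = 0; i < depth; i++) {
                lock.unlock();      // each exit decrements it; 0 means released
            }
        }
    }
}
```

The same thread can enter three times without deadlocking, and the lock only becomes free again after the third unlock.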
Reentrant lock
Following this scheme, we need to rework the Lua script:
1. Store the lock name lockName, the id of the thread holding the lock, and that thread's entry count
2. Lock
Each time a thread tries to acquire the lock, check whether the lock already exists
• It does not exist
  • Set the hash key to the thread id and initialize the value to 1
  • Set the expiration time
  • Return true: lock acquired
• It exists
  • Check whether a hash key for the current thread id exists
  • If it exists, increase that key's value by 1 (one more reentry) and reset the expiration time
  • If it does not exist, return failure
3. Unlock
Each time a thread tries to unlock, check whether the lock exists
• It exists
  • Check whether a hash key for this thread id exists; if so, decrease it by 1, otherwise return unlock failure
  • After decreasing, check whether the remaining count is 0; 0 means the lock is no longer needed, so delete it with the del command
1. Storage structure
To make this object easy to maintain, we store these fields in a Hash. A Redis Hash is similar to a Java HashMap and is well suited to storing objects.
hset lockname1 threadId 1
Creates a hash named lockname1 whose key is threadId and whose value is 1
hget lockname1 threadId
Gets the value of threadId in lockname1
The storage structure is
lockname: the lock name
key1: threadId, the unique key, the thread id
value1: count, the counter recording how many times the thread has acquired the lock
2. Incrementing and decrementing the counter
When the same thread acquires the same lock, we need to increment or decrement that thread's counter count
To check whether a Redis key exists, use exists; to check whether a key inside a hash exists, use hexists
Redis also has a hash increment command, hincrby
To increment by 1: hincrby lockname1 threadId 1; to decrement by 1: hincrby lockname1 threadId -1
3. Deciding when to unlock
When a lock is no longer needed, each unlock decreases count by 1; when it reaches 0, the lock is deleted
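The storage structure and counter rules above can be traced in a small in-memory model before moving them to Redis. This is a sketch under simplifying assumptions: nested HashMaps stand in for the Redis hash, expiration is left out, and the class name ReentrantHashModel and its return conventions are made up for illustration. The comments name the Redis command each step corresponds to.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the lock hash: lockName -> (threadId -> count). Expiry is omitted. */
final class ReentrantHashModel {
    private final Map<String, Map<String, Integer>> locks = new HashMap<>();

    /** Returns true when the lock is acquired or re-entered by threadId. */
    synchronized boolean lock(String lockName, String threadId) {
        Map<String, Integer> hash = locks.get(lockName);
        if (hash == null) {                         // EXISTS lockName == 0
            hash = new HashMap<>();
            hash.put(threadId, 1);                  // HSET lockName threadId 1
            locks.put(lockName, hash);
            return true;
        }
        if (hash.containsKey(threadId)) {           // HEXISTS lockName threadId == 1
            hash.merge(threadId, 1, Integer::sum);  // HINCRBY lockName threadId 1
            return true;
        }
        return false;                               // held by another thread
    }

    /** Returns the remaining count, or -1 if threadId does not hold the lock. */
    synchronized int unlock(String lockName, String threadId) {
        Map<String, Integer> hash = locks.get(lockName);
        if (hash == null || !hash.containsKey(threadId)) {
            return -1;
        }
        int count = hash.merge(threadId, -1, Integer::sum); // HINCRBY lockName threadId -1
        if (count == 0) {
            locks.remove(lockName);                 // DEL lockName
        }
        return count;
    }
}
```

A thread that locks twice must unlock twice before another thread can get in, which is exactly the behaviour the counter is meant to provide.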
Combining the storage structure and decision flow above, the lock and unlock Lua scripts are as follows
Lock: lock.lua
local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
-- The lock does not exist yet
if (redis.call('exists', key) == 0) then
    redis.call('hset', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;
-- The lock exists and is held by the current thread
if (redis.call('hexists', key, threadId) == 1) then
    redis.call('hincrby', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;
return 0;
Unlock: unlock.lua
local key = KEYS[1];
local threadId = ARGV[1];
-- The lock is not held by this thread
if (redis.call('hexists', key, threadId) == 0) then
    return nil;
end;
-- Decrement the counter
local count = redis.call('hincrby', key, threadId, -1);
-- Delete the lock once the counter reaches 0
if (count == 0) then
    redis.call('del', key);
    return nil;
end;
Code
/**
 * @description Distributed lock implemented on plain Redis
 **/
@Getter
@Setter
public class RedisLock {
private RedisTemplate redisTemplate;
private DefaultRedisScript<Long> lockScript;
private DefaultRedisScript<Object> unlockScript;
public RedisLock(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// Load the locked script
lockScript = new DefaultRedisScript<>();
this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
this.lockScript.setResultType(Long.class);
// Load the script to release the lock
unlockScript = new DefaultRedisScript<>();
this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
}
/**
* Get the lock
*/
public String tryLock(String lockName, long releaseTime) {
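// Prefix for the stored thread information.
// Note: a new UUID is generated on every call, so a nested tryLock from the same
// thread produces a different hash field and will not be treated as a re-entry;
// for true reentrancy the prefix has to be reused across calls.
String key = UUID.randomUUID().toString();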
// Execute the script
Long result = (Long) redisTemplate.execute(
lockScript,
Collections.singletonList(lockName),
key + Thread.currentThread().getId(),
releaseTime);
if (result != null && result.intValue() == 1) {
return key;
} else {
return null;
}
}
/**
* Unlock
* @param lockName
* @param key
*/
public void unlock(String lockName, String key) {
redisTemplate.execute(unlockScript,
Collections.singletonList(lockName),
key + Thread.currentThread().getId()
);
}
}
At this point a distributed lock is complete, with the basic properties of mutual exclusion, reentrancy, and deadlock prevention.
The rigorous Xiao Zhang felt that although this is just an ordinary mutex and stable enough, business always has special cases. For example, process A acquires the lock, but its business operation takes too long: the lock expires and is released while the business is still running. At that moment process B can acquire the lock normally and start its own operation, so the two processes end up sharing the resource after all.
And if the Redis node storing this distributed lock goes down while the lock is held, the lock can get stuck in the locked state.
Xiao Zhang is not nitpicking: inventory operations always have special cases of one kind or another.
So in this situation we want to extend the lock's releaseTime and delay releasing the lock until the business has achieved its result. This operation of repeatedly extending the lock's expiration time so that the business can finish is called lock renewal.
Read/write separation is also common: for performance, a read-heavy, write-light business often uses read locks and write locks.
By now the extensions have gone beyond the complexity of a simple wheel. Renewal alone is enough to keep Xiao Zhang busy, and there is still work to do on performance (maximum lock wait time), gracefulness (invalid lock requests), retries (failure retry mechanism), and so on.
While Xiao Zhang was thinking hard, Xiao Bai came over, took a look, and asked curiously: it's already 2021, why not just use Redisson?
Redisson has the lock you want.
III. Redisson distributed lock
How do you use the supposedly simple Redisson distributed lock?
1. Dependencies
<!-- Plain Redisson, used in this chapter -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>
<!-- Alternative: the Spring integration starter, not used in this chapter -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.6</version>
</dependency>
2. Configuration
@Configuration
public class RedissionConfig {
    @Value("${spring.redis.host}")
    private String redisHost;
    @Value("${spring.redis.password}")
    private String password;
    private int port = 6379;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + redisHost + ":" + port)
                .setPassword(password);
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}
3. Use the distributed lock
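@Resource
private RedissonClient redissonClient;
RLock rLock = redissonClient.getLock(lockName);
boolean isLocked = false;
try {
    // Note: tryLock(time, unit) treats the first argument as the maximum wait time
    isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
    if (isLocked) {
        // TODO
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} finally {
    // Release in finally, and only if this thread actually holds the lock
    if (isLocked && rLock.isHeldByCurrentThread()) {
        rLock.unlock();
    }
}
Simple and clear: all it takes is an RLock. Since Redisson is the recommendation, let's look inside and see how it works.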
IV. RLock
RLock is the core interface of Redisson's distributed locks. It extends the Lock interface from the java.util.concurrent.locks package and Redisson's own RLockAsync interface. The methods of RLockAsync all return RFuture, the core of Redisson's asynchronous execution and the place where Netty plays its main role.
How does RLock acquire the lock?
Starting from RLock, find the RedissonLock class and its tryLock method, then step down to tryAcquireOnceAsync, which holds the main locking code (the implementation differs between versions, and the latest 3.15.x differs somewhat, but the core logic is unchanged; 3.13.6 is used here)
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
Here the code splits into 2 branches based on leaseTime, that is, on whether an expiration time was supplied when locking. When no expiration time is set (-1), the watchDog lock renewal kicks in (covered below): a renewal task is registered for the lock. Let's first look at the tryLockInnerAsync part that runs when an expiration time is set.
evalWriteAsync is the entry point that runs Lua through the eval command
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
Here the truth is revealed: this is where the Lua script is executed through the eval command. The Lua script, expanded:
-- The lock key does not exist
if (redis.call('exists', KEYS[1]) == 0) then
    -- Create the lock and set the count for this thread id in the hash to 1
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- Set the expiration time
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- The lock key exists and the hash also contains this thread id
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- Increment the thread's reentry count
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
return redis.call('pttl', KEYS[1]);
This is almost the same as the script we wrote for the custom distributed lock; Redisson evidently implements it the same way. The parameters in detail:
// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// Unique value combining uuid and threadId
ARGV[2] = this.getLockName(threadId)
In total, 3 parameters carry out the following logic:
Check whether a hash for this lock exists:
• No such hash exists: set an entry in the hash whose key is lockName (uuid + threadId) and whose value is 1, set the hash's expiration time to leaseTime, and return nil
• The hash exists and contains lockName: increase the value of lockName by 1, that is, count one more reentry, reset the expiration time to leaseTime, and return nil
• Otherwise the lock is held by another thread: return the lock's remaining ttl
In essence this is no different from the custom lock above.
That being the case, unlocking must have a corresponding -1 step. Looking at the unlock method and following the method names all the way down, we arrive at
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}
Extracting the Lua part:
-- The lock is not held by this thread
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
-- Decrement the counter
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    -- Still held after reentry: reset the expiration time
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- Delete the lock and publish the unlock message
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
return nil;
This Lua script has 2 KEYS: Arrays.asList(getName(), getChannelName())
• name: the lock name
• channelName: the name of the channel that pubSub uses to publish messages
ARGV has three variables: LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
• LockPubSub.UNLOCK_MESSAGE: the type of message sent on the channel; for unlock it is 0
• internalLockLeaseTime: the timeout configured for the watchDog, 30s by default
• lockName: here lockName is the unique value combining uuid and threadId
Steps are as follows:
1. If the lock is not held by the current thread, return nil;
2. If it is, decrement the counter under this thread's hash key by 1;
3. If counter > 0, reset the expiration time and return 0; otherwise delete the lock, publish the unlock message unlockMessage, and return 1;
Here unlock uses Redis publish/subscribe (PubSub) to deliver the notification.
The subscription step is in the lock method, the locking entry point of RedissonLock
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
// subscribe
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
// ... omitted
When the lock is held by another thread, the waiter listens for the lock's release notification (sent through pub/sub when another thread releases the lock via RedissonLock) and waits for the lock to be released. This is a common way to improve efficiency by avoiding spinning.
1. Unlock message
To find out what is notified and what happens afterwards, step into LockPubSub.
It has only one obvious listener method, onMessage; subscription and semaphore release live in the parent class PublishSubscribe, so we look only at what actually happens when the event is handled
protected void onMessage(RedissonLockEntry value, Long message) {
Runnable runnableToExecute;
if (message.equals(unlockMessage)) {
// Get the listening thread from the listener queue to execute the listening callback
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// getLatch() returns a Semaphore; here a permit is released
// Releasing the permit wakes up a thread waiting in entry.getLatch().tryAcquire so that it can try for the lock again
value.getLatch().release();
} else if (message.equals(readUnlockMessage)) {
while(true) {
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute == null) {
value.getLatch().release(value.getLatch().getQueueLength());
break;
}
runnableToExecute.run();
}
}
}
There turn out to be two kinds of message: the default unlock message and the read-lock unlock message. Redisson also has read-write locks, where the read-read case differs from the read-write and write-write cases in terms of mutual exclusion, so here we only look at the default unlockMessage branch above
On receiving the message, LockPubSub does 2 things
• runnableToExecute.run(): executes the listener callback
• value.getLatch().release(): releases the semaphore
Redisson listens for unlock messages through LockPubSub, executes the listener callback, and releases the semaphore to tell waiting threads that they can compete for the lock again.
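The "run one listener, then release one permit" pattern can be reproduced with plain JDK classes. The following is only a sketch of that pattern, not Redisson's code: UnlockSignal, addListener, onUnlockMessage, and tryWakeUp are hypothetical names, and a real waiter would block with a timed tryAcquire instead of the non-blocking call used here to keep the demo deterministic.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

/** Simplified stand-in for the entry that an unlock message signals; names are illustrative. */
final class UnlockSignal {
    private final Semaphore latch = new Semaphore(0);
    private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    /** Called when an unlock message arrives: run one listener, then free one permit. */
    void onUnlockMessage() {
        Runnable listener = listeners.poll();
        if (listener != null) {
            listener.run();
        }
        latch.release();
    }

    /** A waiter checks for a permit before retrying the lock (non-blocking for the demo). */
    boolean tryWakeUp() {
        return latch.tryAcquire();
    }
}
```

Because each unlock message releases exactly one permit, only one waiter is woken per release, which avoids every blocked thread hammering Redis at the same moment.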
Now back to the other branch of tryAcquireOnceAsync
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
As you can see, when no timeout is given, some additional logic runs after locking
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
This involves Netty's Future/Promise-Listener model, which Redisson uses for almost all of its communication (which is why Redisson is said to be built on Netty's communication mechanism). To understand this logic, consider the following.
With Java's Future, the business logic is a Callable or Runnable implementation, and the completion of its call() or run() marks the end of the business logic. With the Promise mechanism, the business logic itself can explicitly mark success or failure, which makes it easier to monitor.
On the surface, this code means: after the asynchronous locking operation completes without error, if the lock was acquired, start a scheduled task.
This scheduled task is the heart of the watchDog.
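For readers without Netty at hand, the JDK's CompletableFuture gives a feel for the same listener style. This is an analogy under stated assumptions rather than Redisson's API: whenComplete stands in for onComplete, an AtomicBoolean stands in for the call to scheduleExpirationRenewal, and ListenerDemo and watch are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class ListenerDemo {
    /**
     * Registers a completion listener that "schedules renewal" only when the
     * lock attempt finished without error and reported success.
     */
    static AtomicBoolean watch(CompletableFuture<Boolean> lockFuture) {
        AtomicBoolean renewalScheduled = new AtomicBoolean(false);
        lockFuture.whenComplete((acquired, error) -> {
            if (error == null && Boolean.TRUE.equals(acquired)) {
                // Stands in for scheduleExpirationRenewal(threadId)
                renewalScheduled.set(true);
            }
        });
        return renewalScheduled;
    }
}
```

The caller that completes the future decides the outcome explicitly, with complete(true) or completeExceptionally(...), which is the "promise" half of the model described above.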
2. Lock renewal
Look at RedissonLock.this.scheduleExpirationRenewal(threadId)
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
Broken down, this deeply nested and lengthy code does just a few things
• It adds a Netty Timeout callback task that runs every (internalLockLeaseTime / 3) milliseconds and calls renewExpirationAsync
• renewExpirationAsync resets the lock's timeout and registers another listener, whose callback runs renewExpiration again
The Lua of renewExpirationAsync is as follows
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;
It resets the timeout.
Why did Redisson add this logic?
The aim is to keep the business from being affected, for example when a task runs past the timeout and has not finished, yet the lock has already been released.
When a thread holds a lock without an explicit leaseTime, Redisson uses its default of 30s and starts the watchDog, which renews the lock every 10s to keep the timeout at 30s, until the task completes and the lock is deleted.
This is Redisson's lock renewal, the basic idea behind the WatchDog.
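The arithmetic of "30s lease, renewed every 10s" can be checked with a deterministic simulation that uses a manually advanced clock instead of real timers. This is a sketch of the idea only: WatchdogSimulation and its fields are invented for illustration, and the model assumes the holder never finishes, so with the watchdog enabled the lease keeps being pushed out.

```java
/** Deterministic simulation of lease renewal with a manually advanced clock (no real timers). */
final class WatchdogSimulation {
    private final long leaseMillis;
    private final boolean watchdogEnabled;
    private long now = 0;
    private long expiresAt;
    private long nextRenewalAt;

    WatchdogSimulation(long leaseMillis, boolean watchdogEnabled) {
        this.leaseMillis = leaseMillis;
        this.watchdogEnabled = watchdogEnabled;
        this.expiresAt = leaseMillis;          // lock acquired at t = 0
        this.nextRenewalAt = leaseMillis / 3;  // first renewal after a third of the lease
    }

    /** Advances the clock, firing every renewal that falls due while the lock is still alive. */
    void advance(long millis) {
        long target = now + millis;
        while (watchdogEnabled && nextRenewalAt <= target && nextRenewalAt < expiresAt) {
            expiresAt = nextRenewalAt + leaseMillis; // like PEXPIRE: reset to a full lease
            nextRenewalAt += leaseMillis / 3;
        }
        now = target;
    }

    boolean isLockAlive() {
        return now < expiresAt;
    }
}
```

With a 30000 ms lease, the simulated lock without a watchdog is gone at exactly 30000 ms, while the one with a watchdog is still alive after 100000 ms because each renewal at 10000 ms intervals restores the full lease.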
3. Process summary
With the overview above, the process is simple:
• Threads A and B compete for a lock; A acquires it and B blocks
• B does not block by actively spinning on CAS; it subscribes to the lock's broadcast message through PubSub
• A finishes its work and releases the lock; B receives the notification
• B wakes up, competes for the lock again, and acquires it
The detailed locking and unlocking process is summarized in the figure below :

V. Fair lock
The reentrant lock described above is an unfair lock. Redisson also implements a fair lock based on a Redis queue (List) and ZSet
What does fairness mean?
Fairness means clients obtain the lock in the order of their requests, first come first served, that is, FIFO. So a queue and an ordered container are essential
FairSync
Let's review how JUC's ReentrantLock implements its fair lock
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
AQS already provides the whole implementation; whether a lock is fair depends on whether the implementing class takes nodes out in order
AbstractQueuedSynchronizer is the basic framework for building locks and other synchronization components. It uses a built-in FIFO queue to queue the threads acquiring a resource. It does not implement any synchronization interface itself; it only defines a number of methods for acquiring and releasing the synchronization state for custom synchronization components to use, and supports both exclusive and shared acquisition. This is a design based on the template method pattern, which provides the ground for both fair and unfair locks.
Two figures from 《The Art of Java Concurrent Programming》 briefly explain the AQS waiting process
One shows the synchronization queue (a FIFO doubly linked queue), which manages the references, wait states, and predecessor/successor nodes of threads that failed to acquire the synchronization state (lost the race for the lock)
One shows the overall flow of exclusive acquisition of the synchronization state, the call flow of the core acquire(int arg) method
From these you can see the lock acquisition process
AQS maintains a synchronization queue. Threads that fail to acquire the state join the queue and spin; a node leaves the queue, or stops spinning, when its predecessor is the head node and it successfully acquires the synchronization state.
Comparing with the unfair lock NonfairSync, you can see that the key code controlling fairness lies in the hasQueuedPredecessors method.
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
NonfairSync omits the hasQueuedPredecessors condition. That method's job is to
check whether the current thread's node in the synchronization queue has a predecessor; it returns true if some thread requested the lock earlier than the current one.
Ensuring that only the first node (thread) in the queue can acquire the lock each time is the fairness rule
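The fairness rule can be boiled down to a toy model: a caller may take the lock only if the lock is free and nobody is queued ahead of it. The sketch below is not AQS and not Redisson; FifoLockModel and its methods are hypothetical, threads are represented by plain strings, and re-entry by the owner is deliberately left out to keep the rule visible.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy FIFO lock: a caller may acquire only if nobody is queued ahead of it. */
final class FifoLockModel {
    private final Deque<String> waiters = new ArrayDeque<>();
    private String owner;

    synchronized boolean tryAcquire(String thread) {
        // Same spirit as !hasQueuedPredecessors(): empty queue, or we are at its head
        boolean noPredecessor = waiters.isEmpty() || thread.equals(waiters.peekFirst());
        if (owner == null && noPredecessor) {
            waiters.pollFirst();           // leave the queue if we were at its head
            owner = thread;
            return true;
        }
        if (!waiters.contains(thread)) {
            waiters.addLast(thread);       // queue up in arrival order
        }
        return false;
    }

    synchronized void release(String thread) {
        if (thread.equals(owner)) {
            owner = null;
        }
    }
}
```

If B and C both queue up behind A, then after A releases, C's attempt still fails until B, which arrived first, has taken its turn. Dropping the noPredecessor check would turn this into the unfair variant, where whoever asks first after the release wins.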
Why does JUC default to the unfair lock?
Because when a thread requests the lock, it succeeds as soon as it obtains the synchronization state. Under this rule, a thread that has just released the lock has a very high chance of obtaining the synchronization state again, leaving other threads waiting in the synchronization queue. The benefit, however, is that the unfair lock greatly reduces the overhead of thread context switches.
So the price of fairness is performance and throughput.
Redis has no AQS, but it does have List and zSet. Let's see how Redisson achieves fairness.
RedissonFairLock
Using RedissonFairLock is still very simple
RLock fairLock = redissonClient.getFairLock(lockName);
fairLock.lock();
RedissonFairLock extends RedissonLock. As before, follow the lock implementation down to tryLockInnerAsync.
There are 2 lengthy Lua scripts there, but debugging shows that the fair lock enters through the branch after command == RedisCommands.EVAL_LONG. This Lua is longer and takes more parameters, so we focus on its rules
Parameters
-- Parameters used in the Lua script
KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
KEYS[1]: lock_name, the lock name
KEYS[2]: "redisson_lock_queue:{xxx}", the thread queue
KEYS[3]: "redisson_lock_timeout:{xxx}", the set of timeouts keyed by thread id
ARGV = internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
ARGV[1]: "{leaseTime}", the expiration time
ARGV[2]: "{Redisson.UUID}:{threadId}"
ARGV[3] = current time + thread wait time: (10:00:00) + 5000 milliseconds = 10:00:05
ARGV[4] = current time (10:00:00), the time of the application server, not of the redis-server
Lua script implementing the fair lock
-- 1. Loop to clean up expired entries at the head of the queue
while true do
    -- Get the head node
    local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
    -- On the first acquisition the queue is always empty, so exit the loop
    if firstThreadId2 == false then
        break;
    end;
    -- Remove the head node if it has expired
    local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
    if timeout <= tonumber(ARGV[4]) then
        redis.call('zrem', KEYS[3], firstThreadId2);
        redis.call('lpop', KEYS[2]);
    else
        break;
    end;
end;
-- 2. The lock does not exist && (there is no waiting queue || the head node of the waiting queue is this thread id): the main locking logic
if (redis.call('exists', KEYS[1]) == 0) and
    ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
    -- Pop this thread's id from the queue and delete its entry from the zSet
    redis.call('lpop', KEYS[2]);
    redis.call('zrem', KEYS[3], ARGV[2]);
    local keys = redis.call('zrange', KEYS[3], 0, -1);
    -- Iterate over all members of the zSet and decrease each timeout (score) by ARGV[3]
    for i = 1, #keys, 1 do
        redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
    end;
    -- Take the lock and set its expiration time
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- 3. The thread already holds the lock: reentry check
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
    redis.call('hincrby', KEYS[1], ARGV[2],1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- 4. Return the remaining time to live of the current thread
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
if timeout ~= false then
    -- The timeout value is set in step 6 below; the subtraction here recovers the current thread's ttl
    return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;
-- 5. Remaining time to live of the tail node
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
-- The tail node is not empty && the tail node is not the current thread
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
    -- Calculate the remaining time to live of the tail node
    ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else
    -- Get the remaining time to live of lock_name
    ttl = redis.call('pttl', KEYS[1]);
end;
-- 6. Queue at the tail
-- zSet timeout (score) = tail node ttl + current time + 5000 ms + current time; added if absent, updated if present
-- Append the thread id to the end of the queue only if it was newly added to the zSet
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
    redis.call('rpush', KEYS[2], ARGV[2]);
end;
return ttl;
1. Fair lock locking steps
From the Lua above you can see that the key structures the script operates on are the list and the sorted set (zSet).
The list maintains the queue of waiting threads, redisson_lock_queue:{xxx}, and the zSet maintains a sorted set of thread timeouts, redisson_lock_timeout:{xxx}. Although the Lua is long, it can be divided into 6 steps:
1. Queue cleanup
Ensure that the queue contains only waiting threads that have not expired
2. First lock
hset takes the lock, pexpire sets the expiration time
3. Reentry check
Same as the Lua of the reentrant lock
4. Return ttl
5. Calculate the ttl of the tail node
The initial value is the remaining expiration time of the lock
6. Queue at the tail
ttl + 2 * currentTime + waitTime is the formula for the default score value
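The six steps can be sketched as a small in-memory model. This is only an illustration of the control flow of the script shown above, not Redisson code: the class FairLockModel, its fields and the release method are hypothetical, local collections stand in for the Redis list, zSet and hash, and argv3 and now are passed in as plain numbers that mirror ARGV[3] and ARGV[4].

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// In-memory model of the six steps of the fair-lock script shown above.
// NOT Redisson code: local collections stand in for the Redis list, zSet and hash.
class FairLockModel {
    private final Deque<String> queue = new ArrayDeque<>();      // KEYS[2]: waiting queue
    private final Map<String, Long> timeouts = new HashMap<>();  // KEYS[3]: thread id -> score
    private String owner;        // KEYS[1]: current holder, null when the lock is free
    private int holdCount;       // reentry counter stored in the hash
    private long lockExpiresAt;  // absolute expiry set by pexpire

    // argv3 and now mirror ARGV[3] and ARGV[4]; the caller decides how to compute them.
    // Returns null when the lock is acquired, otherwise a ttl, as the script does.
    Long tryAcquire(String id, long leaseTime, long argv3, long now) {
        if (owner != null && lockExpiresAt <= now) {  // emulate Redis key expiry
            owner = null;
            holdCount = 0;
        }
        // 1. Drop expired waiters from the head of the queue.
        while (!queue.isEmpty() && timeouts.get(queue.peekFirst()) <= now) {
            timeouts.remove(queue.pollFirst());
        }
        // 2. Lock is free and the queue is empty or headed by this thread: acquire.
        if (owner == null && (queue.isEmpty() || queue.peekFirst().equals(id))) {
            queue.pollFirst();
            timeouts.remove(id);
            timeouts.replaceAll((k, score) -> score - argv3);  // zincrby with -ARGV[3]
            owner = id;
            holdCount = 1;
            lockExpiresAt = now + leaseTime;
            return null;
        }
        // 3. Reentry by the current holder.
        if (id.equals(owner)) {
            holdCount++;
            lockExpiresAt = now + leaseTime;
            return null;
        }
        // 4. Already queued: derive the remaining wait from the stored score.
        Long score = timeouts.get(id);
        if (score != null) {
            return score - argv3 - now;
        }
        // 5. ttl comes from the tail waiter, or from the lock itself if the queue is empty.
        String last = queue.peekLast();
        long ttl = (last != null && !last.equals(id))
                ? timeouts.get(last) - now
                : lockExpiresAt - now;
        // 6. Append to the queue with score = ttl + ARGV[3] + ARGV[4].
        timeouts.put(id, ttl + argv3 + now);
        queue.addLast(id);
        return ttl;
    }

    // Simplified release: decrement the hold count and free the lock at zero.
    void release(String id) {
        if (id.equals(owner) && --holdCount == 0) {
            owner = null;
        }
    }

    String owner() {
        return owner;
    }

    public static void main(String[] args) {
        FairLockModel lock = new FairLockModel();
        long lease = 30_000, argv3 = 5_000;  // made-up values for illustration
        System.out.println(lock.tryAcquire("t1", lease, argv3, 0));       // t1 takes the free lock
        System.out.println(lock.tryAcquire("t2", lease, argv3, 10_000));  // t2 queues
        System.out.println(lock.tryAcquire("t3", lease, argv3, 20_000));  // t3 queues behind t2
        lock.release("t1");
        System.out.println(lock.tryAcquire("t3", lease, argv3, 21_000));  // t3 is not at the head
        System.out.println(lock.tryAcquire("t2", lease, argv3, 22_000));  // t2 is at the head
        System.out.println("owner = " + lock.owner());
    }
}
```

In this model a later arrival cannot overtake the thread at the head of the queue even when the lock is free, which is the property the list and zSet combination is there to provide.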
2. Simulation
Simulating the following sequence makes the whole locking process of the Redisson fair lock clear.
Assume t1 10:00:00 < t2 10:00:10 < t3 10:00:20
t1: thread 1 acquires the lock for the first time
1. The waiting queue has no head node, so exit the loop -> 2
2. "The lock does not exist && there is no thread waiting queue" holds
2.1 lpop, zrem and zincrby are all no-ops; only the locking takes effect. This is the first lock, and nil is returned after locking
Locking succeeds, thread 1 holds the lock, end
t2: thread 2 tries to acquire the lock (thread 1 has not released it)
1. The waiting queue has no head node, so exit the loop -> 2
2. "The lock does not exist" does not hold -> 3
3. Not a reentrant thread -> 4
4. score has no value -> 5
5. The tail node is empty, so ttl is initialized to the ttl of lock_name -> 6
6. Set the zSet timeout score to ttl + waitTime + currentTime + currentTime and join the waiting queue; thread 2 is the head node
score = 20 s + 5000 ms + 10:00:10 + 10:00:10 = 10:00:35 + 10:00:10
t3: thread 3 tries to acquire the lock (thread 1 has not released it)
1. The waiting queue has a head node
1.1 It has not expired -> 2
2. "The lock does not exist" does not hold -> 3
3. Not a reentrant thread -> 4
4. score has no value -> 5
5. The tail node is not empty && the tail node is thread 2, not the current thread
5.1 Take the previously set score and subtract the current time: ttl = score - currentTime -> 6
6. Set the zSet timeout score to ttl + waitTime + currentTime + currentTime and join the waiting queue
score = 10 s + 5000 ms + 10:00:20 + 10:00:20 = 10:00:35 + 10:00:20
In this way, the three threads that need the lock form a queue: their thread ids are arranged in the list, and their expiration times are stored in the zSet (which makes it easy to order them by priority). The clients of thread 2 and thread 3, which were returned a ttl, spin and re-run this Lua script at intervals to try to lock, which is similar to AQS.
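The client-side retry can be pictured roughly as follows. This is a sketch under stated assumptions rather than Redisson's actual loop: the attempt supplier is a hypothetical stand-in for running the Lua script (null means the lock was acquired, any other value is the ttl in milliseconds), and the Semaphore is a hypothetical stand-in for the unlock notification.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of the client-side retry loop: re-run the acquire attempt until it returns null.
class RetryLoopSketch {
    // attempt: hypothetical stand-in for executing the Lua script.
    // unlockSignal: hypothetical stand-in for the unlock notification.
    // Returns the number of attempts it took to acquire the lock.
    static int lockWithRetry(Supplier<Long> attempt, Semaphore unlockSignal) {
        int attempts = 1;
        Long ttl = attempt.get();
        while (ttl != null) {
            try {
                // Wake up early if an unlock is signalled, otherwise after ttl milliseconds.
                unlockSignal.tryAcquire(Math.max(ttl, 0L), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the lock", e);
            }
            ttl = attempt.get();
            attempts++;
        }
        return attempts;
    }

    public static void main(String[] args) {
        // Fake script results: two failed attempts with short ttls, then success.
        Iterator<Long> results = Arrays.asList(20L, 10L, null).iterator();
        int attempts = lockWithRetry(results::next, new Semaphore(0));
        System.out.println("acquired after " + attempts + " attempts");
    }
}
```

Waiting on a signal with the ttl as the upper bound means a waiter reacts quickly to an unlock but still retries on its own if no notification arrives.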
When thread 1 releases the lock (here too, an unlock message is published through Pub/Sub to notify the other threads to acquire it):
10:00:30 thread 2 tries to acquire the lock (thread 1 has released it)
1. The waiting queue has a head node, which has not expired -> 2
2. "The lock does not exist && the head node of the waiting queue is the current thread" holds
2.1 Delete the current thread's queue entry and zSet entry; the timeouts are:
Thread 2: 10:00:35 + 10:00:10 - 10:00:30 = 10:00:15
Thread 3: 10:00:35 + 10:00:20 - 10:00:30 = 10:00:25
2.2 Thread 2 takes the lock and resets the expiration time
Locking succeeds, thread 2 holds the lock, end
The queuing structure is shown in the figure

The release script of the fair lock is similar to that of the reentrant lock, with one extra step: the same while true logic that cleans up expired keys at the start of locking. I won't expand on it here.
As can be seen above, Redisson's fair lock works much like a delay queue: the core is the combination of Redis's List and zSet structures. It also borrows from the AQS implementation, checking the head node periodically in the same way (watchDog), which guarantees fair competition and mutual exclusion for the lock. In concurrent scenarios, the Lua script and the zSet score handle ordered insertion well and establish priority.
To make sure that threads which exited abnormally still get cleaned up, each request checks whether the head node has expired and removes it if so. When the lock is finally released, the subscribing threads are notified through CHANNEL that they can acquire the lock, and the first step is repeated, handing the lock over smoothly to the next thread in order.
Six 、Summary
Redisson's overall implementation of distributed locking and unlocking is somewhat complex. Its author, Rui Gu, has studied Netty, JUC and Redis in depth and uses many advanced features and semantics, which makes the code well worth learning from. This introduction covers only the lock implementation on a standalone Redis.
Redisson also provides MultiLock for the multi-instance case:
https://github.com/redisson/redisson/wiki/8.-分布式锁和同步器#81-可重入锁reentrant-lock
And the officially recommended red lock, RedLock:
https://github.com/redisson/redisson/wiki/8.-分布式锁和同步器#84-红锁redlock
So when you really need a distributed lock, you might as well look at Redisson first.
Source: juejin.cn/post/6961380552519712798