Distributed lock implementation
2022-06-12 23:49:00 【Rookie +1024】
I. What is a distributed lock
Within a single process, when multiple threads compete for a resource, synchronized or Lock can serialize access so that the resource is used safely. How do we get the same guarantee when the callers are different processes or different machines? That is the problem distributed locks solve: they ensure consistent access to shared resources in a distributed system.
II. Distributed lock implementation
The following points need to be considered when implementing a distributed lock:
- How locking and unlocking work
- How to ensure that only one node holds the lock at a time
- Lock reentrancy
- How to prevent deadlock
- What to do with nodes that fail to acquire the lock
A distributed lock has to rely on third-party software such as a database, Redis, or ZooKeeper. This article walks through an implementation on each of the three.
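The classes in the following sections implement a MyLock interface that the article does not show. The sketch below is an assumed shape for it, inferred only from the overridden methods (lock, unlock, tryLock). LocalMyLock is a hypothetical single-JVM stand-in, not a distributed lock; it is here only to make the contract concrete, with an owner and a hold count per key playing the role that an owner ID and a re-entry counter play in the implementations below. In this sketch tryLock makes a single non-blocking attempt.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Assumed shape of MyLock, inferred from the @Override methods used in this article. */
interface MyLock {
    void lock(String key);       // block until the lock on key is held
    void unlock(String key);     // release one hold on key
    boolean tryLock(String key); // in this sketch: one non-blocking attempt
}

/** Hypothetical single-JVM implementation: one owner and one hold count per key. */
final class LocalMyLock implements MyLock {
    private static final class Holder {
        final Thread owner; // across machines this would have to be a string ID instead
        final int count;
        Holder(Thread owner, int count) { this.owner = owner; this.count = count; }
    }

    private final ConcurrentMap<String, Holder> locks = new ConcurrentHashMap<>();

    @Override
    public boolean tryLock(String key) {
        Thread me = Thread.currentThread();
        // compute() runs atomically per key, so only one thread can create the entry.
        Holder holder = locks.compute(key, (k, cur) -> {
            if (cur == null) return new Holder(me, 1);               // free: take it
            if (cur.owner == me) return new Holder(me, cur.count + 1); // re-entry
            return cur;                                               // held by another thread
        });
        return holder.owner == me;
    }

    @Override
    public void lock(String key) {
        while (!tryLock(key)) {
            try {
                Thread.sleep(10); // simple polling, like the implementations in this article
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for lock " + key, e);
            }
        }
    }

    @Override
    public void unlock(String key) {
        Thread me = Thread.currentThread();
        locks.compute(key, (k, cur) -> {
            if (cur == null || cur.owner != me) {
                throw new IllegalMonitorStateException("lock not held by current thread: " + key);
            }
            // Returning null removes the entry, which frees the lock.
            return cur.count > 1 ? new Holder(me, cur.count - 1) : null;
        });
    }

    /** Number of holds on key, or 0 if the key is not locked. */
    int holdCount(String key) {
        Holder holder = locks.get(key);
        return holder == null ? 0 : holder.count;
    }
}
```

Callers would normally pair lock(key) with unlock(key) in a try/finally block so that the lock is released even when the protected code throws.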
1. Distributed lock based on MySQL
In a database, locking and unlocking can be built on the uniqueness of a key (in the table below, the unique index on lock_key). Uniqueness means that when several nodes try to insert the same key, only one insert succeeds and the rest get an exception. The node whose insert succeeds holds the lock; the others have to wait and retry. That covers the first two points. For reentrancy, add a column that identifies the current thread on the current node (for example application name + IP + thread name) and a counter for the number of re-entries. Because the database has no built-in expiration, a scheduled task has to check whether a lock has expired, so the table also needs an update time.
The table is as follows:
DROP TABLE IF EXISTS `locks`;
CREATE TABLE `locks` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`lock_key` varchar(255) NOT NULL COMMENT ' Resources that need to be locked ',
`repeat_key` varchar(255) NOT NULL COMMENT ' Reentrant id ',
`repeat_time` int(11) DEFAULT NULL COMMENT ' Number of re-entry ',
`update_time` datetime DEFAULT NULL COMMENT ' Update time ',
PRIMARY KEY (`id`),
UNIQUE KEY `lock_key` (`lock_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
The code implementation is as follows:
import javax.sql.DataSource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/** * Created by Administrator on 2022/1/13. */
public class MyLockFromMysql implements MyLock{
private DataSource dataSource;
public MyLockFromMysql(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public void lock(String key) {
if (!tryLock(key)){
throw new RuntimeException("failed to acquire lock: "+key);
}
}
@Override
public void unlock(String key) {
String repeatKey= getRepeatKey();
if (hasRepeatLock(key,repeatKey)){
updateLock(key,repeatKey,-1);
}else if (!deleteLock(key,repeatKey)){
throw new RuntimeException();
}
}
@Override
public boolean tryLock(String key) {
String repeatKey= getRepeatKey();
if (hasLock(key,repeatKey)){
return updateLock(key,repeatKey,1);
}
for (;;){
if (addLock(key,repeatKey)){
break;
}
try {
Thread.sleep(100);
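} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the flag so callers can see the interrupt
return false; // stop waiting instead of silently looping on
}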
}
return true;
}
private String getRepeatKey() {
String host= "";
try {
host = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
return host+Thread.currentThread().getName();// node IP + thread name identifies the lock owner for re-entry
}
/** Checks whether this owner already holds the lock. @param key the locked resource @param repeatKey the re-entry ID of the owner @return true if a matching lock row exists */
private boolean hasLock(String key,String repeatKey){
Connection connection=null;
PreparedStatement statement=null;
ResultSet rs=null;
try {
connection=dataSource.getConnection();
statement=connection.prepareStatement("SELECT repeat_time FROM locks WHERE lock_key=? AND repeat_key=?");
statement.setString(1,key);
statement.setString(2,repeatKey);
rs=statement.executeQuery();
return rs.next();
} catch (Exception e) {
return false;
}finally {
close(rs);
close(statement);
close(connection);
}
}
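/** Checks whether this owner currently holds the lock more than once (a re-entered lock). @param key the locked resource @param repeatKey the re-entry ID of the owner @return true if the re-entry count is greater than 1 */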
private boolean hasRepeatLock(String key,String repeatKey){
Connection connection=null;
PreparedStatement statement=null;
ResultSet rs=null;
try {
connection=dataSource.getConnection();
statement=connection.prepareStatement("SELECT repeat_time FROM locks WHERE lock_key=? AND repeat_key=? AND repeat_time>1 ");
statement.setString(1,key);
statement.setString(2,repeatKey);
rs=statement.executeQuery();
return rs.next();
} catch (Exception e) {
return false;
}finally {
close(rs);
close(statement);
close(connection);
}
}
/** Competes for the lock by inserting a row; used when the current thread does not hold the lock yet. @param key the locked resource @param repeatKey the re-entry ID of the owner @return true if the insert succeeded, i.e. the lock was acquired */
private boolean addLock(String key,String repeatKey){
Connection connection=null;
PreparedStatement statement=null;
try {
connection=dataSource.getConnection();
statement=connection.prepareStatement("INSERT INTO locks (lock_key,repeat_key,repeat_time,update_time) VALUES (?,?,1,now())");
statement.setString(1,key);
statement.setString(2,repeatKey);
return statement.executeUpdate()>0;
} catch (Exception e) {
return false;
}finally {
close(statement);
close(connection);
}
}
private boolean updateLock(String key,String repeatKey,int upDown){
Connection connection=null;
PreparedStatement statement=null;
try {
connection=dataSource.getConnection();
statement=connection.prepareStatement("UPDATE locks set repeat_time=repeat_time+?,update_time=now() WHERE lock_key=? AND repeat_key=?");
statement.setInt(1,upDown);
statement.setString(2,key);
statement.setString(3,repeatKey);
return statement.executeUpdate()>0;
} catch (Exception e) {
return false;
}finally {
close(statement);
close(connection);
}
}
private boolean deleteLock(String key,String repeatKey){
Connection connection=null;
PreparedStatement statement=null;
try {
connection=dataSource.getConnection();
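// Only the owner may delete the row; without the repeat_key check any node could release another node's lock.
statement=connection.prepareStatement("DELETE FROM locks WHERE lock_key=? AND repeat_key=?");
statement.setString(1,key);
statement.setString(2,repeatKey);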
return statement.executeUpdate()>0;
} catch (Exception e) {
e.printStackTrace();
return false;
}finally {
close(statement);
close(connection);
}
}
private void close(AutoCloseable close){
if (close!=null){
try {
close.close();
} catch (Exception e) {
}
}
}
}
The code above is only a simple illustration of a MySQL-based distributed lock; it performs poorly and has a number of problems. The logic is to put a unique key on the resource to be locked and insert a row when locking. Only the node whose insert succeeds holds the lock, and deleting the row when the work is done releases it. For reentrancy there is a re-entry key and a re-entry count: if the current node already holds the lock and needs it again, the count is simply incremented by 1, and it is decremented by 1 on release. When the count is 1 at release time, the row is deleted.
Its shortcomings are easy to see:
- If the node holding the lock cannot release it properly, the lock row stays in the database, other nodes can never acquire the lock, and manual intervention is needed
- Under high concurrency it puts pressure on both the application and the database
- There is no wake-up mechanism, so waiting threads can only poll for the lock
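The first shortcoming is what the scheduled task mentioned earlier is meant to address, although the listing above does not include it. A minimal sketch of such a task might look like the following. ExpiredLockCleaner, its TTL parameter, and the cleanup period are assumptions of this sketch, not part of the original code; only the locks table and its update_time column come from the article.

```java
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: deletes lock rows whose update_time is older than ttlSeconds. */
final class ExpiredLockCleaner {
    // The TTL is an assumption of this sketch; the table and column names come from the article.
    static final String DELETE_EXPIRED_SQL =
            "DELETE FROM locks WHERE update_time < DATE_SUB(NOW(), INTERVAL ? SECOND)";

    private final DataSource dataSource;
    private final long ttlSeconds;

    ExpiredLockCleaner(DataSource dataSource, long ttlSeconds) {
        if (ttlSeconds <= 0) {
            throw new IllegalArgumentException("ttlSeconds must be positive");
        }
        this.dataSource = dataSource;
        this.ttlSeconds = ttlSeconds;
    }

    /** Runs one cleanup pass; returns the number of rows removed, or -1 if the statement failed. */
    int cleanOnce() {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement statement = connection.prepareStatement(DELETE_EXPIRED_SQL)) {
            statement.setLong(1, ttlSeconds);
            return statement.executeUpdate();
        } catch (SQLException e) {
            // Swallow the failure so that one bad pass does not cancel the schedule.
            System.err.println("expired-lock cleanup failed: " + e.getMessage());
            return -1;
        }
    }

    /** Schedules cleanOnce() with a fixed delay; call shutdown() on the result to stop it. */
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "lock-cleaner");
            thread.setDaemon(true); // do not keep the JVM alive just for cleanup
            return thread;
        });
        scheduler.scheduleWithFixedDelay(this::cleanOnce, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

The TTL has to be longer than the longest time a node may legitimately hold the lock, or the holder has to keep refreshing update_time while it works; otherwise the cleaner would delete a lock that is still in use.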
2. Distributed lock based on Redis
Redis has a SETNX command: it sets the key and returns 1 if the key does not exist, and returns 0 without changing anything if the key already exists. A Redis-based distributed lock builds on this: only the node whose call creates the key holds the lock, and releasing the lock means deleting the key. Redis key expiration ensures that if a client fails to release the lock, the lock is still released after the specified time. Reentrancy comes down to the design of the value, which can store the unique ID of the current node and the number of re-entries. The code below uses SET with the NX and EX options, which creates the key and sets its expiration in a single command.
The simple code is as follows:
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;
import java.net.InetAddress;
import java.net.UnknownHostException;
/** * Created by Administrator on 2022/1/14. */
public class MyLockForRedis implements MyLock {
private final long TIME_WAIT=50L;
@Override
public void lock(String key) {
if (!tryLock(key)){
throw new RuntimeException("failed to acquire lock: "+key);
}
}
@Override
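public void unlock(String key) {
Jedis redis=new Jedis("127.0.0.1",6379);
try {
String json=redis.get(key);
LockValue lockValue= JSONObject.parseObject(json,LockValue.class);
// The key may already have expired or been deleted, in which case there is nothing to release.
if (lockValue!=null&&getRepeatKeyV().equals(lockValue.getRepeatKey())){
if (lockValue.getTime()>1){
lockValue.setTime(lockValue.getTime()-1);
// A plain SET would drop the expiration, so rewrite the value with XX and a fresh EX.
redis.set(key,JSONObject.toJSONString(lockValue),"XX","EX",200000);
}else {
redis.del(key);
}
}
}finally {
redis.close(); // close the connection even if a Redis call throws
}
}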
@Override
public boolean tryLock(String key) {
Jedis redis=new Jedis("127.0.0.1",6379);
try {
for (;;){
if ("OK".equals(redis.set(key,JSONObject.toJSONString(new LockValue()),"NX","EX",200000))){
return true;
}else {
String json=redis.get(key);
LockValue lockValue= JSONObject.parseObject(json,LockValue.class);
if (lockValue!=null&&getRepeatKeyV().equals(lockValue.getRepeatKey())){
lockValue.setTime(lockValue.getTime()+1);
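// Keep the expiration: a plain SET would clear the TTL set when the lock was created.
redis.set(key,JSONObject.toJSONString(lockValue),"XX","EX",200000);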
return true;
}
}
try {
Thread.sleep(TIME_WAIT);
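} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the flag so callers can see the interrupt
return false; // stop waiting; the finally block still closes the connection
}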
}
}finally {
redis.close();
}
}
private static class LockValue{
private int time=1;
private String repeatKey=getRepeatKeyV();
public int getTime() {
return time;
}
public void setTime(int time) {
this.time = time;
}
public String getRepeatKey() {
return repeatKey;
}
public void setRepeatKey(String repeatKey) {
this.repeatKey = repeatKey;
}
@Override
public String toString() {
return "{time=" + time +", repeatKey=\"" + repeatKey+ "\"}";
}
}
private static String getRepeatKeyV() {
String host= "";
try {
host = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
return host+Thread.currentThread().getName();// node IP + thread name identifies the lock owner for re-entry
}
}
The above is only a simple Redis-based distributed lock, and it certainly has problems. The value is stored as JSON to support reentrancy, which costs some performance because the JSON is parsed repeatedly. The expiration time is also a problem: if it is set to 10 seconds and the node holding the lock takes longer than that, other nodes can acquire the lock while the first is still working. One solution is for the node that acquired the lock to start a thread that keeps extending the lock's expiration, and to stop that thread when the work is done. If the node holding the lock goes down, the expiration time still releases the lock.
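The renewal thread described above could be sketched roughly as follows. LockWatchdog is a hypothetical helper, not part of the original code, and it deliberately knows nothing about Redis: the caller passes in a renew callback (an assumption of this sketch) that extends the key's expiration, for example by issuing the Redis EXPIRE command, and returns false once the lock no longer belongs to this node.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: keeps extending a lock's expiration while the holder is still working. */
final class LockWatchdog implements AutoCloseable {
    private final BooleanSupplier renew; // assumed callback; returns false once the lock is gone
    private final long periodMillis;
    private final AtomicBoolean active = new AtomicBoolean(true);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread thread = new Thread(r, "lock-watchdog");
                thread.setDaemon(true); // never keep the JVM alive just to renew a lock
                return thread;
            });

    LockWatchdog(BooleanSupplier renew, long periodMillis) {
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("periodMillis must be positive");
        }
        this.renew = renew;
        this.periodMillis = periodMillis;
    }

    /** Starts periodic renewal; the period should be well below the lock's expiration time. */
    void start() {
        scheduler.scheduleAtFixedRate(this::renewOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** One renewal attempt; the watchdog stops itself when the callback reports the lock is gone. */
    boolean renewOnce() {
        if (!active.get()) {
            return false;
        }
        try {
            boolean stillOurs = renew.getAsBoolean();
            if (!stillOurs) {
                close();
            }
            return stillOurs;
        } catch (RuntimeException e) {
            // A failed attempt (e.g. a network error) is reported but does not stop later attempts.
            System.err.println("lock renewal failed: " + e.getMessage());
            return false;
        }
    }

    boolean isActive() {
        return active.get();
    }

    /** Stops renewal; call this when the protected work is finished. */
    @Override
    public void close() {
        active.set(false);
        scheduler.shutdownNow();
    }
}
```

A holder would create the watchdog right after acquiring the lock, call start(), and close it in the same finally block that releases the lock. Since a Jedis instance is not thread-safe, the callback should use its own connection rather than the one used by the working thread.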
3. Distributed lock based on ZooKeeper
There are two ways to implement a distributed lock with ZooKeeper:
- Create an ephemeral node: multiple clients try to create the same node at the same time, only one succeeds and the rest fail. The client whose create succeeded holds the lock; the others keep waiting for the lock to be released
- Create an ephemeral sequential node: each client creates its own ephemeral sequential node, then fetches all the nodes and checks whether its own is the smallest. If so it holds the lock; otherwise it watches for the deletion of the node just before its own.
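The decision at the heart of the second approach can be sketched without a ZooKeeper connection: given the names of all child nodes under the lock path and the name of our own node, either we are the smallest (and hold the lock) or we find the node to watch. SequenceLockDecision and the example node names are hypothetical; the sketch relies only on the fact that ZooKeeper appends a 10-digit, zero-padded counter to the name of a sequential node.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: decides whether our sequential node holds the lock or which node to watch. */
final class SequenceLockDecision {
    private SequenceLockDecision() {
    }

    /** Extracts the 10-digit counter that ZooKeeper appends to sequential node names. */
    static int sequenceOf(String nodeName) {
        return Integer.parseInt(nodeName.substring(nodeName.length() - 10));
    }

    /**
     * Returns empty if ourNode has the smallest sequence number (we hold the lock),
     * otherwise the name of the node immediately before ours, which is the one to watch.
     */
    static Optional<String> nodeToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children); // do not reorder the caller's list
        sorted.sort(Comparator.comparingInt(SequenceLockDecision::sequenceOf));
        int index = sorted.indexOf(ourNode);
        if (index < 0) {
            // Our ephemeral node disappeared, e.g. because the session expired.
            throw new IllegalStateException("our node is missing: " + ourNode);
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }
}
```

Watching only the immediate predecessor, rather than the lock's parent node, means that a release wakes up one waiter instead of every waiting client.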
For a distributed lock implemented the second way, see the InterProcessMutex implementation in the Curator client, which is built on ephemeral sequential child nodes. The process is as follows:
- InterProcessMutex maintains a ConcurrentMap whose key is the current thread and whose value is a LockData (made up of the current thread, the lock path, and the reentry count)
- When acquiring the lock, it first looks up the LockData for the current thread in the map. If it exists, the current thread already holds the lock and the reentry count is simply incremented by 1; otherwise it tries to acquire the lock
- When trying to acquire the lock, it first creates its own ephemeral sequential node, then fetches all the sequential child nodes under the lock (sorted) and checks whether its own node is the smallest. If so, the lock is acquired and the call returns. Otherwise it finds the previous node, registers a watcher on it, and calls wait to block (if a wait timeout was set and the remaining time is used up, the lock has not been acquired, so it exits immediately and its own node is deleted)
- When the previous node releases the lock, the waiting node receives the watcher notification and is woken up through notifyAll(), after which it checks again whether it can take the lock
- After the lock is acquired, the current thread and its LockData are saved in the map, which is what makes re-entry work
- On release, the LockData for the current thread is looked up (an exception is thrown if there is none) and the reentry count is decremented. If the result is greater than 0, the lock is still held reentrantly and nothing else happens; if it equals 0, the lock is actually released; if it is less than 0, an exception is thrown
- Actually releasing the lock involves deleting the current node, removing the current thread from the map, and removing the watcher
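The reentrancy bookkeeping in the steps above can be sketched on its own. This is modeled on the description, not copied from Curator: ReentrantBookkeeping and its two callbacks are hypothetical. acquireNode stands for "create the sequential node, block until it is the smallest, and return its path", and releaseNode stands for "delete the node at that path".

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch of per-thread reentrancy tracking around a node-based lock. */
final class ReentrantBookkeeping {
    private static final class LockData {
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);
        LockData(String lockPath) { this.lockPath = lockPath; }
    }

    private final ConcurrentMap<Thread, LockData> threadData = new ConcurrentHashMap<>();
    private final Supplier<String> acquireNode; // assumed: blocks until the lock is ours, returns the node path
    private final Consumer<String> releaseNode; // assumed: deletes the node at the given path

    ReentrantBookkeeping(Supplier<String> acquireNode, Consumer<String> releaseNode) {
        this.acquireNode = acquireNode;
        this.releaseNode = releaseNode;
    }

    void acquire() {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data != null) {
            data.lockCount.incrementAndGet(); // re-entry: no new node is created
            return;
        }
        String path = acquireNode.get();
        threadData.put(current, new LockData(path));
    }

    void release() {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data == null) {
            throw new IllegalMonitorStateException("current thread does not hold the lock");
        }
        int remaining = data.lockCount.decrementAndGet();
        if (remaining > 0) {
            return; // still held reentrantly
        }
        if (remaining < 0) {
            throw new IllegalMonitorStateException("lock count has gone negative");
        }
        try {
            releaseNode.accept(data.lockPath);
        } finally {
            threadData.remove(current); // forget the thread even if the delete failed
        }
    }
}
```

Because the map is keyed by thread, the count is only ever touched by the thread that owns the entry; the node itself is created once per outermost acquire and deleted once per outermost release.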
If anything above is wrong, please leave a comment to correct it. Thank you!