Distributed lock implementation

2022-06-12 23:49:00 Rookie +1024

I. What is a distributed lock

Within a single process, when multiple threads compete for a resource, synchronized or Lock can serialize access so that the resource is used safely. But how do we keep access to the same resource safe across multiple processes or multiple machines? That is the problem distributed locks solve: they ensure consistent access to shared resources in a distributed system.
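As a point of comparison, here is a minimal sketch of the in-process case using java.util.concurrent.locks.ReentrantLock. The class name InProcessCounter and its methods are made up for illustration. The lock only protects threads inside one JVM, which is exactly the limitation a distributed lock is meant to remove.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the article's code.
class InProcessCounter {
  private final ReentrantLock lock = new ReentrantLock();
  private int count = 0;

  // Increments the counter while holding the in-process lock.
  void increment() {
    lock.lock();
    try {
      count++;
    } finally {
      lock.unlock();
    }
  }

  int get() {
    lock.lock();
    try {
      return count;
    } finally {
      lock.unlock();
    }
  }

  // Starts `threads` workers that each increment `perThread` times, then returns the total.
  static int run(int threads, int perThread) {
    InProcessCounter counter = new InProcessCounter();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          counter.increment();
        }
      });
      workers[i].start();
    }
    try {
      for (Thread worker : workers) {
        worker.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for workers", e);
    }
    return counter.get();
  }
}
```

Two JVMs running this code would each have their own ReentrantLock, so nothing stops them from touching a shared external resource at the same time.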

II. Distributed lock implementation

The following points need to be considered when implementing a distributed lock:

  1. How the lock is acquired and released
  2. How to ensure that only one node holds the lock at a time
  3. Lock reentrancy
  4. How to prevent deadlock
  5. How to handle nodes that fail to acquire the lock

Implementing a distributed lock requires third-party software such as a database, Redis, or ZooKeeper. This article walks through an implementation on each of these three.

1. Distributed lock based on MySQL
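The implementations in this article all implement a MyLock interface that is never shown. The sketch below is an assumption about its shape, inferred from the three overridden methods (lock, tryLock, unlock). The Locks.runLocked helper is hypothetical and only illustrates the usual acquire / try / finally / release pattern.

```java
// Assumed shape of the MyLock interface; the article does not show it.
interface MyLock {
  // Acquires the lock for the given resource key, waiting if necessary.
  void lock(String key);

  // Attempts to acquire the lock; returns true on success.
  boolean tryLock(String key);

  // Releases one hold on the lock for the given resource key.
  void unlock(String key);
}

// Hypothetical helper showing how callers would typically use a MyLock.
final class Locks {
  private Locks() {}

  // Runs the action while holding the lock and always releases it afterwards.
  static void runLocked(MyLock lock, String key, Runnable action) {
    lock.lock(key);
    try {
      action.run();
    } finally {
      lock.unlock(key);
    }
  }
}
```

Releasing in a finally block matters for every backend discussed here: a lock that is never released is the main source of the deadlock problem listed above.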

In a database, locking and unlocking can be built on a uniqueness constraint on the lock key (the table below uses a unique index on lock_key). Uniqueness means that only one of the competing nodes can insert the row successfully; the rest get an exception. The node whose insert succeeds holds the lock, and the remaining nodes must wait. This covers points 1 and 2. So how do we make the lock reentrant? We add a column that identifies the current thread on the current node (for example the node's application name + IP + thread name) and a counter for the number of reentries. Because the database has no built-in expiration, a scheduled task is needed to check whether a lock has expired, so the table also needs an update time.
The table is as follows:

DROP TABLE IF EXISTS `locks`;
CREATE TABLE `locks` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `lock_key` varchar(255) NOT NULL COMMENT 'Resource to be locked',
  `repeat_key` varchar(255) NOT NULL COMMENT 'Reentrancy id (lock owner)',
  `repeat_time` int(11) DEFAULT NULL COMMENT 'Reentry count',
  `update_time` datetime DEFAULT NULL COMMENT 'Update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `lock_key` (`lock_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

The code implementation is as follows:

import javax.sql.DataSource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/** A simple reentrant distributed lock backed by the MySQL locks table. Created by Administrator on 2022/1/13. */
public class MyLockFromMysql implements MyLock {

  private final DataSource dataSource;

  public MyLockFromMysql(DataSource dataSource) {
    this.dataSource = dataSource;
  }

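  @Override
  public void lock(String key) {
    // tryLock polls until the lock row is inserted; a false result means acquisition failed.
    if (!tryLock(key)) {
      throw new RuntimeException("Failed to acquire lock: " + key);
    }
  }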

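  @Override
  public void unlock(String key) {
    String repeatKey = getRepeatKey();
    if (hasRepeatLock(key, repeatKey)) {
      // Held more than once by this thread: only decrement the reentry count.
      updateLock(key, repeatKey, -1);
    } else if (!deleteLock(key, repeatKey)) {
      throw new RuntimeException("Failed to release lock: " + key);
    }
  }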

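  @Override
  public boolean tryLock(String key) {
    String repeatKey = getRepeatKey();
    // Reentry: this thread already owns the row, so just increase the counter.
    if (hasLock(key, repeatKey)) {
      return updateLock(key, repeatKey, 1);
    }
    // Otherwise poll until the insert succeeds (the unique key rejects concurrent inserts).
    for (;;) {
      if (addLock(key, repeatKey)) {
        return true;
      }
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up instead of silently spinning on.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }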

  private String getRepeatKey() {
    String host = "";
    try {
      host = InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      e.printStackTrace();
    }
    // Node IP + thread name identifies the lock owner for reentry checks.
    return host + Thread.currentThread().getName();
  }


  /**
   * Checks by key and repeatKey whether this owner already holds the lock.
   * @param key the locked resource
   * @param repeatKey the reentrancy id
   * @return true if a matching lock row exists
   */
  private boolean hasLock(String key, String repeatKey) {
    Connection connection = null;
    PreparedStatement statement = null;
    ResultSet rs = null;
    try {
      connection = dataSource.getConnection();
      statement = connection.prepareStatement("SELECT repeat_time FROM locks WHERE lock_key=? AND repeat_key=?");
      statement.setString(1, key);
      statement.setString(2, repeatKey);
      rs = statement.executeQuery();
      return rs.next();
    } catch (Exception e) {
      return false;
    } finally {
      close(rs);
      close(statement);
      close(connection);
    }
  }

  /**
   * Checks whether this owner currently holds the lock more than once (reentry count above 1).
   * @param key the locked resource
   * @param repeatKey the reentrancy id
   * @return true if the lock is held reentrantly
   */
  private boolean hasRepeatLock(String key, String repeatKey) {
    Connection connection = null;
    PreparedStatement statement = null;
    ResultSet rs = null;
    try {
      connection = dataSource.getConnection();
      statement = connection.prepareStatement("SELECT repeat_time FROM locks WHERE lock_key=? AND repeat_key=? AND repeat_time>1 ");
      statement.setString(1, key);
      statement.setString(2, repeatKey);
      rs = statement.executeQuery();
      return rs.next();
    } catch (Exception e) {
      return false;
    } finally {
      close(rs);
      close(statement);
      close(connection);
    }
  }

  /**
   * If the current thread does not hold the lock, inserts a row to compete for it.
   * @param key the locked resource
   * @param repeatKey the reentrancy id
   * @return true if the row was inserted, that is, the lock was acquired
   */
  private boolean addLock(String key, String repeatKey) {
    Connection connection = null;
    PreparedStatement statement = null;
    try {
      connection = dataSource.getConnection();
      statement = connection.prepareStatement("INSERT INTO locks (lock_key,repeat_key,repeat_time,update_time) VALUES (?,?,1,now())");
      statement.setString(1, key);
      statement.setString(2, repeatKey);
      return statement.executeUpdate() > 0;
    } catch (Exception e) {
      // A duplicate-key error means another owner holds the lock.
      return false;
    } finally {
      close(statement);
      close(connection);
    }
  }

  /** Adds upDown (+1 or -1) to the reentry count and refreshes update_time. */
  private boolean updateLock(String key, String repeatKey, int upDown) {
    Connection connection = null;
    PreparedStatement statement = null;
    try {
      connection = dataSource.getConnection();
      statement = connection.prepareStatement("UPDATE locks set repeat_time=repeat_time+?,update_time=now() WHERE lock_key=? AND repeat_key=?");
      statement.setInt(1, upDown);
      statement.setString(2, key);
      statement.setString(3, repeatKey);
      return statement.executeUpdate() > 0;
    } catch (Exception e) {
      return false;
    } finally {
      close(statement);
      close(connection);
    }
  }
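
  /** Deletes the lock row, but only if it belongs to the given owner. */
  private boolean deleteLock(String key, String repeatKey) {
    Connection connection = null;
    PreparedStatement statement = null;
    try {
      connection = dataSource.getConnection();
      // Match on repeat_key as well, so one node cannot release a lock held by another.
      statement = connection.prepareStatement("DELETE FROM locks WHERE lock_key=? AND repeat_key=?");
      statement.setString(1, key);
      statement.setString(2, repeatKey);
      return statement.executeUpdate() > 0;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    } finally {
      close(statement);
      close(connection);
    }
  }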
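
  /** Closes a JDBC resource quietly; null is ignored. */
  private void close(AutoCloseable close) {
    if (close != null) {
      try {
        close.close();
      } catch (Exception e) {
        // Nothing useful can be done if closing fails.
      }
    }
  }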
}

The code above is only a simple demonstration of distributed locking with MySQL (its performance is poor and it has many problems). The logic is to put a unique key on the resource to be locked and insert a row when locking; only the node whose insert succeeds obtains the lock. When the work is done the row is deleted, which releases the lock. For reentrancy there is a reentrancy key and a reentry count: if the current node already holds the lock and needs it again, it simply increments the count, and decrements it when releasing. When the count is 1 at release time, the row is deleted.

Its shortcomings are easy to see:

  1. If the node holding the lock fails to release it correctly, the lock row stays in the database and other nodes can never acquire the lock; manual intervention is then needed
  2. Under high concurrency it puts pressure on the application and on the database
  3. There is no wake-up mechanism, so other threads can only poll for the lock
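The scheduled expiry check mentioned earlier is not part of the listing above. The following is a minimal sketch of one way to do it, assuming the locks table from this article and a lock that counts as stale once update_time is older than a chosen maximum age. StaleLockCleaner, CLEANUP_SQL, cleanOnce, and start are hypothetical names, not part of the original code.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;

// Hypothetical helper, not part of the article's code.
class StaleLockCleaner {
  // Deletes lock rows whose update_time is older than the given number of seconds.
  static final String CLEANUP_SQL =
      "DELETE FROM locks WHERE update_time < DATE_SUB(NOW(), INTERVAL ? SECOND)";

  private final DataSource dataSource;
  private final long maxAgeSeconds;

  StaleLockCleaner(DataSource dataSource, Duration maxAge) {
    this.dataSource = dataSource;
    this.maxAgeSeconds = maxAge.getSeconds();
  }

  // Runs one cleanup pass and returns the number of lock rows removed.
  int cleanOnce() {
    try (Connection connection = dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(CLEANUP_SQL)) {
      statement.setLong(1, maxAgeSeconds);
      return statement.executeUpdate();
    } catch (SQLException e) {
      throw new IllegalStateException("Stale lock cleanup failed", e);
    }
  }

  // Schedules cleanOnce() with a fixed delay; the caller shuts the executor down.
  ScheduledExecutorService start(long periodSeconds) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleWithFixedDelay(() -> {
      try {
        cleanOnce();
      } catch (RuntimeException e) {
        // Swallow the error so that one failed pass does not cancel future runs.
        e.printStackTrace();
      }
    }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    return scheduler;
  }
}
```

A cleaner like this also removes locks whose holder is still alive but slow, so the maximum age has to be longer than any legitimate critical section, or the holder has to refresh update_time periodically.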

2. Distributed lock based on Redis

Redis has a setnx command: if the key does not exist it is added and the command returns 1; if the key already exists it returns 0. Distributed locks on Redis are built on this command: only the client that gets 1 back has created the key and therefore holds the lock. To release the lock, the key is deleted. Redis key expiration ensures that if a client fails to release the lock, the lock is still released after the configured time. Reentrancy is handled in the design of the value, which can store the unique id of the current node and the reentry count.
The simple code is as follows:

import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;

import java.net.InetAddress;
import java.net.UnknownHostException;

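/** A simple reentrant distributed lock based on Redis SET with NX. Created by Administrator on 2022/1/14. */
public class MyLockForRedis implements MyLock {

  /** Polling interval in milliseconds while waiting for the lock. */
  private static final long TIME_WAIT = 50L;

  @Override
  public void lock(String key) {
    if (!tryLock(key)) {
      throw new RuntimeException("Failed to acquire lock: " + key);
    }
  }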

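  @Override
  public void unlock(String key) {
    Jedis redis = new Jedis("127.0.0.1", 6379);
    try {
      String json = redis.get(key);
      LockValue lockValue = JSONObject.parseObject(json, LockValue.class);
      // Only the owner may release; lockValue is null if the key has already expired.
      if (lockValue != null && getRepeatKeyV().equals(lockValue.getRepeatKey())) {
        if (lockValue.getTime() > 1) {
          lockValue.setTime(lockValue.getTime() - 1);
          // XX + EX: overwrite only if the key still exists and keep an expiration on it
          // (a plain SET would remove the TTL). Note that this get-then-set is not atomic.
          redis.set(key, JSONObject.toJSONString(lockValue), "XX", "EX", 200000);
        } else {
          redis.del(key);
        }
      }
    } finally {
      redis.close();
    }
  }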

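  @Override
  public boolean tryLock(String key) {
    Jedis redis = new Jedis("127.0.0.1", 6379);
    try {
      for (;;) {
        // SET key value NX EX: succeeds only if the key does not exist yet.
        if ("OK".equals(redis.set(key, JSONObject.toJSONString(new LockValue()), "NX", "EX", 200000))) {
          return true;
        }
        // The key exists: check whether this thread already owns it (reentry).
        String json = redis.get(key);
        LockValue lockValue = JSONObject.parseObject(json, LockValue.class);
        if (lockValue != null && getRepeatKeyV().equals(lockValue.getRepeatKey())) {
          lockValue.setTime(lockValue.getTime() + 1);
          // XX + EX keeps an expiration on the key; a plain SET would remove the TTL.
          redis.set(key, JSONObject.toJSONString(lockValue), "XX", "EX", 200000);
          return true;
        }
        try {
          Thread.sleep(TIME_WAIT);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and give up instead of silently spinning on.
          Thread.currentThread().interrupt();
          return false;
        }
      }
    } finally {
      redis.close();
    }
  }
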
  /** Value stored under the lock key: the owner id and the reentry count. */
  private static class LockValue {
    private int time = 1;
    private String repeatKey = getRepeatKeyV();

    public int getTime() {
      return time;
    }

    public void setTime(int time) {
      this.time = time;
    }

    public String getRepeatKey() {
      return repeatKey;
    }

    public void setRepeatKey(String repeatKey) {
      this.repeatKey = repeatKey;
    }

    @Override
    public String toString() {
      return "{time=" + time + ", repeatKey=\"" + repeatKey + "\"}";
    }
  }

  private static String getRepeatKeyV() {
    String host = "";
    try {
      host = InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      e.printStackTrace();
    }
    // Node IP + thread name identifies the lock owner for reentry checks.
    return host + Thread.currentThread().getName();
  }

}

The code above is only a simple Redis-based distributed lock (it certainly has many problems). The value is stored as JSON to support reentrancy, which adds overhead because the JSON is parsed on every operation. The expiration time is also a problem: if it is set to 10 seconds and the node holding the lock runs slowly and takes longer than that, other nodes can acquire the lock while the first is still working. One solution is for the node that holds the lock to start a thread that keeps extending the lock's expiration time, and to stop that thread when the work is done. If the node holding the lock goes down, the expiration time still releases the lock.

3. Distributed lock based on ZooKeeper
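The renewal thread described above can be sketched as follows. This is a minimal illustration, not the article's code: LockWatchdog is a hypothetical name, and the renewal action is passed in as a Runnable so the sketch does not depend on a particular Redis client. With Jedis the action would typically call expire on the lock key, using a separate connection, since a single Jedis instance is not meant to be shared between threads.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: periodically runs a renewal action while the lock is held.
class LockWatchdog implements AutoCloseable {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "lock-watchdog");
        thread.setDaemon(true); // do not keep the JVM alive because of the watchdog
        return thread;
      });
  private final ScheduledFuture<?> task;

  // renewAction is assumed to extend the lock's TTL, for example by calling
  // EXPIRE on the lock key. The period should be well below the TTL.
  LockWatchdog(Runnable renewAction, long period, TimeUnit unit) {
    this.task = scheduler.scheduleAtFixedRate(renewAction, period, period, unit);
  }

  // Stops renewing; call this right before (or after) releasing the lock.
  @Override
  public void close() {
    task.cancel(false);
    scheduler.shutdownNow();
  }
}
```

Because the watchdog lives in the same process as the lock holder, it stops when the process dies, and the key then expires on its own. A common choice is to renew at roughly one third of the TTL, so that a single missed renewal does not lose the lock.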

There are two ways to implement a distributed lock with ZooKeeper:

  1. Create an ephemeral node. Multiple clients try to create the same node at the same time; only one succeeds and the rest fail. The client that created the node holds the lock, and the others keep waiting for the lock to be released
  2. Create an ephemeral sequential node. Each client creates its own ephemeral sequential node, then fetches all the nodes and checks whether its own is the smallest. If so, it holds the lock; otherwise it watches the previous node for release.

For a distributed lock implemented the second way, see the InterProcessMutex implementation in the Curator client. It works by creating ephemeral sequential child nodes. The process is as follows:

  1. InterProcessMutex maintains a ConcurrentMap whose key is the current thread and whose value is a LockData (made up of the current thread, the lock path, and the reentry count)
  2. When acquiring the lock, first look up the current thread's LockData in the map. If it exists, the current thread already holds the lock, so just add 1 to the reentry count; otherwise try to acquire the lock
  3. When trying to acquire the lock, first create your own ephemeral sequential node, then fetch all sequential child nodes under the lock path (sorted) and check whether your node is the smallest. If so, the lock is acquired and the call returns. Otherwise find the previous node, register a watcher on it, and call wait() to block (if a wait time was set and it has run out, the current node has not obtained the lock; it exits immediately and its node is deleted)
  4. When the previous node releases the lock, the current node receives the watcher notification and notifyAll() wakes it up; the current node then competes for the lock again
  5. Once the lock is acquired, the current thread and its LockData are saved in the map to handle reentrancy
  6. To release the lock, look up the LockData for the current thread (if there is none, throw an exception), then decrement the reentry count and check it: if it is greater than 0 the lock is still held reentrantly, if it equals 0 the lock is released, and if it is less than 0 an exception is thrown
  7. Releasing the lock means deleting the current node, removing the current thread from the map, and then removing the watcher

If anything above is wrong, please leave a comment to correct it. Thank you!
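The "am I the smallest node, and if not, which node do I watch" decision in steps 3 and 4 can be sketched without a ZooKeeper connection. This is an illustration, not Curator's code: SequenceLockHelper and its methods are hypothetical, and node names are assumed to end in the zero-padded sequence number that ZooKeeper appends to sequential nodes, after the last '-'.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper mirroring the ordering check; not taken from Curator.
class SequenceLockHelper {
  // Extracts the sequence number, assuming names like "lock-0000000003".
  static long sequenceOf(String nodeName) {
    return Long.parseLong(nodeName.substring(nodeName.lastIndexOf('-') + 1));
  }

  // Returns empty if `mine` has the smallest sequence number (lock acquired),
  // otherwise the node immediately before it, which is the one to watch.
  static Optional<String> nodeToWatch(List<String> children, String mine) {
    List<String> sorted = new ArrayList<>(children);
    sorted.sort(Comparator.comparingLong(SequenceLockHelper::sequenceOf));
    int index = sorted.indexOf(mine);
    if (index < 0) {
      throw new IllegalArgumentException("Own node missing from children: " + mine);
    }
    return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
  }
}
```

Watching only the immediate predecessor, rather than the lock's parent node, means that a release wakes a single waiter instead of every waiting client.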

Copyright notice
This article was written by [Rookie +1024]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/02/202202280604567029.html