当前位置:网站首页>關於分布式鎖的續命問題——基於Redis實現的分布式鎖

關於分布式鎖的續命問題——基於Redis實現的分布式鎖

2022-06-11 13:03:00 熟透的蝸牛

目錄

一、背景

二、自定義實現

三、Redisson框架實現


一、背景

關於分布式鎖就不多說了,現在出現了一種場景,如果在分布式鎖中,業務代碼沒有執行完,然後鎖的鍵值過期了,那麼其他的JVM就可能會獲取到鎖而發生幂等的問題。那麼這種情况怎麼解决呢?

1、如果線程1獲取到了鎖,那麼在業務代碼還沒執行完時,redis鍵值過期了,那麼就會發生幂等問題。解决的思路是,當線程1獲取到鎖之後,開啟一個線程去監聽線程1是否執行完成,如果沒有執行完成,就去延長鍵值的過期時間。

2、如果因為某些原因(比如代碼bug,網絡不穩定),鎖一直不能釋放,然後就要一直進行延長過期時間麼?答案是否定的,我們可以設置延長時間的次數,如果超過設定的次數還是失敗,就自動釋放鎖,然後回滾業務。

3、釋放鎖時需要考慮的問題,誰加的鎖,就讓誰來釋放鎖。

二、自定義實現

鎖信息實體類

package com.xiaojie.lock;


import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 記錄鎖信息
 * @date 2022/6/9 17:44
 */
public class RedisLockInfo {
    /**
     * 鎖的狀態 state為true 則錶示獲取鎖成功
     */
    private boolean state;
    /**
     * 鎖的id
     */
    private String lockId;
    /**
     * 鎖的持有線程
     */
    private Thread lockThread;

    /**
     * 鎖的過期時間
     */
    private Long expire;

    /**
     * 續命次數
     */
    private AtomicInteger lifeCount;

    /**
     * 獲取鎖次數
     */
    private AtomicInteger lockCount;

    // 鎖的可重入次數
    public RedisLockInfo(String lockId, Thread lockThread, Long expire) {
        state = true;
        this.lockId = lockId;
        this.lockThread = lockThread;
        this.expire = expire;
        lifeCount = new AtomicInteger(0);
        lockCount = new AtomicInteger(0);
    }

    public RedisLockInfo(Thread lockThread, Long expire) {
        this.lockThread = lockThread;
        this.expire = expire;
        lifeCount = new AtomicInteger(0);
        lockCount = new AtomicInteger(0);
        state = true;
    }

    public RedisLockInfo() {
        state = true;
    }

    public String getLockId() {
        return lockId;
    }

    public boolean isState() {
        return state;
    }

    public Thread getLockThread() {
        return lockThread;
    }

    public Long getExpire() {
        return expire;
    }

    //重試的次數
    public Integer getLifeCount() {
        return lifeCount.incrementAndGet();
    }

    //鎖獲取的次數
    public Integer incrLockCount() {
        return lockCount.incrementAndGet();
    }

    //釋放鎖次數
    public Integer decreLockCount() {
        return lockCount.decrementAndGet();
    }
}

獲取鎖的方法和續命方法

package com.xiaojie.lock.impl;

import com.xiaojie.lock.RedisLock;
import com.xiaojie.lock.RedisLockInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 實現分布式鎖的實現類
 * @date 2022/6/9 18:05
 */
@Component
@Slf4j
public class RedisLockImpl implements RedisLock {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private String redisLockKey = "redisLockKey";
    /**
     * 緩存redis鎖
     */
    private static Map<Thread, RedisLockInfo> lockCacheMap = new ConcurrentHashMap<>();
    /**
     * 重試時間
     */
    private Long timeout = 3000L;
    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();


    @Override
    public boolean tryLock() {
        Thread currentThread = Thread.currentThread();
        RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
        //判斷是不是當前線程獲取到了鎖
        if (null != redisLockInfo && redisLockInfo.isState()) {
            //證明已經獲取到了鎖,鎖重入
            log.info(">>>>>>>>>>>>>>>>>已經獲取到了鎖");
            redisLockInfo.incrLockCount(); //獲取鎖的次數加1
            return true;
        }
        Long startTime = System.currentTimeMillis();
        Long lockExpire = 30l; //鍵值的過期時間
        //重試獲取鎖
        for (; ; ) {
            Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, "1", lockExpire, TimeUnit.SECONDS);
            if (lock) {
                //獲取鎖成功
                log.info(">>>>>>>>>>>>>>獲取鎖成功");
                lockCacheMap.put(currentThread, new RedisLockInfo(currentThread, lockExpire));
                return true;
            }
            // 控制一個超時的時間
            Long endTime = System.currentTimeMillis();
            if (endTime - startTime > timeout) {
                log.info("在3秒內已經過了重試時間了>>>>>>>>>>>");
                return false;
            }
            // 繼續循環
            try {
                Thread.sleep(10); //休眠避免CPU飆高
            } catch (Exception e) {

            }
        }
    }

    @Override
    public boolean releaseLock() {
        Thread currentThread = Thread.currentThread();
        RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
        if (null != redisLockInfo && redisLockInfo.isState()) {
            if (redisLockInfo.decreLockCount() <= 0) {
                lockCacheMap.remove(currentThread);
                //删除鍵值
                stringRedisTemplate.delete(redisLockKey);
                return true;
            }
        }
        return false;
    }

    public RedisLockImpl() {
        //開始定時任務實現續命,每3秒續命一次
        this.scheduledExecutorService.scheduleAtFixedRate(new LifeExtensionThread(), 0, 3, TimeUnit.SECONDS);
    }

    //定義監聽線程
    class LifeExtensionThread implements Runnable {
        @Override
        public void run() {
            log.info("開始續命線程>>>>>>>>>>>>>>>>>>>>>>>>>");
            if (lockCacheMap.size() > 0) {
                lockCacheMap.forEach((k, lockInfo) -> {
                    try {
                        Thread lockServiceThread = lockInfo.getLockThread();
                        if (lockServiceThread.isInterrupted()) {
                            //判斷線程是否執行完畢
                            log.info("當前線程已經被停止>>>>>>>>>>>>>>>");
                            lockCacheMap.remove(k); //從緩存中移除線程
                            return;
                        }
                        // 續命鍵值過期時間
                        Integer lifeCount = lockInfo.getLifeCount();
                        if (lifeCount > 5) {
                            log.info(">>>>>>>>>>>>>>>>>您已經續命了多次當前線程還沒有釋放鎖,現在主動將該鎖釋放 避免死鎖的問題");
                            // 1.回滾業務(例如事務回滾)
                            // 2.釋放鎖
                            releaseLock();
                            // 3.將該線程主動停止
                            lockServiceThread.interrupt();
                            // 4.移除監聽
                            lockCacheMap.remove(k);
                            return;
                        }
                        //提前實現續命 延長過期key的時間
                        stringRedisTemplate.expire(redisLockKey, lockInfo.getExpire(), TimeUnit.SECONDS);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
}

模擬效果使用 Jemeter模擬秒殺的業務

package com.xiaojie.controller;

import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 秒殺的接口
 * @date 2022/6/9 22:55
 */
@RestController
public class SeckillController {
    @Autowired
    private GoodsMapper goodsMapper;
    @Autowired
    private RedisLock redisLock;

    @GetMapping("/secKill")
    public String secKill(Long id) {
        try {
            if (!redisLock.tryLock()) {
                //獲取鎖失敗
                return "活動太火了,請稍後重試";
            }
            Goods goods = goodsMapper.selectById(id);
            if (null == goods) {
                return "該商品沒有秒殺活動";
            }
            if (goods.getStock() <= 0) {
                return "商品庫存為空了。。。。。";
            }
            //减庫存
            Integer result = goodsMapper.deceGoods(id);
            return result > 0 ? "秒殺成功" : "秒殺失敗";
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //釋放鎖
            redisLock.releaseLock();
        }
        return "fail";
    }
}

三、Redisson框架實現

圖片來源   https://blog.csdn.net/menxu_work/article/details/123827526  

1、Redisson向redis寫入鍵值的時候不再使用setNx的命令,而是使用Lua脚本的形式寫入

2、寫入鍵值成功之後會設置一個後臺線程(看門狗線程)來對鍵值進行續命,默認值是每10秒續命一次,續命時間為30秒

3、鍵值的類型為hash類型,如下圖,其中1是鎖入的次數

 4、釋放鎖的時候,也是通過Lua脚本的方式,當鎖入的次數小於0時,就會删除鍵值。

源碼分析

加鎖代碼

 private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
    //如果沒有過期時間,證明鍵值不存在,直接返回
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

//鍵值存在,自旋方式去獲取鎖
        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
//取消訂閱
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }


  <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " + //判斷鍵值是否存在,==0不存在
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //創建哈希類型鍵值,並設置值為1
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +//設置過期時間 ,默認是30s
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//鍵值存在
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//將鍵值的value值加1
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " + //設置過期時間為30s
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);", //鍵值存在,並且不是重入鎖,則返回鍵值剩餘存活時間
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

釋放鎖的代碼

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + //鍵值不存在則錶示鎖已經釋放了
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //簡直存在,則鍵值减1
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " + //如果减1後鍵值還是存在,則重新設置過期時間
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " + //删除鍵值
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

續命的代碼

 模擬秒殺代碼實現

package com.xiaojie.redisson;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author xiaojie
 * @version 1.0
 * @description: reddisson配置
 * @date 2022/6/10 2:05
 */
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password}")
    private String password;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        //設置看門狗時間
//        config.setLockWatchdogTimeout(30000L);
        //設置單機版本redis
        config.useSingleServer().setAddress("redis://" + host + ":" + port);
        //設置密碼
        config.useSingleServer().setPassword(password);
        //設置集群的方式
//        config.useClusterServers().addNodeAddress("redis://" + host + ":" + port);
//        config.useClusterServers().addNodeAddress("redis://" + host2 + ":" + port2);
        //添加主從配置
//      config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
        return Redisson.create(config);
    }

}

秒殺的接口

package com.xiaojie.controller;

import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 秒殺的接口
 * @date 2022/6/9 22:55
 */
@RestController
public class RedissonSeckillController {
    @Autowired
    private GoodsMapper goodsMapper;
    @Autowired
    private RedissonClient redissonClient;

    @GetMapping("/secKillRedisson")
    public String secKillRedisson(Long id) {
        RLock rLock = null;
        try {
            rLock = redissonClient.getLock(id + "");
            rLock.lock(); //加鎖,加幾次鎖,finally釋放鎖的時候就要釋放幾次鎖
            Goods goods = goodsMapper.selectById(id);
            if (null == goods) {
                return "該商品沒有秒殺活動";
            }
            if (goods.getStock() <= 0) {
                return "商品庫存為空了。。。。。";
            }
            //减庫存
            Integer result = goodsMapper.deceGoods(id);
            return result > 0 ? "秒殺成功" : "秒殺失敗";
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            rLock.unlock();
        }
        return "fail";
    }
}

原网站

版权声明
本文为[熟透的蝸牛]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/06/202206111250521946.html