方案设计 | 基于Redis的分布式锁设计

1.基于Redis客户端的设计

1.初级设计

public class JuniorRedisLock implements RedisLock{

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Override
    public boolean lock(String key) {
        long id = Thread.currentThread().getId();
        Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + key, id + "");
        return Boolean.TRUE.equals(result);
    }

    @Override
    public boolean unLock(String key) {
        Boolean reuslt = redisTemplate.delete(LOCK_PREFIX + key);
        return Boolean.TRUE.equals(reuslt);
    }
}


 public void testLock(){
        String productId = "productId000000";
        //加锁
        boolean result = redisLock.lock(productId);
        //加锁失败 直接return 只是举例 真实场景具体问题具体分析
        if(!result){
            return;  //实际场景中 一般是需要重试几下
        }
        try {
            //加锁成功后 执行业务逻辑
            soldTicket += 1;
            log.info("soldTicket: {}", soldTicket);
        }finally {
            //解锁
            redisLock.unLock(productId);
        }
    }

上述的设计基本上可以满足分布式锁的需求,但是由于没有对锁设置过期时间,这就会导致在解锁前服务宕机了,就会产生了死锁,进而影响业务处理。为此,我们将setNx命令带上过期时间。

2进阶设计

将setNX命令带上过期时间的时候,又会引入新的问题。

当请求A在获取锁后设置过期时间为10s,但是执行业务逻辑11s,这时过期时间已到,锁会自动释放。这时请求B会获取到锁,由于请求A还没有执行删除锁的逻辑,当在执行删除锁的逻辑时,删除的是请求B加的锁,因此会产生问题。

基于以上分析的问题,我可以通过对每一个请求加锁时设置对应的值,解锁时取出该值和当前请求加锁时的值比较,如果符合再去解锁,如果不符合不做处理即可,具体流程图如下所示:

image-20230715104029641

具体代码实现如下:

//加锁 一般情况下可以采用threadId作为锁对象  我这里使用了uuid + threadId的方式埋在了ThreadLocal中了
     public boolean lock(String key) {
        String id = getThreadUUID();
        Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + key, id, expiredTime,                 TimeUnit.SECONDS);
        return Boolean.TRUE.equals(result);
    }
    //这个方法在实际使用中要保证原子性
    public boolean unLock(String key) {
        //1.获取锁对象
        String id = redisTemplate.opsForValue().get(LOCK_PREFIX + key);
        String currentThreadId = getThreadUUID();
        //2.判断锁对象
        if(currentThreadId.equals(id)){
            //3.释放锁对象
            Boolean reuslt = redisTemplate.delete(LOCK_PREFIX + key);
            //清除threadLocal中的数据
            removeThreadUUID();
            return Boolean.TRUE.equals(reuslt);
        }
        return false;
    }

3.高级设计

实际上,引入了对key设置过期时间时,在解锁时需要根据当前线程来判断决定是否释放锁。这里分为三步,先获取锁的对象(也就是存储的id),如果是当前线程对象,再去释放。如果当判断当前线程满足,准备即将释放锁时,此时锁过期了,被另外一个线程设置了锁,这时候还是会造成误删锁,虽然概率很低,但是还是会发生。导致这个问题根本的原因就是上述三步不是原子性操作。

Lua 脚本
//KEYS[1]是锁的key ARGV[1]是当前线程标识
if(redis.call('GET', KEYS[1]) == ARGV[1])) then
    return redis.call('DEL', KEYS[1])
end
    return 0;

//在Java中定义执行脚本的变量
 protected static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("/script/unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
//主要是unlock方法的改变
public class SeniorRedisLock extends AbstraRedisLock{
    @Override
    public boolean unLock(String key) {
        String currentThreadId = getThreadUUID();
        Long result = redisTemplate.execute(UNLOCK_SCRIPT, List.of(LOCK_PREFIX + key), currentThreadId);
        removeThreadUUID();
        if(result <= 0){
            log.error("unlock exception, key: {}", key); //对于锁对象过期时间过短的error日志提示
        }
        return result > 0;
    }
}

问题思考:

  1. 上述采用的统一的方式对锁设置了过期时间为10秒,那如果业务方可能不止10秒呢?或者不到10秒呢?所以要交出这个参数由业务方自己决定。另外这个业务逻辑代码也可以交出去,可以考虑使用lambda函数方式
  2. 如果业务方式定义为15秒,但是到了15秒业务逻辑没有执行完,锁会自动释放。希望有一种方式能够实现续锁,又可以避免死锁,详细内容见后文;
  3. 业务方在调用的时候都是通过try-finally的方式去使用,有没有更加简便的方式,例如是否可以使用声明式的方式实现。

4.声明式分布式锁设计

基于问题1,提供了新的一个接口,如下:

//参数全部有业务方自行决定 灵活性很强 此处只做两件事:加锁 解锁(如果没有加成功 可以重试)
public <T> T executeWithRedisLock(String lockKey, long lockExpriedTime, long tryLockTime, Supplier<T> suppierForBiz){
        //int counter = 0;
        //获取重试结束时间
        long currentTime = System.nanoTime() + tryLockTime * 1_000_000_000;
        //还没到重试结束时间 一直重试
        while (currentTime > System.nanoTime()){
            //lock successfully -> excute the biz code
            //加锁成功 执行业务逻辑代码
            if(lock(lockKey, lockExpriedTime)){
                try{
                    return suppierForBiz.get();
                }catch (Exception e){
                    log.info("execute biz exception");
                }finally {
                    unLock(lockKey);
                }
            }else {
                //提供尝试重试机制 而不是一上来就等待
             //  if(counter < 10){
             //       counter ++;
             //       continue;
             //   }
                //加锁失败 线程休眠 醒后继续尝试获取锁
                try {
                    Thread.sleep(100);
                    //counter = 0;
                } catch (InterruptedException e) {
                }
            }
        }
        log.error("lock failed");
        return null;
    }

为了简化使用,使用注解的方式实现分布式锁,满足上述问题3的要求。

//定义redis锁的注解 此处定义成方法级别
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLocked {

    String key();

    long lockExpiredTime();

    long tryLockTime();
}

//解析注解 使用环绕通知来实现对biz code的增强 主要就是在执行前加一把公平锁 执行后释放锁
@Aspect
@Component
@Slf4j
public class RedisLockAspect {
    @Autowired
    private RedisLock redisLock;

    @Around("@annotation(com.dev.wizard.redis.lock.RedisLocked)")
    public Object around(ProceedingJoinPoint joinPoint){
        MethodSignature methodSign = (MethodSignature)joinPoint.getSignature();
        Method method = methodSign.getMethod();
        RedisLocked redisLockedAnnotation = method.getAnnotation(RedisLocked.class);
        if(null == redisLockedAnnotation){
            try {
                return joinPoint.proceed();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }
        String key = redisLockedAnnotation.key();
        long lockExpiredTime = redisLockedAnnotation.lockExpiredTime();
        long tryLockTime = redisLockedAnnotation.tryLockTime();
        return redisLock.executeWithRedisLock(key, lockExpiredTime, tryLockTime, () ->{
            try {
              return joinPoint.proceed();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        });
    }
}

//使用RedisLocked的注解
@RedisLocked(key = "product01", lockExpiredTime = 100, tryLockTime = 100)
    public void testLock3(){

    }

在上述当没有获取到锁时就会进入睡眠状态,这里其实有好几种方案:

  • 一直重试;
  • 先重试一定次数,然后再进入睡眠状态;Synchronized关键字底层就是先通过重试 然后再加入阻塞队列

小编也做了类似的实验,重试的次数为10然后进入睡眠qps稍微降低并且错误率为0,直接进入睡眠状态的性能qps最好但是会存在一定的错误率,一直重试的性能最差。实际上我理解应该是先重试后睡眠的方式性能最好的。

2.基于Redisson的设计

上述的分布式锁是可以用于生产项目中,但是还是有不完美的地方。如下:

  1. 锁释放问题:虽然我们可以将锁的过期时间交给了业务方去设置了,这会带来一个问题,每一个业务方都要评估这个锁的过期时间。时间设置太多,会导致锁自动失效导致业务逻辑执行有问题;时间设置太长,如果执行过程中出现问题(宕机),锁长时间不能释放导致大量的请求在等待,等待后获取锁失败,依然会影响业务执行。因为我们需要一种方式,一方面是业务不需要关心锁的过期时(其实更多的是主动释放锁而不是根据过期时间来释放),另一方面是在释放锁失败时根据过期时间也能有效的释放锁。
  2. 锁的不可重入问题:上述的设计在对自身方法调用时会产生死锁的;
  3. 性能问题:上述通过sleep的方式不断尝试去获取锁,如果存在当锁释放时能够主动通知就很好了;

对于以上的三个问题,Redisson都帮我们做好了,并且通过小编的亲测,qps提升了三倍左右。

//配置Redisson
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        return Redisson.create(config);
    }
}

//声明方法 lockExpriedTime可设置为-1 不用关心锁的过期时间
public <T> T executeWithRedisLock(String lockKey, long lockExpriedTime, long tryLockTime, Supplier<T> suppierForBiz){
        RLock rlock = redissonClient.getLock(LOCK_PREFIX + lockKey);
        boolean locked = false;
        try {
            locked = rlock.tryLock(tryLockTime, lockExpriedTime, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        if(locked){
            try {
              return suppierForBiz.get();
            }catch (Exception e){

            }finally {
                rlock.unlock();
            }
        }
        throw new RuntimeException();
    }

2.1可重入锁的设计原理

在第一章中使用的setNX的命令可以实现分布式锁,不支持锁的重入。对于可重入锁的设计的话必须维护一个计数器,当锁重入时加一锁释放时减一,当这个计数器为0时完全释放锁。基于上述的简单描述,整个可重入锁的原理如下:

维护计数器的采用hash结构,key作为锁对象,field作为当前线程id,value就是重入计数器。

image-20230716152912074

由于上述的逻辑都需要基于原子性操作,因为redisson也是基于lua脚本来实现上述逻辑的原子性的。

加锁逻辑见:org.redisson.RedissonLock#tryLockInnerAsync

解锁逻辑见:org.redisson.RedissonLock#unlockInnerAsync

2.2Redisson分布式锁的性能分析

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {

        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();

        final long threadId = Thread.currentThread().getId();
        //1.获取锁成功 返回null; 2.获取锁失败返回剩余有效期
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        //锁获取失败后需要重试
        //剩余等待时间
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            //如果剩余等待时间已经到了 返回获取锁失败结果
            acquireFailed(threadId);
            return false;
        }

        //如果还没到等到时间,还可以继续尝试获取,但不是立即去获取锁,立即获取锁大概率还是失败还造成了CPU的负载
        current = System.currentTimeMillis();
        //这里用了一个比较巧妙的方式,redission基于订阅机制 拿着当前线程Id订阅的是别的线程释放锁的信号,这就以为这如果别的线程释放锁后会通知到这个线程
        //我的设计是基于不断循环尝试的方式 当然性能不如别人了
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //这里会有个等待逻辑 time是上述的剩余等待时间 等你一段时间,等不到返回为false
        //注意:这里等待的只是释放锁的通知 不是加锁成功
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            //超时后要取消订阅
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            //等待时间已经到了 返回获取锁失败结果
            acquireFailed(threadId);
            return false;
        }
        //等一段时间 等到别的线程释放了锁了
        try {
            //经过上述一段代码的执行,再次计算剩余的等待时间
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
           //还存在剩余的等待时间 再次重试获取锁
            while (true) {
                long currentTime = System.currentTimeMillis();
                //和上面一样 获取锁失败会返回锁的ttl时间
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                // 此处采用信号量的方式来进行等待的 当时间到了也会返回false
                //1.剩余等待时间大于ttl,那当然选择ttl作为等待时间
                //2.剩余等待时间小于ttl,选择剩余等待时间作为等待时间,等不到就得返回获取锁的结果
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                //等待一段时间后继续尝试
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

image-20230716161259569 redisson在主动释放锁时会发布一条事件,上述的性能提升主要就是基于订阅机制,当锁释放后会通知每一个在等待的线程,然后这些线程再去获取锁,而不是盲目的去轮询尝试。注意:Redis采用信号量的方式保证只有客户端的一个线程能发出请求到redis中加锁,这也是为了避免惊群效应。

2.3Redisson分布式锁的续期分析

private long lockWatchdogTimeout = 30 * 1000;

//org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        //当开发者设置了锁的释放时间,那么选择对应设置的时间
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
       //如果不设置时间或者设置的释放时间为-1 那么整个续期逻辑都交给了watchdog
         RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    //当上述的future完成后会调用下面的lambda  类似函数回调
    //剩余有效期 和 异常
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            //如果产生了异常 直接return
            if (e != null) {
                return;
            }

            // lock acquired
            //获取锁成功
            if (ttlRemaining == null) {
                //续期任务调度
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }


    private void scheduleExpirationRenewal(long threadId) {
        //保证同一个锁拿到的是同一个entry  entry的目标是维护了一个线程id和一个定时任务
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        //如果是重入状态返回的是之前的entry
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            //如果是第一次进入的 开启续时任务
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }



//续时任务的方法
private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        //定时任务:delay的参数internalLockLeaseTime / 3,这个internalLockLeaseTime就是watchdog的时间 30s
        //所以就是10秒后出触发定时任务续期
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                //获取entry后拿到线程id
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                //对该线程的分布式锁key重新续期
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    //执行完过后递归调用续期方法
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        //将定时任务塞到entry中 目的是为了锁释放后取消定时任务
        ee.setTimeout(task);
    }

Redisson主要通过watchDog机制解决了续期问题,简单来说,watchDog机制是对每一个锁维护了一个定时任务,到了一段时间后自动重新设置过期时间。

3总结

本文主要基于Redis由浅入深设计分布式锁,并分析每种设计中存在的缺陷。例如不设置过期时间存在死锁问题,设置过期时间存在误删问题,以及可重入锁的问题,甚至还有对于性能的问题。Redisson是分布式锁的最终解决方案,在Redisson中使用了watchDog解决了续期问题,使用了Hash的结构实现了可重入锁的问题,并且对于Redisson的分布式锁的获取锁等源码进行了深入了分析。