1.基于Redis客户端的设计
1.初级设计
public class JuniorRedisLock implements RedisLock{
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public boolean lock(String key) {
long id = Thread.currentThread().getId();
Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + key, id + "");
return Boolean.TRUE.equals(result);
}
@Override
public boolean unLock(String key) {
Boolean reuslt = redisTemplate.delete(LOCK_PREFIX + key);
return Boolean.TRUE.equals(reuslt);
}
}
public void testLock(){
String productId = "productId000000";
//加锁
boolean result = redisLock.lock(productId);
//加锁失败 直接return 只是举例 真实场景具体问题具体分析
if(!result){
return; //实际场景中 一般是需要重试几下
}
try {
//加锁成功后 执行业务逻辑
soldTicket += 1;
log.info("soldTicket: {}", soldTicket);
}finally {
//解锁
redisLock.unLock(productId);
}
}
上述的设计基本上可以满足分布式锁的需求,但是由于没有对锁设置过期时间,这就会导致在解锁前服务宕机了,就会产生了死锁,进而影响业务处理。为此,我们将setNx命令带上过期时间。
2进阶设计
将setNX命令带上过期时间的时候,又会引入新的问题。
当请求A在获取锁后设置过期时间为10s,但是执行业务逻辑11s,这时过期时间已到,锁会自动释放。这时请求B会获取到锁,由于请求A还没有执行删除锁的逻辑,当在执行删除锁的逻辑时,删除的是请求B加的锁,因此会产生问题。
基于以上分析的问题,我可以通过对每一个请求加锁时设置对应的值,解锁时取出该值和当前请求加锁时的值比较,如果符合再去解锁,如果不符合不做处理即可,具体流程图如下所示:
具体代码实现如下:
//加锁 一般情况下可以采用threadId作为锁对象 我这里使用了uuid + threadId的方式埋在了ThreadLocal中了
public boolean lock(String key) {
String id = getThreadUUID();
Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + key, id, expiredTime, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
}
//这个方法在实际使用中要保证原子性
public boolean unLock(String key) {
//1.获取锁对象
String id = redisTemplate.opsForValue().get(LOCK_PREFIX + key);
String currentThreadId = getThreadUUID();
//2.判断锁对象
if(currentThreadId.equals(id)){
//3.释放锁对象
Boolean reuslt = redisTemplate.delete(LOCK_PREFIX + key);
//清除threadLocal中的数据
removeThreadUUID();
return Boolean.TRUE.equals(reuslt);
}
return false;
}
3.高级设计
实际上,引入了对key设置过期时间时,在解锁时需要根据当前线程来判断决定是否释放锁。这里分为三步,先获取锁的对象(也就是存储的id),如果是当前线程对象,再去释放。如果当判断当前线程满足,准备即将释放锁时,此时锁过期了,被另外一个线程设置了锁,这时候还是会造成误删锁,虽然概率很低,但是还是会发生。导致这个问题根本的原因就是上述三步不是原子性操作。
Lua 脚本
//KEYS[1]是锁的key ARGV[1]是当前线程标识
if(redis.call('GET', KEYS[1]) == ARGV[1])) then
return redis.call('DEL', KEYS[1])
end
return 0;
//在Java中定义执行脚本的变量
protected static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("/script/unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
//主要是unlock方法的改变
public class SeniorRedisLock extends AbstraRedisLock{
@Override
public boolean unLock(String key) {
String currentThreadId = getThreadUUID();
Long result = redisTemplate.execute(UNLOCK_SCRIPT, List.of(LOCK_PREFIX + key), currentThreadId);
removeThreadUUID();
if(result <= 0){
log.error("unlock exception, key: {}", key); //对于锁对象过期时间过短的error日志提示
}
return result > 0;
}
}
问题思考:
- 上述采用的统一的方式对锁设置了过期时间为10秒,那如果业务方可能不止10秒呢?或者不到10秒呢?所以要交出这个参数由业务方自己决定。另外这个业务逻辑代码也可以交出去,可以考虑使用lambda函数方式
- 如果业务方式定义为15秒,但是到了15秒业务逻辑没有执行完,锁会自动释放。希望有一种方式能够实现续锁,又可以避免死锁,详细内容见后文;
- 业务方在调用的时候都是通过try-finally的方式去使用,有没有更加简便的方式,例如是否可以使用声明式的方式实现。
4.声明式分布式锁设计
基于问题1,提供了新的一个接口,如下:
//参数全部有业务方自行决定 灵活性很强 此处只做两件事:加锁 解锁(如果没有加成功 可以重试)
public <T> T executeWithRedisLock(String lockKey, long lockExpriedTime, long tryLockTime, Supplier<T> suppierForBiz){
//int counter = 0;
//获取重试结束时间
long currentTime = System.nanoTime() + tryLockTime * 1_000_000_000;
//还没到重试结束时间 一直重试
while (currentTime > System.nanoTime()){
//lock successfully -> excute the biz code
//加锁成功 执行业务逻辑代码
if(lock(lockKey, lockExpriedTime)){
try{
return suppierForBiz.get();
}catch (Exception e){
log.info("execute biz exception");
}finally {
unLock(lockKey);
}
}else {
//提供尝试重试机制 而不是一上来就等待
// if(counter < 10){
// counter ++;
// continue;
// }
//加锁失败 线程休眠 醒后继续尝试获取锁
try {
Thread.sleep(100);
//counter = 0;
} catch (InterruptedException e) {
}
}
}
log.error("lock failed");
return null;
}
为了简化使用,使用注解的方式实现分布式锁,满足上述问题3的要求。
//定义redis锁的注解 此处定义成方法级别
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLocked {
String key();
long lockExpiredTime();
long tryLockTime();
}
//解析注解 使用环绕通知来实现对biz code的增强 主要就是在执行前加一把公平锁 执行后释放锁
@Aspect
@Component
@Slf4j
public class RedisLockAspect {
@Autowired
private RedisLock redisLock;
@Around("@annotation(com.dev.wizard.redis.lock.RedisLocked)")
public Object around(ProceedingJoinPoint joinPoint){
MethodSignature methodSign = (MethodSignature)joinPoint.getSignature();
Method method = methodSign.getMethod();
RedisLocked redisLockedAnnotation = method.getAnnotation(RedisLocked.class);
if(null == redisLockedAnnotation){
try {
return joinPoint.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
String key = redisLockedAnnotation.key();
long lockExpiredTime = redisLockedAnnotation.lockExpiredTime();
long tryLockTime = redisLockedAnnotation.tryLockTime();
return redisLock.executeWithRedisLock(key, lockExpiredTime, tryLockTime, () ->{
try {
return joinPoint.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
}
}
//使用RedisLocked的注解
@RedisLocked(key = "product01", lockExpiredTime = 100, tryLockTime = 100)
public void testLock3(){
}
在上述当没有获取到锁时就会进入睡眠状态,这里其实有好几种方案:
- 一直重试;
- 先重试一定次数,然后再进入睡眠状态;Synchronized关键字底层就是先通过重试 然后再加入阻塞队列
小编也做了类似的实验,重试的次数为10然后进入睡眠qps稍微降低并且错误率为0,直接进入睡眠状态的性能qps最好但是会存在一定的错误率,一直重试的性能最差。实际上我理解应该是先重试后睡眠的方式性能最好的。
2.基于Redisson的设计
上述的分布式锁是可以用于生产项目中,但是还是有不完美的地方。如下:
- 锁释放问题:虽然我们可以将锁的过期时间交给了业务方去设置了,这会带来一个问题,每一个业务方都要评估这个锁的过期时间。时间设置太多,会导致锁自动失效导致业务逻辑执行有问题;时间设置太长,如果执行过程中出现问题(宕机),锁长时间不能释放导致大量的请求在等待,等待后获取锁失败,依然会影响业务执行。因为我们需要一种方式,一方面是业务不需要关心锁的过期时(其实更多的是主动释放锁而不是根据过期时间来释放),另一方面是在释放锁失败时根据过期时间也能有效的释放锁。
- 锁的不可重入问题:上述的设计在对自身方法调用时会产生死锁的;
- 性能问题:上述通过sleep的方式不断尝试去获取锁,如果存在当锁释放时能够主动通知就很好了;
对于以上的三个问题,Redisson都帮我们做好了,并且通过小编的亲测,qps提升了三倍左右。
//配置Redisson
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}
//声明方法 lockExpriedTime可设置为-1 不用关心锁的过期时间
public <T> T executeWithRedisLock(String lockKey, long lockExpriedTime, long tryLockTime, Supplier<T> suppierForBiz){
RLock rlock = redissonClient.getLock(LOCK_PREFIX + lockKey);
boolean locked = false;
try {
locked = rlock.tryLock(tryLockTime, lockExpriedTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if(locked){
try {
return suppierForBiz.get();
}catch (Exception e){
}finally {
rlock.unlock();
}
}
throw new RuntimeException();
}
2.1可重入锁的设计原理
在第一章中使用的setNX的命令可以实现分布式锁,不支持锁的重入。对于可重入锁的设计的话必须维护一个计数器,当锁重入时加一锁释放时减一,当这个计数器为0时完全释放锁。基于上述的简单描述,整个可重入锁的原理如下:
维护计数器的采用hash结构,key作为锁对象,field作为当前线程id,value就是重入计数器。
由于上述的逻辑都需要基于原子性操作,因为redisson也是基于lua脚本来实现上述逻辑的原子性的。
加锁逻辑见:org.redisson.RedissonLock#tryLockInnerAsync
解锁逻辑见:org.redisson.RedissonLock#unlockInnerAsync
2.2Redisson分布式锁的性能分析
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
//1.获取锁成功 返回null; 2.获取锁失败返回剩余有效期
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
//锁获取失败后需要重试
//剩余等待时间
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
//如果剩余等待时间已经到了 返回获取锁失败结果
acquireFailed(threadId);
return false;
}
//如果还没到等到时间,还可以继续尝试获取,但不是立即去获取锁,立即获取锁大概率还是失败还造成了CPU的负载
current = System.currentTimeMillis();
//这里用了一个比较巧妙的方式,redission基于订阅机制 拿着当前线程Id订阅的是别的线程释放锁的信号,这就以为这如果别的线程释放锁后会通知到这个线程
//我的设计是基于不断循环尝试的方式 当然性能不如别人了
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
//这里会有个等待逻辑 time是上述的剩余等待时间 等你一段时间,等不到返回为false
//注意:这里等待的只是释放锁的通知 不是加锁成功
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
//超时后要取消订阅
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
//等待时间已经到了 返回获取锁失败结果
acquireFailed(threadId);
return false;
}
//等一段时间 等到别的线程释放了锁了
try {
//经过上述一段代码的执行,再次计算剩余的等待时间
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
//还存在剩余的等待时间 再次重试获取锁
while (true) {
long currentTime = System.currentTimeMillis();
//和上面一样 获取锁失败会返回锁的ttl时间
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// 此处采用信号量的方式来进行等待的 当时间到了也会返回false
//1.剩余等待时间大于ttl,那当然选择ttl作为等待时间
//2.剩余等待时间小于ttl,选择剩余等待时间作为等待时间,等不到就得返回获取锁的结果
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
//等待一段时间后继续尝试
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
redisson在主动释放锁时会发布一条事件,上述的性能提升主要就是基于订阅机制,当锁释放后会通知每一个在等待的线程,然后这些线程再去获取锁,而不是盲目的去轮询尝试。注意:Redis采用信号量的方式保证只有客户端的一个线程能发出请求到redis中加锁,这也是为了避免惊群效应。
2.3Redisson分布式锁的续期分析
private long lockWatchdogTimeout = 30 * 1000;
//org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//当开发者设置了锁的释放时间,那么选择对应设置的时间
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//如果不设置时间或者设置的释放时间为-1 那么整个续期逻辑都交给了watchdog
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//当上述的future完成后会调用下面的lambda 类似函数回调
//剩余有效期 和 异常
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//如果产生了异常 直接return
if (e != null) {
return;
}
// lock acquired
//获取锁成功
if (ttlRemaining == null) {
//续期任务调度
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(long threadId) {
//保证同一个锁拿到的是同一个entry entry的目标是维护了一个线程id和一个定时任务
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
//如果是重入状态返回的是之前的entry
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
//如果是第一次进入的 开启续时任务
entry.addThreadId(threadId);
renewExpiration();
}
}
//续时任务的方法
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//定时任务:delay的参数internalLockLeaseTime / 3,这个internalLockLeaseTime就是watchdog的时间 30s
//所以就是10秒后出触发定时任务续期
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
//获取entry后拿到线程id
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//对该线程的分布式锁key重新续期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
//执行完过后递归调用续期方法
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
//将定时任务塞到entry中 目的是为了锁释放后取消定时任务
ee.setTimeout(task);
}
Redisson主要通过watchDog机制解决了续期问题,简单来说,watchDog机制是对每一个锁维护了一个定时任务,到了一段时间后自动重新设置过期时间。
3总结
本文主要基于Redis由浅入深设计分布式锁,并分析每种设计中存在的缺陷。例如不设置过期时间存在死锁问题,设置过期时间存在误删问题,以及可重入锁的问题,甚至还有对于性能的问题。Redisson是分布式锁的最终解决方案,在Redisson中使用了watchDog解决了续期问题,使用了Hash的结构实现了可重入锁的问题,并且对于Redisson的分布式锁的获取锁等源码进行了深入了分析。