再谈分布式锁之剖析Redis实现
之前笔者已经写过关于分布式锁的内容,但囿于彼时对于分布式锁的研究还不算太深入,如今读来发现还是存在一些问题,故而写作本文,对Redis分布式锁的实现做一个更加全面、进阶的阐述和总结,帮助读者对Redis分布式锁有一个更加深入客观的了解。关于更多分布式锁的其他实现,在后续的文章中也会陆续展开。
我们还是通过经典的WWH(what why how)三段论方式进行行文。首先再次从宏观上了解什么是分布式锁以及分布式锁的约束条件和常见实现方式。
分布式锁
这部分主要对分布式锁再次做一次较为完整的回顾与总结。
什么是分布式锁
引用度娘的词条,对于分布式锁的解释如下:
这段话概括的还是不错的,根据概述以及对单机锁的了解,我们能够提炼并类比得出分布式锁的几个主要约束条件:
分布式锁的约束条件
特点 | 描述 |
---|---|
互斥性 | 即:在任意时刻,只有一个客户端能持有锁 |
安全性 | 即:不会出现死锁的情况,当一个客户端在持有锁期间内,由于意外崩溃而导致锁未能主动解锁,其持有的锁也能够被正确释放,并保证后续其它客户端也能加锁; |
可用性 | 即:分布式锁需要有一定的故障恢复能力,通过高可用机制能够保证故障发生的情况下能够最大限度对外提供服务,无单点风险。如:通过Redis的集群模式、哨兵模式;ETCD/zookeeper的集群选主能力等保证HA |
对称性 | 对于任意一个锁,其加锁和解锁必须是同一个客户端,即客户端 A 不能把客户端 B 加的锁给解了。这又称为锁的可重入性。 |
基于上述特点,这里直接给出常见的实现方式,笔者之前的文章也有对这些常见实现方式的详述,此处只是作为概括,不再展开,感兴趣的同学可以自行查阅博客的历史记录。
分布式锁常见实现方式
类别 | 举例 |
---|---|
通过数据库方式实现 | 如:采用乐观锁、悲观锁或者基于主键唯一约束实现 |
基于分布式缓存实现的锁服务 | 如: Redis 和基于 Redis 的 RedLock(Redisson提供了参考实现) |
基于分布式一致性算法实现的锁服务 | 如:ZooKeeper、Chubby(google闭源实现) 和 Etcd |
简单对分布式锁的概念做了一个总结整理后,我们进入本文的正题,对Redis实现分布式锁的机理展开论述。
分布式锁Redis原理
这部分对Redis实现分布式锁的原理进行展开论述。
Redis分布式锁核心指令:加锁
既然是锁,核心操作无外乎加锁、解锁,首先来看一下通过Redis的哪个指令进行加锁操作。
SET lock_name my_random_value NX PX 30000
这个指令的含义是在键“lock_name”不存在时,设置键的值,到期时间为30秒。我们通过该命令就能实现加锁功能。
这里对该命令做一个较为详细的讲解。
命令格式:
SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]
- EX seconds − 设置到期时间(秒为单位)。
- PX milliseconds - 设置到期时间(毫秒为单位)。
- NX - 仅在键不存在时设置键。
- XX - 只有在键已存在时才设置。
我们的目的在于使锁具有互斥性,因此采用NX参数, 仅在锁不存在时才能设置锁成功。
加锁参数解析
我们回过头接着看下加锁的完整实例:
SET lock_name my_random_value NX PX 30000
- lock_name,即分布式锁的名称,对于 Redis 而言,lock_name 就是 Key-Value 中的 Key且具有唯一性。
- my_random_value,由客户端生成的一个随机字符串,它要保证在足够长的一段时间内,且在所有客户端的所有获取锁的请求中都是唯一的,用于唯一标识锁的持有者。
- NX 表示只有当 lock_name(key) 不存在的时候才能 SET 成功,从而保证只有一个客户端能获得锁,而其它客户端在锁被释放之前都无法获得锁。
- PX 30000 表示这个锁节点有一个 30 秒的自动过期时间(目的是为了防止持有锁的客户端故障后,无法主动释放锁而导致死锁,因此要求锁的持有者必须在过期时间之内执行完相关操作并释放锁)。
Redis分布式锁核心指令:解锁
解锁通过del命令即可触发,完整指令如下:
del lock_name
对该指令做一个解释:
- 在加锁时为锁设置过期时间,当过期时间到达,Redis 会自动删除对应的 Key-Value,从而避免死锁。
- 注意,这个过期时间需要结合具体业务综合评估设置,以保证锁的持有者能够在过期时间之内执行完相关操作并释放锁。
- 正常执行完毕,未到达锁过期时间,通过del lock_name主动释放锁。
以上便是基于Redis实现分布式锁能力的核心指令,我们接着看一个常见的错误实现案例。
Redis分布式锁常见错误案例:setNx
首先看一段java代码:
Jedis jedis = jedisPool.getResource();
// 如果锁不存在则进行加锁
Long lockResult = jedis.setnx(lockName, myRandomValue);
if (lockResult == 1) {
// 设置锁过期时间,加锁和设置过期时间是两步完成的,非原子操作
jedis.expire(lockName, expireTime);
}
setnx() 方法的作用就是 SET IF NOT EXIST,expire() 方法就是给锁加一个过期时间。
乍看觉得这段代码没什么问题,但仔细推敲一下就能看出,其实这里是有问题的:加锁实际上使用了两条 Redis 命令,这个组合操作是非原子性的。
如果执行setNx成功后,接着执行expire时发生异常导致锁的过期时间未能设置,便会造成锁无过期时间。后续如果执行的过程中出现业务执行异常或者出现FullGC等情况,将会导致锁一致无法释放,从而造成死锁。
网上很多博客中采用的就是这种较为初级的实现方式,不建议仿效。
究其原因,还是因为setNx本身虽然能够保证设置值的原子性,但它与expire组合使用,整个操作(加锁并设置过期时间)便不是原子的,隐藏了死锁风险。
优雅解锁方案
说完加锁,我们接着说说如何进行优雅的可靠解锁。
这里共有两种方案:
- 通过Lua脚本执行解锁
- 通过使用Redis的事务功能,通过 Redis 事务功能,利用 Watch 命令监控锁对应的 Key实现可靠解锁
1. 利用Lua脚本实现解锁
我们看下官网对脚本原子性的解释:
我们看一段Lua脚本实现的解锁代码;
String script = "if redis.call('get', KEYS[1]) == ARGV[1]
then return redis.call('del', KEYS[1])
else return 0
end";
可能有些读者朋友对Lua脚本了解不多,这里简单介绍下这段脚本的含义:
我们通过 Redis 的 eval() 函数执行 Lua 脚本,其中入参 lockName 赋值给参数 KEYS[1],锁的具体值赋值给 ARGV[1],eval() 函数将 Lua 脚本交给 Redis 服务端执行。
从上面Redis官网文档截图能够看出,通过 eval() 执行 Lua 代码时,Lua 代码将被当成一个命令去执行(可保证原子性),并且直到 eval 命令执行完成,Redis 才会执行其他命令。因此,通过 Lua 脚本结合eval函数,可以科学得实现解锁操作的原子性,避免误解锁。
利用Jedis实现的Java版本代码如下:
Long unlock = 1L;
Jedis jedis = null;
// Lua脚本,用于校验并释放锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1]
then return redis.call('del', KEYS[1])
else
return 0 end";
try {
jedis = jedisPool.getResource();
// 通过 Redis 的 eval() 函数执行 Lua 脚本,
// 入参 lockName 赋值给参数 KEYS[1],myRandomValue 赋值给 ARGV[1],
// eval() 函数将 Lua 脚本交给 Redis 服务端执行。
Object result =
jedis.eval(script,
Collections.singletonList(lockName),
Collections.singletonList(myRandomValue));
// 注意:如果脚本顺利执行将返回1,
// 如果执行脚本时,其它的客户端对这个lockName对应的值进行了更改
// 则返回0
if (unlock.equals(result) {
return true;
}
}
catch (Exception e) {
throw e;
}
finally {
if (null != jedis) {
jedis.close();
}
}
return false;
2. 利用Redis事务实现解锁
首先看一下利用Redis事务实现解锁的代码实现:
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
// 监控锁对应的Key,如果其它的客户端对这个Key进行了更改,那么本次事务会被取消。
jedis.watch(lockName);
// 成功获取锁,则操作公共资源执行自定义流程
// ...自定义流程代码省略...
// 校验是否持有锁
if (lockValue.equals(jedis.get(lockName))) {
// 开启事务功能,
Transaction multi = jedis.multi();
// 释放锁
multi.del(lockName);
// 执行事务(如果其它的客户端对这个Key进行了更改,那么本次事务会被取消,不会执行)
// 如果正常执行,由于只有一个删除操作,返回的list将只有一个对象。
List<Object> result = multi.exec();
if (RELEASE_SUCCESS.equals(result.size())) {
return true;
}
}
}
catch (Exception e) {
throw e;
}
finally {
if (null != jedis) {
jedis.unwatch();
jedis.close();
}
}
根据代码实现,我们总结下通过Redis的事务功能监控并释放锁的步骤:
- 首先通过 Watch 命令监控锁对应的 key(lockName)。当事务开启后,如果其它的客户端对这个 Key 进行了更改,那么本次事务会被取消而不会执行 jedis.watch(lockName)。
- 开启事务功能,代码:jedis.multi()
- 执行释放锁操作。当事务开启后,释放锁的操作便是事务中的一个元素且隶属于该事务,代码:multi.del(lockName);
- 执行事务,代码: multi.exec();
- 最后对资源进行释放,代码 jedis.unwatch();jedis.close();
一种常见的错误解锁方式
这里再重点介绍一种常见的错误解锁方式,以便进行警示。
首先看下代码实现:
Jedis jedis = jedisPool.getResource();
jedis.del(lockName);
该方式直接使用了 jedis.del() 方法删除锁且没有进行校验。这种不校验锁的拥有者而直接执行解锁的粗暴方式,会导致已经存在的锁被错误的释放,从而破坏互斥性(如:一个进程直接通过该方是unlock掉另一个进程的锁)
那么如何进行优化呢?一种方式就是在解锁之前进行校验,判断加锁与解锁的是否为同一个客户端。代码如下:
Jedis jedis = jedisPool.getResource();
if (lockValue.equals(jedis.get(lockName))) {
jedis.del(lockName);
}
这种解锁方式相较于上文中粗暴的方式已经有了明显进步,在解锁之前进行了校验。但是问题并没有得到解决,整个解锁过程仍然是独立的两条命令,并非原子操作。
更为关键之处在于,如果在执行解锁操作的时候,因为异常(如:业务代码异常、FullGC导致的stop the world现象等)而出现了客户端阻塞的现象,导致锁过期自动释放,则当前客户端已经不再持有锁。
当进程恢复执行后,未进行锁持有校验(即进程认为自己还持有锁)而直接调用 del(lockName) 直接对当前存在的锁进行解锁操作,从而导致其他进程持有的锁被跨进程解锁的异常现象,这种情况是不被允许的,它违反了互斥性的原则。
阶段总结
上文中我们了解了基于Redis实现分布式锁的原理,也了解了实现一个Redis分布式锁需要解决的问题。
我们可以感受到实现一个可靠的分布式锁并不是一件容易的事情。
除了上文提到的现象,就算我们代码实现的很健壮,当采用主从架构的Redis集群,仍会出现异常现象:
对于主从异步复制的架构模式,当出现主节点down机时,从节点的数据尚未得到及时同步,此时进程访问到从机,判定为能够加锁,于是获取到锁,从而导致多个进程拿到一把锁的异常现象。
那么有没有一种更加可靠健壮且易用性更好的Redis锁实现方式呢?答案是显而易见的,它就是接下来重点讲解的Redisson分布式锁实现。
关于如何基于Redisson封装一个开箱即用的分布式锁组件可以移步我的另一篇文章:《自己写分布式锁-基于redission》,本文中我只对Redisson的分布式锁实现进行深度解析,具体的使用及封装过程还请读者自行阅读我的博文。
关于Redisson的分布式锁,在github上有较为详细的官方文档,分布式锁和同步器,我们这里挑重点进行讲解。
下文中的部分代码引自官方文档,此处做统一声明。
Redisson分布式锁
这部分对Redisson分布式锁进行较为全面的介绍。
Redisson分布式锁–可重入锁
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
一种常见的使用方式如下:
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
当储存这个分布式锁的Redisson节点宕机以后,且这个锁刚好是锁住的状态时,会出现锁死的情况。为了避免这种死锁情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,提供锁续约能力,不断的延长锁的有效期。
默认情况下,看门狗的检查锁的超时时间是30秒钟,这个具体的值可以通过修改Config.lockWatchdogTimeout来另行指定。
Redisson还提供了显式进行锁过期时间制定的接口,超过该时间便会对锁进行自动解锁,代码如下:
// 显式制定解锁时间,无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
Redisson还提供了异步方式的分布式锁执行方法,由于用的不多,此处不再赘述,感兴趣的同学可以自行查看官方文档。
这里还要补充一下,Redisson的分布式锁实现的优点之一,在于它的RLock对象完全符合Java的Lock规范,RLock实现了JUC的Lock接口,之所以称之为可重入锁在于只有拥有锁的进程才能解锁,当其他进程解锁则会抛出IllegalMonitorStateException错误。
这可以从RLock源码的声明出看出端倪
public interface RLock extends Lock, RLockAsync {
......
后文中我会带领读者对RLock的源码实现做一个较为详细的解读。我们先接着了解一下其余的锁实现。
Redisson分布式锁–公平锁(Fair Lock)
基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
一种常见的Redisson公平锁使用方式如下:
RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
公平锁实现同样具有自动续约的能力,该能力也是通过看门狗实现,与上文提到的重入锁RLock原理完全相同。下文中提到的锁类型也具有该能力,因此不再赘述,读者只要记住,这些类型的锁都能通过看门狗实现锁自动续约,且看门狗检查锁超时时间默认为30s,该参数可以通过修改Config.lockWatchdogTimeout自行配置。
公平锁也可以显式制定锁的加锁时长:
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
Redisson分布式锁–联锁(MultiLock)
基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。
这种锁类型挺有意思的,它为我们提供了多重锁机制,当所有的锁均加锁成功,才认为成功,调用的代码如下,(个人认为使用场景并不算多,因此作为了解即可)
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
Redisson分布式锁–红锁(RedLock)
红锁是Redisson实现的一种高可用的分布式锁实现,因此此处对红锁做一个较为详细的展开。
基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
基于上文对红锁的概述,我们可以得知,红锁是一个复合锁,且每一个锁的实例是位于不同的Redisson实例上的。
看一段红锁的使用样例:
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
红锁同样能够显示制定加锁时间:
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);
// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
这里引用一下官网对红锁算法实现的举例截图:
我们可以从中提取出红锁实现的关键点:半数以上节点获取锁成功,才认为加锁成功,某个节点超时就去下一个继续获取。
这里体现出分布式领域解决一致性的一种常用思路:多数派思想。这种思想在Raft算法、Zab算法、Paxos算法中都有所体现。
Redisson分布式锁–读写锁(ReadWriteLock)
Redisson同样实现了java.util.concurrent.locks.ReadWriteLock接口,使得其具有了读写锁能力。其中,读锁和写锁都继承了RLock接口。
同上述的锁一样,读写锁同样是分布式的。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
一种常见的使用方式如下:
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
按照惯例,我们接着看下显式方式指定加锁时长的读写锁的调用方式:
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
Redisson同时还实现了分布式AQS同步器组件,如:分布式信号量(RSemaphore)、可过期行分布式信号量(RPermitExpirableSemaphore)、分布式闭锁(RCountDownLatch)等,由于本文主要讲解锁相关的内容,因此不再进行展开介绍,感兴趣的同学可以自行查看官方文档及源码。
Redisson分布式锁源码解析
这一章节我将重点对Redisson中的重入锁(RLock)实现机制进行源码级别的讨论。
源码结构
我们从Redisson的github官方仓库下载最新的Redisson代码,导入IDEA中进行查看,源码结构如下:
图中红框圈住的模块即为Redisson的内核模块,也是我们阅读源码的重点。
分布式锁部分的源码实现在如下路径
redisson-master
|-redisson
|-src
|-main
|-java
|-org.redisson
我们逐级展开即可查看关键源码,那么废话不多说,直接看代码。
源码解析
笔者看源码的方式应当也是贴近的主流的方式,我一般会从一个demo开始,从代码的入口逐层深入进行阅读,我们首先找一段重入锁的demo。
RLock lock = redisson.getLock(lockName);
boolean getLock = false;
try {
getLock = rLock.tryLock(0, expireSeconds, TimeUnit.SECONDS);
if (getLock) {
LOGGER.info("获取Redisson分布式锁[成功],lockName={}", lockName);
} else {
LOGGER.info("获取Redisson分布式锁[失败],lockName={}", lockName);
}
} catch (InterruptedException e) {
LOGGER.error("获取Redisson分布式锁[异常],lockName=" + lockName, e);
e.printStackTrace();
return false;
}
return getLock;
这段代码截取自笔者封装的分布式锁组件,目前star数为92,源码地址 ,感兴趣的可以帮我点个star,哈哈。
首先,通过 redisson.getLock(lockName); 获取RLock锁实例,lockName一般为具有业务标识的分布式锁key。
获取RLock实例
先看下如何获取RLock实例:
进入Redisson.java类,找到如下代码:
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name, id);
}
此处的id为UUID。
protected final UUID id = UUID.randomUUID();
可以看到是调用了重载方法,点进去,跳入RedissonLock.java,通过类声明可以看到该类实现了RLock接口,声明及构造方法如下:
public class RedissonLock extends RedissonExpirable implements RLock {
...省略部分代码...
protected static final LockPubSub PUBSUB = new LockPubSub();
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id;
// 看门狗锁续约检查时间周期,默认30s
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
通过该构造方法构造了RedissonLock实例,其中internalLockLeaseTime即为看门狗的检查锁的超时时间,默认为30s。该参数可通过修改Config.lockWatchdogTimeout来指定新值。
tryLock加锁逻辑
当获取获取了锁实例成功后,进行尝试加锁操作,代码如下:
boolean getLock = rLock.tryLock(0, expireSeconds, TimeUnit.SECONDS);
进入RedissonLock.java查看实现。
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
// 申请锁,返回还剩余的锁过期时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
// 如果ttl为空,表示锁申请成功
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅监听redis的消息,并创建RedissonLockEntry
// 其中,RedissonLockEntry中比较关键的是一个Semaphore
// 属性对象用来控制本地的锁的请求的信号量同步,返回Netty框架的Future
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,
// 说明已经超过了客户端设置的最大的wait time,直接返回false,取消订阅,并且不会再继续申请锁
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
// 再次尝试申请一次锁
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
// 获得锁并返回
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
// 不等待申请锁并返回
acquireFailed(threadId);
return false;
}
// waiting for message
// 阻塞等待锁
currentTime = System.currentTimeMillis();
// 通过信号量(共享锁)进行阻塞,等待解锁消息
// 如果剩余时间 TTL 小于wait time,就在ttl时间内
// 从Entry的信号量获取一个许可(除非发生中断或者一直不存在可用的许可)
// 否则就在wait time时间范围内等待可以通过的信号量
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新等待时间,(最大等待时间-已经消耗的阻塞时间)
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
// 等待时间小于等于0,不等待申请锁并返回
acquireFailed(threadId);
return false;
}
}
} finally {
// 无论最终获取锁是否成功,都需要取消订阅解锁消息,防止死锁发生。
unsubscribe(subscribeFuture, threadId);
}
}
上面这段核心代码逻辑中,我们重点关注下tryAcquire(long leaseTime, TimeUnit unit),调用加锁逻辑主要就在这段代码逻辑中
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
点进去看一下 get(tryAcquireAsync(leaseTime, unit, threadId))
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
...省略部分逻辑...
}
ryAcquire(long leaseTime, TimeUnit unit)只针对leaseTime的不同参数进行对应的转发处理逻辑。
trylock的无参方法就是直接调用了 get(tryLockInnerAsync(Thread.currentThread().getId()));
我们接着看一下核心的tryLockInnerAsyn,它返回的是一个future对象,是为了通过异步方式对IO进行处理从而提高系统吞吐量。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 检查key是否已被占用,如果没有则设置超时时间及唯一标识,初始化value=1
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 锁重入的情况,判断锁的key field,一致的话,value加1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 返回剩余的过期时间
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
这里解释下这段加锁的Lua脚本具体的参数:
- KEYS[1] :需要加锁的key,这里需要是字符串类型。
- ARGV[1] :锁的超时时间,防止死锁
- ARGV[2] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId
执行这段Lua脚本当返回空,说明获取到锁;如果返回一个long数值(pttl 命令的返回值),则表明锁已被占用,通过返回剩余时间,外部可以做一些等待时间的判断及调整的逻辑。
tryLock(long waitTime, long leaseTime, TimeUnit unit) 有leaseTime参数的申请锁方法会按照leaseTime时间来自动释放锁。
对于没有leaseTime参数的情况,比如tryLock()或者tryLock(long waitTime, TimeUnit unit)以及lock()是会一直持有锁的。
unlock解锁逻辑
解锁的核心逻辑也是通过Lua脚本实现的,可以看出Redisson也是通过脚本来保证加锁、解锁的原子性,这与我们在文章开头时候的讲解也是保持一致的。
我们接着看一下unlock()方法的核心逻辑。
@Override
public void unlock() {
// 解锁核心逻辑
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
// 解锁返回空,抛出异常
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
// 解锁成功之后取消更新锁expire的时间的任务
cancelExpirationRenewal();
}
}
当解锁成功之后,调用cancelExpirationRenewal(),移除更新锁expire时间的任务,也就是锁都不存在了,也就没必要再进行锁过期时间续约了。简单看下它的代码实现:
void cancelExpirationRenewal() {
Timeout task = expirationRenewalMap.remove(getEntryName());
if (task != null) {
task.cancel();
}
}
进入unlockInnerAsync方法。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果锁的key已经不存在,表明锁已经被解锁,直接发布redis消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// key和field不匹配,说明当前的客户端线程并没有持有锁,不能进行主动解锁操作。
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 将value减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果counter>0表明锁进行了重入,不能删除key,也就是不进行解锁操作
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// 否则删除key并发布解锁消息进行解锁
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
可以看到这里是通过Lua脚本执行的解锁,那么我们来分析下这段脚本的具体含义。
- KEYS[1] :需要加锁的key,这里需要是字符串类型。
- KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lockchannel{” + getName() + “}”
- ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
- ARGV[2] :锁的超时时间,防止死锁
- ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId
从代码的注释应当能够较为清楚的把握解锁的核心脉络。
额外提一下,我们可以看到在lua解锁脚本中使用了publish命令,它的作用为:
通过在锁的唯一通道发布解锁消息,能够减少其他分布式节点的等待或者空转,整体上能提高加锁效率。
我们在看下Redisson如何处理unlock消息,此处的消息的内容即:unlockMessage = 0L。它和unlock方法中publish的内容是对应的。
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
// 解锁消息
public static final Long UNLOCK_MESSAGE = 0L;
public static final Long READ_UNLOCK_MESSAGE = 1L;
...省略部分逻辑...
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
// 如果订阅的消息为解锁消息,UNLOCK_MESSAGE = 0L
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 释放一个许可,并唤醒等待entry.
value.getLatch().release();
}
......
}
}
lock方法
除了tryLock方式能够获取锁外,Redisson还提供了lock方法直接获取锁,我们再看下它是如何进行锁获取操作的。
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
看下lockInterruptibly的具体逻辑。
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
点击去看下lockInterruptibly(long leaseTime, TimeUnit unit)这个重载。
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 锁获取成功
if (ttl == null) {
return;
}
// 通过异步方式订阅Redis的channel,阻塞方式获取订阅结果
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// 通过循环判断,直到锁获取成功,经典写法。
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// 锁获取成功,跳出循环
if (ttl == null) {
break;
}
// 如果剩余时间 TTL 大于0,从Entry的信号量获取一个许可(除非发生中断或者一直不存在可用的许可)
// 否则就在wait time时间范围内等待可以通过的信号量
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 无论最终获取锁是否成功,都需要取消订阅解锁消息,防止死锁发生。
unsubscribe(future, threadId);
}
}
这段逻辑是不是有种很熟悉的感觉,它和我们上文中讲到的tryLock逻辑很像。具体的逻辑在注释中已经写得比较清晰了就不再赘述。
到此就是Redisson重入锁加解锁核心逻辑的源码解析,相信会为聪明的你一些帮助。
总结
本文,我们从分布式锁的概述入手,对Redis实现分布式锁的原理进行了较为全面的剖析。并且重点对Redisson的分布式锁实现进行了详细的讲解,从笔者对Redisson的封装类库的调用实例入手,对Redisson的重入锁进行了深入的源码解析。经过这一系列的学习,深入浅出了Redis/Redisson分布式锁的实现机理,相信之后遇到的类似问题,我们一定可以胸有成竹。
更多分布式锁的实现及源码解析,将会陆续发布,请拭目以待。
参考链接
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。