- Redis学习之基础知识
- Redis学习之事件驱动模型
- Redis学习之集群
本文基于Spring Boot 2.6.6、redisson 3.16.0简单分析Redisson分布式锁自动续期的实现过程。
Demo 依赖<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
<exclusions>
<exclusion>
<groupId>io.lettucegroupId>
<artifactId>lettuce-coreartifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>redis.clientsgroupId>
<artifactId>jedisartifactId>
dependency>
<dependency>
<groupId>org.redissongroupId>
<artifactId>redissonartifactId>
<version>3.16.0version>
dependency>
测试代码
public class LockDemo {
private final RedissonClient redissonClient;
public LockDemo(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
LockDemo lockDemo = new LockDemo(Redisson.create(config));
new Thread(lockDemo::reentrantLock).start();
lockDemo.release();
}
public void release() {
this.redissonClient.shutdown();
}
public void reentrantLock() {
RLock reentrantLock = redissonClient.getLock("reentrant-lock");
reentrantLock.lock();
try {
// do something
} finally {
reentrantLock.unlock();
}
}
}
简析
获取锁
Redisson分布式锁获取有两种方式:
lock()
:未指定过期时间,实现时会设置过期时间,默认30s,然后采用Watchdog不断续期,直至释放锁;lock(long leaseTime, TimeUnit unit)
:指定过期时间,超过有效期时间后,会自动释放锁;
本文关注未指定过期时间的获取锁方式,RedissonLock.lock()
代码如下:
public void lock() {
try {
// 过期时间为-1,表示永不过期
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
// 获取到锁直接返回
return;
}
// 还未获取到锁
// 订阅锁,这样锁释放时会被通知到
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
// 获取到锁则可以退出死循环
break;
}
if (ttl >= 0) {
try {
// 指定超时时间内获取
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
// 未指定超时时间获取
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 取消锁的订阅
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
// 指定过期时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 未指定过期时间
// 过期时间设为看门狗超时时间,然后由看门狗一直续期,直到锁释放
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
if (ttlRemaining == null) {
// 获取到锁
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 未指定过期时间,需要开启Watchdog自动续期
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
首先看下尝试获取锁的实现,tryLockInnerAsync
方法通过EVAL执行LUA脚本,代码如下:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
主要逻辑如下:
- 若锁不存在,则创建锁,并设置过期时间,然后返回
nil
; - 若锁存在且由本线程持有,则锁计数加一,并重设过期时间,然后返回
nil
; - 否则返回锁的过期时间;
由上可知,当返回nil
才意味着获取到锁,否则获取锁失败;
再看下开启Watchdog任务自动续期的实现,scheduleExpirationRenewal
方法代码如下:
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// Watchdog任务已存在,则添加本次线程ID即可
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 创建Watchdog任务,用于重设过期时间
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 借助Netty的Timeout实现自动续期
// 超时时间为1/3过期时间,确保在过期前能够重设过期时间
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
// 不存在则无需继续执行
// 释放锁后会删除该Key
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// 锁还存在,则需要继续开启Watchdog
// 递归执行,重设过期时间
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
renewExpirationAsync
方法通过EVAL执行LUA脚本实现重设锁的过期时间,代码如下:
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
逻辑如下:
- 若锁存在,则重设过期时间,然后返回
1
; - 否则,返回
0
;
至此,Redisson获取分布式锁时通过开启Watchdog完成自动重设过期时间的实现就分析完了,接下来分析释放锁时如何关闭Watchdog;
释放锁Redisson分布式锁释放的方法为RedissonLock.unlock()
,代码如下:
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 取消自动续期
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
首先看下释放锁的实现,unlockInnerAsync
方法通过EVAL执行LUA脚本,代码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
主要逻辑如下:
- 若锁不存在,则直接返回
nil
; - 若锁计数减一后还大于0,则重设过期时间,然后返回
0
; - 否则删除锁,并发布解锁消息(通知其它线程可以获取锁),然后返回
1
;
再看下关闭Watchdog的实现,scheduleExpirationRenewal
方法代码如下:
protected void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
// Watchdog任务已不存在则直接退出
return;
}
if (threadId != null) {
// 移除本次线程
task.removeThreadId(threadId);
}
// 当前Watchdog任务已经没有绑定线程,则可以关闭
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
// 取消Watchdog任务
timeout.cancel();
}
// 删除该Key
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
至此,Redisson释放分布式锁时关闭Watchdog任务的实现就分析完了。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)