本文借鉴了CSDN和知乎上很多内容,不在此一一列举,在此对那些贡献自己知识和经验的人儿致敬。
目标
简单在Jedis低版本(JedisClient小于3.0)中实现Redis的红(宏)锁功能,解决集群环境对业务功能或代码块(定时任务、消息队列消费场景)进行加锁处理,解决并发下产的重复执行或并发执行导致的数据固话问题,仅供参考。
这个小白比较懒,摘录个简介(暂时不收引流费用,等我火的):
必须的巨人肩膀依赖https://blog.csdn.net/zlg40/article/details/123325830
<dependency>
<groupId>redis.clientsgroupId>
<artifactId>jedisartifactId>
<version>2.9.1version>
dependency>
核心加解锁
/**
* 加锁
* @author:reoverflow@qq.com
* @param keyPrefix 键值前缀
* @return
*/
public boolean lock(KeyPrefix keyPrefix){
// 标记当前线程,防止误删其它线程锁
String uuid = UUIDUtil.uuid();
return this.lock(keyPrefix, "general", uuid);
}
/**
* 加锁
* @author:reoverflow@qq.com
* @param keyPrefix 键值前缀
* @param key 加锁唯一标识
* @param value 释放锁唯一标识
*/
@SneakyThrows
public boolean lock(KeyPrefix keyPrefix, String key, String value) {
String realKey = keyPrefix.getPrefix() + key;
Assert.isTrue(StringUtils.isNotBlank(key), "redis locks are identified as null.");
Assert.isTrue(StringUtils.isNotBlank(value), "the redis release lock is identified as null.");
int cycles = CYCLES;
//尝试获取锁,当获取到锁,则直接返回,否则,循环尝试获取
while (!tryLock(realKey, value, keyPrefix.expireSeconds())) {
if (0 == (cycles--)) {
return false;
}
TimeUnit.MILLISECONDS.sleep(SLEEP_TIME);
}
return true;
}
/**
* 尝试获取锁
* @author:reoverflow@qq.com
* @param key 加锁唯一标识
* @param value 释放锁唯一标识(建议使用线程ID作为value)
* @param timeout 超时时间(单位:S)
* @return [true: 加锁成功; false: 加锁失败]
*/
private boolean tryLock(String key, String value, Integer timeout) {
try (Jedis jedis = this.getJedis()) {
//String result = jedis.set(DISTRIBUTION_LOCK_KEY + key, value, SetParams.setParams().nx().px(timeout));
// 低版本Jedis采用lua连续 *** 作保证原子性
String script = "if redis.call('setNx',KEYS[1],ARGV[1]) then " +
" if redis.call('get',KEYS[1])==ARGV[1] then " +
" return redis.call('expire',KEYS[1],ARGV[2]) " +
" else " +
" return 0 " +
" end " +
" else " +
" return 0" +
" end ";
List<String> paramList = new ArrayList<>();
paramList.add(value);
paramList.add(String.valueOf(timeout));
Object result = jedis.eval(script, Collections.singletonList(key), paramList);
return result != null && 1 == (long) result;
}
}
/**
* 释放锁
* @author:reoverflow@qq.com
* @param keyPrefix 键值前缀
* @param key 加锁唯一标识
* @param value 释放锁唯一标识(建议使用线程ID作为value)
*/
public boolean unLock(KeyPrefix keyPrefix,String key, String value) {
String realKey = keyPrefix.getPrefix() + key;
try (Jedis jedis = this.getJedis()) {
Assert.isTrue(StringUtils.isNotBlank(key), "redis locks are identified as null.");
Assert.isTrue(StringUtils.isNotBlank(value), "the redis release lock is identified as null.");
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(realKey), Collections.singletonList(value));
return result != null && 1 == (long) result;
}
}
缓存键值包装(可忽略,比较啰嗦)
/**
* 通用红锁
* 缓存超时时间单位(秒)
*/
public class CommonRedLockKey extends BasePrefix {
/*** 分布式锁过期时间 (秒)***/
public static final int STATUS_REFRESH_JOB_EXPIRE = 30;
public CommonRedLockKey(int expireSeconds, String prefix) {
super(expireSeconds, prefix);
}
/**
* XXX超时,刷新XX状态
*/
public static CommonRedLockKey XXX_REFRESH_JOB = new CommonRedLockKey(STATUS_REFRESH_JOB_EXPIRE, "XXXXXRefresh");
}
/**
* 缓存前缀基类
*/
public abstract class BasePrefix implements KeyPrefix {
private int expireSeconds;
private String prefix;
public BasePrefix(String prefix) {//0代表永不过期
this(0, prefix);
}
public BasePrefix(int expireSeconds, String prefix) {
this.expireSeconds = expireSeconds;
this.prefix = prefix;
}
public int expireSeconds() {//默认0代表永不过期
return expireSeconds;
}
public String getPrefix() {
String className = getClass().getSimpleName();
return className + ":" + prefix + ":";
}
}
测试类
@Autowired
private RedisService redisService;
/**
* @author:reoverflow@qq.com
* 测试分布式Redis锁
*/
@SneakyThrows
@Test
public void testLock() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));
int idx = 100;
while (--idx > 0) {
threadPoolExecutor.execute(() -> {
try {
boolean lock = redisService.lock(CommonRedLockKey.XXX_REFRESH_JOB);
if (lock) {
//正确结果是只有一个线程获取锁成功
log.info("---获取锁成功------------------------------------------");
} else {
log.info("---获取锁失败,当前线程ID为---");
}
} catch (Exception e) {
log.error("加锁出现异常", e);
}
});
TimeUnit.MILLISECONDS.sleep(50);
}
//线程池使用局部变量使用时记得手动关闭
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated() || !threadPoolExecutor.getQueue().isEmpty()) {
Thread.sleep(2000);
}
log.info("执行结束");
}
/**
* @author:reoverflow@qq.com
* 测试分布式Redis解锁
*/
@SneakyThrows
@Test
public void testUnLock() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));
redisService.set(CommonRedLockKey.XXX_REFRESH_JOB.getPrefix() + "general", "10001");
int idx = 100;
while (--idx > 0) {
threadPoolExecutor.execute(() -> {
try {
boolean general = redisService.unLock(CommonRedLockKey.XXX_REFRESH_JOB, "general", "10001");
//正确结果是只有一个线程解锁成功
if (general) {
log.info("---解锁成功------------------------------------------");
} else {
log.info("---解锁失败,当前线程ID为---{}", Thread.currentThread().getId());
}
} catch (Exception e) {
log.error("加锁出现异常", e);
}
});
TimeUnit.MILLISECONDS.sleep(50);
}
//线程池使用局部变量使用时记得手动关闭
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated() || !threadPoolExecutor.getQueue().isEmpty()) {
Thread.sleep(2000);
}
log.info("执行结束");
}
芭芭拉纳什说过:天上下雨地下滑,自己跌倒自己爬,要让朋友拉一把,还得酒换酒来茶换茶。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)