【红锁】Redis Jedis springboot 低版本 普通红锁

【红锁】Redis Jedis springboot 低版本 普通红锁,第1张

本文借鉴了CSDN和知乎上很多内容,不在此一一列举,在此对那些贡献自己知识和经验的人儿致敬。

目标
简单在Jedis低版本(JedisClient小于3.0)中实现Redis的红(宏)锁功能,解决集群环境对业务功能或代码块(定时任务、消息队列消费场景)进行加锁处理,解决并发下产的重复执行或并发执行导致的数据固话问题,仅供参考。

红锁简介

这个小白比较懒,摘录个简介(暂时不收引流费用,等我火的):

https://blog.csdn.net/zlg40/article/details/123325830

必须的巨人肩膀依赖

<dependency>
     <groupId>redis.clientsgroupId>
     <artifactId>jedisartifactId>
     <version>2.9.1version>
dependency>
核心加解锁
/**
     * 加锁
     * @author:reoverflow@qq.com
     * @param keyPrefix 键值前缀
     * @return
     */
    public boolean lock(KeyPrefix keyPrefix){
        // 标记当前线程,防止误删其它线程锁
        String uuid = UUIDUtil.uuid();
        return this.lock(keyPrefix, "general", uuid);
    }

    /**
     * 加锁
     * @author:reoverflow@qq.com
     * @param keyPrefix 键值前缀
     * @param key       加锁唯一标识
     * @param value     释放锁唯一标识
     */
    @SneakyThrows
    public boolean lock(KeyPrefix keyPrefix, String key, String value) {
        String realKey = keyPrefix.getPrefix() + key;
        Assert.isTrue(StringUtils.isNotBlank(key), "redis locks are identified as null.");
        Assert.isTrue(StringUtils.isNotBlank(value), "the redis release lock is identified as null.");
        int cycles = CYCLES;
        //尝试获取锁,当获取到锁,则直接返回,否则,循环尝试获取
        while (!tryLock(realKey, value, keyPrefix.expireSeconds())) {
            if (0 == (cycles--)) {
                return false;
            }
            TimeUnit.MILLISECONDS.sleep(SLEEP_TIME);
        }
        return true;
    }

    /**
     * 尝试获取锁
     * @author:reoverflow@qq.com
     * @param key     加锁唯一标识
     * @param value   释放锁唯一标识(建议使用线程ID作为value)
     * @param timeout 超时时间(单位:S)
     * @return [true: 加锁成功; false: 加锁失败]
     */
    private boolean tryLock(String key, String value, Integer timeout) {
        try (Jedis jedis = this.getJedis()) {
            //String result = jedis.set(DISTRIBUTION_LOCK_KEY + key, value, SetParams.setParams().nx().px(timeout));
            // 低版本Jedis采用lua连续 *** 作保证原子性
            String script = "if redis.call('setNx',KEYS[1],ARGV[1])  then " +
                    "           if redis.call('get',KEYS[1])==ARGV[1] then " +
                    "               return redis.call('expire',KEYS[1],ARGV[2]) " +
                    "           else " +
                    "               return 0 " +
                    "           end " +
                    "        else " +
                    "           return 0" +
                    "        end ";
            List<String> paramList = new ArrayList<>();
            paramList.add(value);
            paramList.add(String.valueOf(timeout));
            Object result = jedis.eval(script, Collections.singletonList(key), paramList);
            return result != null && 1 == (long) result;
        }
    }

    /**
     * 释放锁
     * @author:reoverflow@qq.com
     * @param keyPrefix 键值前缀
     * @param key   加锁唯一标识
     * @param value 释放锁唯一标识(建议使用线程ID作为value)
     */
    public boolean unLock(KeyPrefix keyPrefix,String key, String value) {
        String realKey = keyPrefix.getPrefix() + key;
        try (Jedis jedis = this.getJedis()) {
            Assert.isTrue(StringUtils.isNotBlank(key), "redis locks are identified as null.");
            Assert.isTrue(StringUtils.isNotBlank(value), "the redis release lock is identified as null.");
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(realKey), Collections.singletonList(value));
            return result != null && 1 == (long) result;
        }
    }
缓存键值包装(可忽略,比较啰嗦)
/**
 * 通用红锁
 * 缓存超时时间单位(秒)
 */
public class CommonRedLockKey extends BasePrefix {

    /*** 分布式锁过期时间 (秒)***/
    public static final int STATUS_REFRESH_JOB_EXPIRE = 30;

    public CommonRedLockKey(int expireSeconds, String prefix) {
        super(expireSeconds, prefix);
    }

    /**
     * XXX超时,刷新XX状态
     */
    public static CommonRedLockKey XXX_REFRESH_JOB = new CommonRedLockKey(STATUS_REFRESH_JOB_EXPIRE, "XXXXXRefresh");
}

/**
 * 缓存前缀基类
 */
public abstract class BasePrefix implements KeyPrefix {

    private int expireSeconds;

    private String prefix;

    public BasePrefix(String prefix) {//0代表永不过期
        this(0, prefix);
    }

    public BasePrefix(int expireSeconds, String prefix) {
        this.expireSeconds = expireSeconds;
        this.prefix = prefix;
    }

    public int expireSeconds() {//默认0代表永不过期
        return expireSeconds;
    }

    public String getPrefix() {
        String className = getClass().getSimpleName();
        return className + ":" + prefix + ":";
    }
}

测试类
@Autowired
    private RedisService redisService;

    /**
     * @author:reoverflow@qq.com
     * 测试分布式Redis锁
     */
    @SneakyThrows
    @Test
    public void testLock() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));
        int idx = 100;
        while (--idx > 0) {
            threadPoolExecutor.execute(() -> {
                try {
                    boolean lock = redisService.lock(CommonRedLockKey.XXX_REFRESH_JOB);
                    if (lock) {
                        //正确结果是只有一个线程获取锁成功
                        log.info("---获取锁成功------------------------------------------");
                    } else {
                        log.info("---获取锁失败,当前线程ID为---");
                    }
                } catch (Exception e) {
                    log.error("加锁出现异常", e);
                }
            });
            TimeUnit.MILLISECONDS.sleep(50);
        }
        //线程池使用局部变量使用时记得手动关闭
        threadPoolExecutor.shutdown();

        while (!threadPoolExecutor.isTerminated() || !threadPoolExecutor.getQueue().isEmpty()) {
            Thread.sleep(2000);
        }
        log.info("执行结束");
    }

    /**
     * @author:reoverflow@qq.com
     * 测试分布式Redis解锁
     */
    @SneakyThrows
    @Test
    public void testUnLock() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));
        redisService.set(CommonRedLockKey.XXX_REFRESH_JOB.getPrefix() + "general", "10001");
        int idx = 100;
        while (--idx > 0) {
            threadPoolExecutor.execute(() -> {
                try {
                    boolean general = redisService.unLock(CommonRedLockKey.XXX_REFRESH_JOB, "general", "10001");
                    //正确结果是只有一个线程解锁成功
                    if (general) {
                        log.info("---解锁成功------------------------------------------");
                    } else {
                        log.info("---解锁失败,当前线程ID为---{}", Thread.currentThread().getId());
                    }
                } catch (Exception e) {
                    log.error("加锁出现异常", e);
                }
            });
            TimeUnit.MILLISECONDS.sleep(50);
        }
        //线程池使用局部变量使用时记得手动关闭
        threadPoolExecutor.shutdown();

        while (!threadPoolExecutor.isTerminated() || !threadPoolExecutor.getQueue().isEmpty()) {
            Thread.sleep(2000);
        }
        log.info("执行结束");
    }

芭芭拉纳什说过:天上下雨地下滑,自己跌倒自己爬,要让朋友拉一把,还得酒换酒来茶换茶。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/915719.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存