基于Redis的分布式锁实现

基于Redis的分布式锁实现,第1张

基于Redis的分布式锁实现

使用技术:SpringBoot + Redis

❗️本人初学,代码中可能存在考虑不周之处,仅供参考;锁的其他相关知识请自行查阅资料。随着后续学习,可能会补充基于 ZooKeeper 的实现。
原文链接:https://zeroclian.github.io/posts/661e05ec.html

初步结构

代码结构:

准备工作(网上教程很多,可自行搜索):

  1. 整合Redis到SpringBoot中
  2. 本地或服务器搭建好redis环境

目标:实现一个可重入、支持自旋重试的分布式锁,并提供注解形式,便于使用。

源码地址:https://github.com/ZeroClian/z-blog

一、工具类


/**
 * Static helpers for the Redis distributed lock.
 *
 * <p>Derives the Redis key for an intercepted method and builds the lock value that identifies
 * the current holder (MAC address + JVM process id + thread id).
 */
public class RedisReentrantLockUtils {

    /** JSON field of the lock value that holds the MAC address. */
    public static final String MAC = "macAddress";

    /** JSON field of the lock value that holds the JVM process id. */
    public static final String JVM = "jvmPid";

    /** JSON field of the lock value that holds the thread id. */
    public static final String THREAD = "threadId";

    /**
     * Builds the Redis key for the intercepted method.
     *
     * <p>The key is an optional argument prefix ({@code <arg>@}, see {@link DistributeLock#param()})
     * followed by a part chosen by {@link DistributeLock#keyStrategy()}:
     * <ul>
     *   <li>{@code USER_METHOD}: {@code <Authorization header>@<method signature>}</li>
     *   <li>{@code CUSTOM}: {@link DistributeLock#lockKey()} if not blank, else the method signature</li>
     *   <li>{@code METHOD} (and any other value): the method signature</li>
     * </ul>
     *
     * <p>NOTE(review): with {@code USER_METHOD} the raw Authorization header becomes part of the
     * Redis key (and of any log line printing the key); a missing header yields the literal
     * text {@code null}. Consider hashing the header if that is a concern.
     *
     * @param joinPoint the intercepted invocation
     * @return the lock key
     * @throws IllegalStateException if no {@link DistributeLock} can be resolved, or if
     *         {@code USER_METHOD} is used outside of an HTTP request
     */
    public static String getLockKey(ProceedingJoinPoint joinPoint) {
        DistributeLock distributeLock = getDistributeLock(joinPoint);
        if (distributeLock == null) {
            throw new IllegalStateException("No @DistributeLock found for " + joinPoint.getSignature());
        }
        StringBuilder lockKey = new StringBuilder();
        appendArgumentPrefix(lockKey, distributeLock.param(), joinPoint.getArgs());

        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        String signature = getSignature(methodSignature.getMethod());
        switch (distributeLock.keyStrategy()) {
            case USER_METHOD: {
                ServletRequestAttributes attributes =
                        (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
                if (attributes == null) {
                    throw new IllegalStateException(
                            "LockKeyStrategy.USER_METHOD requires an active HTTP request");
                }
                HttpServletRequest request = attributes.getRequest();
                String token = request.getHeader(HttpHeaders.AUTHORIZATION);
                return lockKey.append(token).append("@").append(signature).toString();
            }
            case CUSTOM: {
                String customKey = distributeLock.lockKey();
                return lockKey.append(Strings.isNotBlank(customKey) ? customKey : signature).toString();
            }
            case METHOD:
            default:
                return lockKey.append(signature).toString();
        }
    }

    /**
     * Appends {@code <argument>@} to the key when {@code param} is a valid index into
     * {@code args} and that argument is not null; otherwise leaves the key untouched.
     */
    private static void appendArgumentPrefix(StringBuilder lockKey, String param, Object[] args) {
        if (!Strings.isNotBlank(param) || args == null) {
            return;
        }
        final int index;
        try {
            index = Integer.parseInt(param);
        } catch (NumberFormatException ignored) {
            // Best effort: a non-numeric param simply means "no prefix".
            return;
        }
        // Validate the index BEFORE touching the array.
        if (index < 0 || index >= args.length || args[index] == null) {
            return;
        }
        lockKey.append(args[index].toString()).append("@");
    }

    /**
     * Builds the lock value describing the current holder.
     *
     * @return the JSON produced by {@code JSON.toJSON} for a map containing {@link #MAC},
     *         {@link #JVM} and {@link #THREAD}
     */
    public static String getLockValue() {
        HashMap<String, Object> holder = new HashMap<>(3);
        holder.put(MAC, getMacAddress());
        holder.put(JVM, getJvmId());
        holder.put(THREAD, getThreadId());
        return JSON.toJSON(holder);
    }

    /**
     * Returns the hardware address of the network interface bound to the local host address,
     * formatted as upper-case hex pairs separated by {@code -}.
     *
     * @return the MAC address, or an empty string when it cannot be determined (no matching
     *         interface, no hardware address, or a lookup failure)
     */
    public static String getMacAddress() {
        try {
            NetworkInterface networkInterface =
                    NetworkInterface.getByInetAddress(InetAddress.getLocalHost());
            byte[] hardwareAddress = networkInterface == null ? null : networkInterface.getHardwareAddress();
            if (hardwareAddress == null) {
                return "";
            }
            StringBuilder builder = new StringBuilder(hardwareAddress.length * 3);
            for (int i = 0; i < hardwareAddress.length; i++) {
                if (i > 0) {
                    builder.append('-');
                }
                builder.append(String.format("%02X", hardwareAddress[i]));
            }
            return builder.toString();
        } catch (java.io.IOException | SecurityException e) {
            // Best effort: the MAC is only one component of the holder identity.
            return "";
        }
    }

    /**
     * Returns the process id of the running JVM.
     *
     * <p>First tries to parse the {@code pid@host} form of {@link RuntimeMXBean#getName()};
     * if the name does not have that form, falls back to reflective access to the internal
     * {@code VMManagement} object.
     *
     * @return the process id, or {@code -1} when neither approach works
     */
    public static Integer getJvmId() {
        RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
        Integer pid = parsePid(runtimeMxBean.getName());
        if (pid != null) {
            return pid;
        }
        try {
            Field jvm = runtimeMxBean.getClass().getDeclaredField("jvm");
            jvm.setAccessible(true);
            VMManagement vmManagement = (VMManagement) jvm.get(runtimeMxBean);
            Method pidMethod = vmManagement.getClass().getDeclaredMethod("getProcessId");
            pidMethod.setAccessible(true);
            return (Integer) pidMethod.invoke(vmManagement);
        } catch (Exception e) {
            // Reflection into JDK internals is not available on every runtime.
            return -1;
        }
    }

    /** Extracts the numeric part before {@code @} from a runtime name, or null if absent. */
    private static Integer parsePid(String runtimeName) {
        if (runtimeName == null) {
            return null;
        }
        int at = runtimeName.indexOf('@');
        if (at <= 0) {
            return null;
        }
        try {
            return Integer.valueOf(runtimeName.substring(0, at));
        } catch (NumberFormatException ignored) {
            return null;
        }
    }

    /**
     * @return the id of the calling thread
     */
    public static long getThreadId() {
        return Thread.currentThread().getId();
    }

    /**
     * Resolves the {@link DistributeLock} annotation for an intercepted invocation.
     *
     * <p>Lookup order: the target class, the method reported by the join point signature,
     * then the same-named public method on the target class (covers the case where the
     * signature method comes from an interface and the annotation sits on the implementation).
     *
     * @param joinPoint the intercepted invocation
     * @return the annotation, or {@code null} if none is found
     */
    public static DistributeLock getDistributeLock(JoinPoint joinPoint) {
        Class<?> targetClass = joinPoint.getTarget().getClass();
        DistributeLock onClass = targetClass.getAnnotation(DistributeLock.class);
        if (onClass != null) {
            return onClass;
        }
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        DistributeLock onMethod = method.getAnnotation(DistributeLock.class);
        if (onMethod != null) {
            return onMethod;
        }
        try {
            return targetClass.getMethod(method.getName(), method.getParameterTypes())
                    .getAnnotation(DistributeLock.class);
        } catch (NoSuchMethodException ignored) {
            return null;
        }
    }

    /**
     * Renders a method as {@code <returnType>#<name>[:<paramType>,<paramType>...]}.
     *
     * <p>NOTE(review): the declaring class is not part of the result, so identically shaped
     * methods in different classes produce the same signature (and thus the same lock key).
     */
    private static String getSignature(Method method) {
        StringBuilder builder = new StringBuilder();
        builder.append(method.getReturnType().getName()).append("#").append(method.getName());
        Class<?>[] parameters = method.getParameterTypes();
        for (int i = 0; i < parameters.length; i++) {
            builder.append(i == 0 ? ":" : ",").append(parameters[i].getName());
        }
        return builder.toString();
    }

}

二、锁的具体实现

/**
 * Redis-backed lock with per-thread re-entrancy.
 *
 * <p>The lock is a Redis key created with {@code RedisManager.setIfAbsent}; its value is the
 * holder identity JSON. Release is done by a Lua script that deletes the key only if the stored
 * value still equals this lock's value.
 */
public class RedisReentrantLock {

    private static final Logger log = LoggerFactory.getLogger(RedisReentrantLock.class);

    /** Pause between acquisition attempts under {@code LockStrategy.RETRY}, in milliseconds. */
    private static final long SPIN_TIME = 500;

    /** Result of the unlock script when the key was deleted. */
    private static final long REDIS_UNLOCK_SUCCESS = 1;

    /** Compare-and-delete: removes the key only when its value equals ARGV[1]. */
    private static final String REDIS_UNLOCK_LUA
            = " if redis.call('get',KEYS[1]) == ARGV[1] " +
            " then " +
            " return redis.call('del',KEYS[1]) " +
            " else " +
            " return 0 end";

    /** Number of re-entrant acquisitions made through this instance by the current thread. */
    private final ThreadLocal<AtomicInteger> lockCount;

    private final String lockKey;

    private final String lockValue;

    /** Expiry handed to {@code RedisManager.setIfAbsent}. */
    private final long timeOut;

    private final LockStrategy strategy;

    /**
     * @param lockKey   Redis key of the lock
     * @param lockValue value stored under the key; identifies the holder
     * @param timeOut   expiry passed to {@code RedisManager.setIfAbsent}
     * @param strategy  what to do when the lock is not immediately available
     */
    public RedisReentrantLock(String lockKey, String lockValue, long timeOut, LockStrategy strategy) {
        this.lockKey = lockKey;
        this.lockValue = lockValue;
        this.timeOut = timeOut;
        this.strategy = strategy;
        this.lockCount = ThreadLocal.withInitial(AtomicInteger::new);
    }

    /**
     * Acquires the lock.
     *
     * <p>If the key is already held by the calling thread (same MAC, JVM pid and thread id),
     * the re-entrancy counter is incremented instead. Otherwise {@code RETRY} keeps trying,
     * sleeping {@link #SPIN_TIME} ms between attempts, while {@code ONCE} fails immediately.
     *
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     * @throws Exception            if the strategy is {@code ONCE} and the lock is unavailable
     */
    public void lock() throws Exception {
        boolean acquired = tryAcquire();
        if (!acquired && isLockOwner(lockKey)) {
            int count = lockCount.get().incrementAndGet();
            log.debug("重入锁[ {} ] 成功,当前LockCount: {}", lockKey, count);
            acquired = true;
        }
        switch (strategy) {
            case RETRY:
                while (!acquired) {
                    log.debug("获取锁[ {}-{} ]失败", lockKey, lockValue);
                    acquired = tryAcquire();
                    if (!acquired) {
                        // Only back off after a failed attempt; never sleep while holding the lock.
                        Thread.sleep(SPIN_TIME);
                    }
                }
                log.debug("获取锁[ {}-{} ]成功", lockKey, lockValue);
                break;
            case ONCE:
                if (!acquired) {
                    // Kept as a plain Exception so existing callers see the same exception type.
                    throw new Exception("获取锁 " + lockKey + " 失败 , 退出方法");
                }
                log.debug("获取锁[ {}-{} ]成功", lockKey, lockValue);
                break;
            default:
                break;
        }
    }

    /**
     * Releases one hold on the lock: decrements the re-entrancy counter if it is positive,
     * otherwise runs the compare-and-delete script with this lock's value.
     */
    public void unlock() {
        AtomicInteger reentrantCount = lockCount.get();
        if (reentrantCount.get() != 0) {
            int remaining = reentrantCount.decrementAndGet();
            log.debug("重入[ {} ] 锁 LockCount -1 ,当前lockCount : {}", lockKey, remaining);
            return;
        }
        // Drop the per-thread counter so pooled threads do not accumulate stale entries.
        lockCount.remove();
        if (execUnlockScript(lockKey, lockValue)) {
            log.debug("释放锁 [ {}-{} ] 成功", lockKey, lockValue);
        } else {
            log.debug("释放锁 [ {}-{} ] 失败", lockKey, lockValue);
        }
    }

    /** One SET-if-absent attempt; a null reply is treated as "not acquired". */
    private boolean tryAcquire() {
        return Boolean.TRUE.equals(RedisManager.setIfAbsent(lockKey, lockValue, timeOut));
    }

    /**
     * Runs the compare-and-delete script.
     *
     * @return true when the script reports that the key was deleted
     */
    private boolean execUnlockScript(String lockKey, String lockValue) {
        DefaultRedisScript<Long> unlockScript = new DefaultRedisScript<>();
        unlockScript.setResultType(Long.class);
        unlockScript.setScriptText(REDIS_UNLOCK_LUA);
        List<String> keys = Collections.singletonList(lockKey);
        // NOTE(review): RedisManager.executeLuascript is a project helper; name kept as found.
        Long result = (Long) RedisManager.executeLuascript(unlockScript, keys, lockValue);
        return result != null && result == REDIS_UNLOCK_SUCCESS;
    }

    /**
     * Tells whether the value currently stored under {@code lockKey} describes the calling
     * thread (same MAC address, JVM pid and thread id).
     */
    private boolean isLockOwner(String lockKey) {
        if (!RedisManager.exists(lockKey)) {
            return false;
        }
        // The key may expire between exists() and get(), so the value can still be null here.
        String storedValue = RedisManager.get(lockKey);
        if (storedValue == null) {
            return false;
        }
        JsonNode holder = JSON.parse(storedValue, JsonNode.class);
        if (holder == null) {
            return false;
        }
        JsonNode mac = holder.get(RedisReentrantLockUtils.MAC);
        JsonNode jvm = holder.get(RedisReentrantLockUtils.JVM);
        JsonNode thread = holder.get(RedisReentrantLockUtils.THREAD);
        if (mac == null || jvm == null || thread == null) {
            return false;
        }
        Integer currentJvmId = RedisReentrantLockUtils.getJvmId();
        return RedisReentrantLockUtils.getMacAddress().equals(mac.textValue())
                && currentJvmId != null
                && currentJvmId == jvm.intValue()
                // Thread ids are long; compare without truncating to int.
                && RedisReentrantLockUtils.getThreadId() == thread.longValue();
    }

}

三、锁的守护线程

/**
 * Watchdog that keeps a held lock alive.
 *
 * <p>Every two thirds of the lock's time-out it runs a Lua script that re-applies the expiry,
 * but only while the key still holds this lock's value. It stops when the renewal fails,
 * when {@link #stop()} is called, or when its thread is interrupted.
 */
@Slf4j
public class RedisReentrantLockDaemon implements Runnable {

    /** Result of the expire script when the expiry was applied. */
    private static final long REDIS_EXPIRE_SUCCESS = 1;

    /** Compare-and-expire: sets the expiry to ARGV[2] only when the value equals ARGV[1]. */
    private static final String REDIS_EXPIRE_LUA
            = " if redis.call('get',KEYS[1]) == ARGV[1] " +
            " then " +
            " return redis.call('expire',KEYS[1],ARGV[2]) " +
            " else " +
            " return 0 end";

    private final String lockKey;

    private final String lockValue;

    /** Lock time-out; multiplied by 1000 to derive the sleep interval in milliseconds. */
    private final long timeOut;

    /** Loop flag; volatile so that a stop request from another thread is seen by the loop. */
    private volatile boolean signal;

    /**
     * @param lockKey   Redis key of the lock to renew
     * @param lockValue value the key must still hold for the renewal to apply
     * @param timeOut   expiry to re-apply on each renewal
     */
    public RedisReentrantLockDaemon(String lockKey, String lockValue, long timeOut) {
        this.lockKey = lockKey;
        this.lockValue = lockValue;
        this.timeOut = timeOut;
        this.signal = true;
    }

    /**
     * Renewal loop: sleep for 2/3 of the time-out, then try to extend the expiry.
     */
    @Override
    public void run() {
        log.debug(">>>>>>守护线程启动");
        long waitTime = timeOut * 1000 * 2 / 3;
        while (signal) {
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                log.debug("锁 [ {} ] 的守护线程被中断", lockKey);
                // Interruption is the owner's way of saying the guarded work is over:
                // stop renewing instead of looping again, and keep the interrupt flag set.
                this.stop();
                Thread.currentThread().interrupt();
                return;
            }
            if (execExpandTimeScript(lockKey, lockValue, timeOut)) {
                log.debug("锁 [ {} ] 延期成功", lockKey);
            } else {
                log.debug("锁 [ {} ] 延期失败", lockKey);
                this.stop();
            }
        }
    }

    /**
     * Runs the compare-and-expire script.
     *
     * @return true when the script reports that the expiry was applied
     */
    private boolean execExpandTimeScript(String lockKey, String lockValue, long timeOut) {
        DefaultRedisScript<Long> expireScript = new DefaultRedisScript<>();
        expireScript.setResultType(Long.class);
        expireScript.setScriptText(REDIS_EXPIRE_LUA);
        List<String> keys = Collections.singletonList(lockKey);
        // NOTE(review): RedisManager.executeLuascript is a project helper; name kept as found.
        Long result = (Long) RedisManager.executeLuascript(expireScript, keys, lockValue, timeOut);
        return result != null && result == REDIS_EXPIRE_SUCCESS;
    }

    /**
     * Requests the renewal loop to end after its current iteration.
     */
    public void stop() {
        this.signal = false;
    }
}

四、注解及aop处理

/**
 * Marks a method whose execution must be guarded by the Redis distributed lock.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributeLock {

    /**
     * Explicit lock key. Used with {@link LockKeyStrategy#CUSTOM}; when blank, the method
     * signature is used instead.
     */
    String lockKey() default "";

    /**
     * Lock expiry in seconds (the renewal daemon multiplies it by 1000 for its sleep interval
     * and passes it to Redis {@code EXPIRE}).
     */
    long timeOut() default 60;

    /**
     * Index (as a decimal string) of the method argument whose {@code toString()} is put in
     * front of the key as {@code <argument>@}. Blank means no prefix.
     */
    String param() default "";

    /**
     * How the lock key is derived.
     */
    LockKeyStrategy keyStrategy() default LockKeyStrategy.CUSTOM;

    /**
     * What to do when the lock is not immediately available.
     */
    LockStrategy strategy() default LockStrategy.RETRY;

}

/**
 * Aspect that wraps every method annotated with {@code @DistributeLock} in a Redis lock:
 * acquire, start a renewal daemon thread, run the method, then stop the daemon and release.
 */
@Slf4j
@Aspect
@Component
public class DistributeLockAspect {

    /** Currently not referenced anywhere in this class. */
    private static final long RETRY_TIME = 1000;

    /** Currently not referenced anywhere in this class. */
    private static final long RETRY_TIME_MULTIPLE = 2;

    /**
     * Matches methods annotated with {@code @DistributeLock}.
     */
    @Pointcut("@annotation(cn.github.zeroclian.distributedlock.annotation.DistributeLock)")
    public void pointCut() {

    }

    /**
     * Runs the intercepted method while holding the distributed lock.
     *
     * @param joinPoint the intercepted invocation
     * @return whatever the intercepted method returns
     * @throws Throwable anything thrown while acquiring the lock or by the intercepted method
     */
    @Around("pointCut()")
    public Object distributeLockAroundAop(ProceedingJoinPoint joinPoint) throws Throwable {
        DistributeLock distributeLock = RedisReentrantLockUtils.getDistributeLock(joinPoint);
        String lockKey = RedisReentrantLockUtils.getLockKey(joinPoint);
        log.debug("分布式注解获取锁键:[ {} ]", lockKey);
        String lockValue = RedisReentrantLockUtils.getLockValue();
        log.debug("分布式注解获取锁值:[ {} ]", lockValue);
        // The expiry guards against a dead holder blocking everyone forever.
        long timeOut = distributeLock.timeOut();
        LockStrategy strategy = distributeLock.strategy();

        RedisReentrantLock lock = new RedisReentrantLock(lockKey, lockValue, timeOut, strategy);
        // Acquire BEFORE entering try/finally: if acquisition fails there is nothing to
        // release and no daemon to interrupt.
        lock.lock();

        Thread daemonThread = new Thread(new RedisReentrantLockDaemon(lockKey, lockValue, timeOut));
        daemonThread.setDaemon(true);
        try {
            daemonThread.start();
            return joinPoint.proceed();
        } finally {
            daemonThread.interrupt();
            lock.unlock();
        }
    }
}

五、相关策略类

/**
 * Ways of deriving the Redis key of a distributed lock.
 */
public enum LockKeyStrategy {

    /** Key is the signature of the locked method. */
    METHOD,

    /** Key is the request's Authorization header, {@code @}, then the method signature. */
    USER_METHOD,

    /** Key is the annotation's explicit lock key, or the method signature when that is blank. */
    CUSTOM;
}
/**
 * What the lock does when it cannot be acquired on the first attempt.
 */
public enum LockStrategy {

    /** Fail immediately with an exception. */
    ONCE,

    /** Keep retrying, pausing between attempts, until the lock is acquired. */
    RETRY;
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5665577.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存