使用技术:SpringBoot + Redis
❗️本人初学,代码中或许有考虑不周的地方,仅供参考;其他与锁相关的知识请自行查阅资料。随着后续学习,或许会补充基于 ZooKeeper 的实现。
原文链接:https://zeroclian.github.io/posts/661e05ec.html
初步结构
代码结构:
准备工作(百度一堆):
- 整合Redis到SpringBoot中
- 本地或服务器搭建好redis环境
目标:实现一个可重入、获取失败时可自旋重试的分布式锁,并提供注解形式的用法,便于使用。
源码地址:https://github.com/ZeroClian/z-blog
一、工具类
public class RedisReentrantLockUtils { public static final String MAC = "macAddress"; public static final String JVM = "jvmPid"; public static final String THREAD = "threadId"; public static String getLockKey(ProceedingJoinPoint joinPoint) { DistributeLock distributeLock = getDistributeLock(joinPoint); StringBuffer lockKey = new StringBuffer(); //key前缀 if (Strings.isNotBlank(distributeLock.param())) { Object[] args = joinPoint.getArgs(); try { Integer index = new Integer(distributeLock.param()); Object prefix = args[index]; if (args.length >= index) { lockKey.append(prefix.toString()).append("@"); } } catch (Exception e) { } } //根据策略生成key LockKeyStrategy lockKeyStrategy = distributeLock.keyStrategy(); MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); String signature = getSignature(methodSignature.getMethod()); switch (lockKeyStrategy) { case USER_METHOD: // 获取请求头中的token HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); return lockKey.append(request.getHeader(HttpHeaders.AUTHORIZATION) + "@" + signature).toString(); case CUSTOM: return lockKey.append(Strings.isNotBlank(distributeLock.lockKey()) ? distributeLock.lockKey() : signature).toString(); case METHOD: default: return lockKey.append(signature).toString(); } } public static String getLockValue() { HashMapmap = new HashMap<>(3); map.put(MAC, getMacAddress()); map.put(JVM, getJvmId()); map.put(THREAD, getThreadId()); return JSON.toJSON(map); } public static String getMacAddress() { String address = ""; try { // 获取localhost的网卡设备 NetworkInterface networkInterface = NetworkInterface.getByInetAddress(InetAddress.getLocalHost()); // 获取硬件地址 byte[] hardwareAddress = networkInterface.getHardwareAddress(); StringBuilder builder = new StringBuilder(); for (int i = 0; i < hardwareAddress.length; i++) { builder.append(String.format("%02X%s", hardwareAddress[i], (i < hardwareAddress.length - 1) ? 
"-" : "")); } address = builder.toString(); } catch (Exception e) { return ""; } return address; } public static Integer getJvmId() { try { RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean(); Field jvm = runtimeMxBean.getClass().getDeclaredField("jvm"); jvm.setAccessible(true); VMManagement vmManagement = (VMManagement) jvm.get(runtimeMxBean); Method pidMethod = vmManagement.getClass().getDeclaredMethod("getProcessId"); pidMethod.setAccessible(true); return (Integer) pidMethod.invoke(vmManagement); } catch (Exception e) { return -1; } } public static long getThreadId() { return Thread.currentThread().getId(); } public static DistributeLock getDistributeLock(JoinPoint joinPoint) { // 获取被增强的方法相关信息 // getSignature():修饰符+ 包名+组件名(类名) +方法名 MethodSignature signature = (MethodSignature) joinPoint.getSignature(); // 作用目标类是否有锁 Class> targetClass = joinPoint.getTarget().getClass(); DistributeLock distributeLock = targetClass.getAnnotation(DistributeLock.class); if (distributeLock == null) { //获取方法上的锁 Method method = signature.getMethod(); distributeLock = method.getAnnotation(DistributeLock.class); return distributeLock; } else { return distributeLock; } } private static String getSignature(Method method) { StringBuilder builder = new StringBuilder(); Class> returnType = method.getReturnType(); if (returnType != null) { builder.append(returnType.getName()).append("#"); } builder.append(method.getName()); Class>[] parameters = method.getParameterTypes(); for (int i = 0; i < parameters.length; i++) { if (i == 0) { builder.append(":"); } else { builder.append(","); } builder.append(parameters[i].getName()); } return builder.toString(); } }
二、锁的具体实现
public class RedisReentrantLock { private final Logger log = LoggerFactory.getLogger(RedisReentrantLock.class); private final ThreadLocal lockCount; private final String lockKey; private final String lockValue; private final long timeOut; private final LockStrategy strategy; private static final long SPIN_TIME = 500; private final long REDIS_UNLOCK_SUCCESS = 1; private final String REDIS_UNLOCK_LUA = " if redis.call('get',KEYS[1]) == ARGV[1] " + " then " + " return redis.call('del',KEYS[1]) " + " else " + " return 0 end"; public RedisReentrantLock(String lockKey, String lockValue, long timeOut, LockStrategy strategy) { this.lockKey = lockKey; this.lockValue = lockValue; this.timeOut = timeOut; this.strategy = strategy; //初始化当前线程的重入次数 lockCount = ThreadLocal.withInitial(AtomicInteger::new); } public void lock() throws Exception { Boolean getLock = RedisManager.setIfAbsent(lockKey, lockValue, timeOut); if (!getLock) { if (isLockOwner(lockKey)) { int count = lockCount.get().incrementAndGet(); log.debug("重入锁[ {} ] 成功,当前LockCount: {}", lockKey, count); getLock = true; } } switch (strategy) { case RETRY: while (!getLock) { log.debug("获取锁[ {}-{} ]失败", lockKey, lockValue); getLock = RedisManager.setIfAbsent(lockKey, lockValue, timeOut); Thread.sleep(SPIN_TIME); } log.debug("获取锁[ {}-{} ]成功", lockKey, lockValue); break; case ONCE: if (!getLock) { throw new Exception("获取锁 " + lockKey + " 失败 , 退出方法"); } log.debug("获取锁[ {}-{} ]成功", lockKey, lockValue); break; default: break; } } public void unlock() { int count = lockCount.get().get(); if (count == 0) { if (execUnlockscript(lockKey, RedisReentrantLockUtils.getLockValue())) { log.debug("释放锁 [ {}-{} ] 成功", lockKey, RedisReentrantLockUtils.getLockValue()); } else { log.debug("释放锁 [ {}-{} ] 失败", lockKey, RedisReentrantLockUtils.getLockValue()); } } else { lockCount.get().decrementAndGet(); log.debug("重入[ {} ] 锁 LockCount -1 ,当前lockCount : {}", lockKey, lockCount); } } private Boolean execUnlockscript(String lockKey, String 
lockValue) { DefaultRedisscript unlockscript = new DefaultRedisscript(); unlockscript.setResultType(Long.class); unlockscript.setscriptText(REDIS_UNLOCK_LUA); Listkeys = Collections.singletonList(lockKey); return (Long) RedisManager.executeLuascript(unlockscript, keys, lockValue) == REDIS_UNLOCK_SUCCESS; } private Boolean isLockOwner(String lockKey) { if (RedisManager.exists(lockKey)) { String lockValueJsonString = RedisManager.get(lockKey); JsonNode lockValue = JSON.parse(lockValueJsonString, JsonNode.class); assert lockValue != null; return RedisReentrantLockUtils.getMacAddress().equals(lockValue.get(RedisReentrantLockUtils.MAC).textValue()) && RedisReentrantLockUtils.getJvmId() == lockValue.get(RedisReentrantLockUtils.JVM).intValue() && RedisReentrantLockUtils.getThreadId() == lockValue.get(RedisReentrantLockUtils.THREAD).intValue(); } return false; } }
三、锁的守护线程
/**
 * Background task that keeps a held lock alive by periodically re-applying its expiry.
 *
 * <p>Every two thirds of the timeout it runs a script that resets the key's expiry, but only
 * if the key still holds the expected value. The task ends when the script reports no match or
 * when its thread is interrupted.
 */
@Slf4j
public class RedisReentrantLockDaemon implements Runnable {

    /** Value the script returns when the expiry was applied. */
    private static final long REDIS_EXPIRE_SUCCESS = 1;
    /** Re-applies the expiry only if the key still holds the expected value. */
    private static final String REDIS_EXPIRE_LUA = " if redis.call('get',KEYS[1]) == ARGV[1] " + " then " + " return redis.call('expire',KEYS[1],ARGV[2]) " + " else " + " return 0 end";

    private final String lockKey;
    private final String lockValue;
    private final long timeOut;
    /** Loop flag; volatile so a stop request is visible to the running thread. */
    private volatile boolean signal;

    /**
     * @param lockKey   Redis key of the lock to keep alive
     * @param lockValue value the key must still hold for the expiry to be extended
     * @param timeOut   expiry handed to the script; also drives the renewal interval
     */
    public RedisReentrantLockDaemon(String lockKey, String lockValue, long timeOut) {
        this.lockKey = lockKey;
        this.lockValue = lockValue;
        this.timeOut = timeOut;
        this.signal = true;
    }

    /**
     * Sleeps for two thirds of the timeout, extends the expiry, and repeats until the
     * extension fails or the thread is interrupted.
     */
    @Override
    public void run() {
        log.debug(">>>>>>守护线程启动");
        // Floor of 1 ms: a zero wait would busy-loop, a negative one makes sleep() throw.
        long waitTime = Math.max(1L, timeOut * 1000 * 2 / 3);
        while (signal) {
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                log.debug("锁 [ {} ] 的守护线程被中断", lockKey);
                // An interrupt is the owner's request to stop: restore the flag and exit
                // instead of looping into another renewal.
                Thread.currentThread().interrupt();
                this.stop();
                return;
            }
            if (execExpandTimescript(lockKey, lockValue, timeOut)) {
                log.debug("锁 [ {} ] 延期成功", lockKey);
            } else {
                log.debug("锁 [ {} ] 延期失败", lockKey);
                this.stop();
            }
        }
    }

    /**
     * Runs the compare-and-expire script.
     *
     * @return {@code true} if the script reported that the expiry was applied
     */
    private Boolean execExpandTimescript(String lockKey, String lockValue, long timeOut) {
        DefaultRedisScript<Long> expireScript = new DefaultRedisScript<>();
        expireScript.setResultType(Long.class);
        expireScript.setScriptText(REDIS_EXPIRE_LUA);
        List<String> keys = Collections.singletonList(lockKey);
        // NOTE(review): helper name kept exactly as it appears in this file; confirm its
        // spelling against RedisManager.
        Object result = RedisManager.executeLuascript(expireScript, keys, lockValue, timeOut);
        // equals() instead of == so a null reply means "not extended" rather than an NPE.
        return Long.valueOf(REDIS_EXPIRE_SUCCESS).equals(result);
    }

    /** Asks the renewal loop to end after the current iteration. */
    private void stop() {
        this.signal = false;
    }
}
四、注解及aop处理
/**
 * Marks a method whose execution must be guarded by the Redis distributed lock.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributeLock {

    /**
     * Explicit lock key, used by {@link LockKeyStrategy#CUSTOM}. When blank, the method
     * signature is used instead.
     */
    String lockKey() default "";

    /**
     * Expiry of the lock key, so a crashed holder cannot block others forever.
     * NOTE(review): unit is presumably seconds (it is passed to Redis {@code expire}); confirm
     * against RedisManager.
     */
    long timeOut() default 60;

    /**
     * Zero-based index of the method argument whose {@code toString()} is prepended to the key
     * as {@code <argument>@}. Blank means no prefix.
     */
    String param() default "";

    /** How the lock key is derived. */
    LockKeyStrategy keyStrategy() default LockKeyStrategy.CUSTOM;

    /** What to do when the lock is currently held by someone else. */
    LockStrategy strategy() default LockStrategy.RETRY;
}
@Slf4j @Aspect @Component public class DistributeLockAspect { private static final long RETRY_TIME = 1000; private static final long RETRY_TIME_MULTIPLE = 2; @Pointcut() public void pointCut() { } @Around("@annotation(cn.github.zeroclian.distributedlock.annotation.DistributeLock)") public Object distributeLockAroundAop(ProceedingJoinPoint joinPoint) throws Throwable { // 获取注解 DistributeLock distributeLock = RedisReentrantLockUtils.getDistributeLock(joinPoint); // 获取锁键 String lockKey = RedisReentrantLockUtils.getLockKey(joinPoint); log.debug("分布式注解获取锁键:[ {} ]", lockKey); // 获取锁值 String lockValue = RedisReentrantLockUtils.getLockValue(); log.debug("分布式注解获取锁值:[ {} ]", lockValue); // 过期时间,避免死锁 long timeOut = distributeLock.timeOut(); // 获取锁的策略 LockStrategy strategy = distributeLock.strategy(); // 初始化锁 RedisReentrantLock lock = new RedisReentrantLock(lockKey, lockValue, timeOut, strategy); // 初始化守护线程 RedisReentrantLockDaemon daemon = new RedisReentrantLockDaemon(lockKey, lockValue, timeOut); Thread daemonThread = new Thread(daemon); try { lock.lock(); daemonThread.setDaemon(Boolean.TRUE); daemonThread.start(); return joinPoint.proceed(); } finally { daemonThread.interrupt(); lock.unlock(); } } }
五、相关策略类
/**
 * Selects how {@code RedisReentrantLockUtils.getLockKey} derives the lock key.
 */
public enum LockKeyStrategy {

    /** Key is the signature of the intercepted method. */
    METHOD,

    /** Key is the request's {@code Authorization} header, an {@code @}, then the method signature. */
    USER_METHOD,

    /** Key is the annotation's {@code lockKey}, or the method signature when that is blank. */
    CUSTOM
}
/**
 * Selects what {@code RedisReentrantLock.lock()} does when the lock is held by someone else.
 */
public enum LockStrategy {

    /** Try once; fail with an exception if the lock cannot be taken. */
    ONCE,

    /** Keep retrying, pausing between attempts, until the lock is taken. */
    RETRY
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)