本文涉及内容:
- 分布式锁介绍;
- 用数据表做分布式锁原理介绍 & 数据表设计;
- 用redis做分布式锁原理介绍 & 代码实 *** ;
- 用redisson做分布式锁原理介绍 & 代码实 *** ;
- 用zookeeper做分布式锁原理介绍;
- 用curator做分布式锁代码实 *** ;
- 实现分布式锁的各方案比较;
- 完整项目的GitHub地址
1、锁的应用场景:
在单体应用中,我们会使用ReentrantLock或Synchronized来应对并发场景。
比如最常见的卖票场景,假如总共有100张票,线程A和线程B同时 *** 作,如下图:
这时有一个共享变量100,线程A和B将100拷贝到自己的工作内存中,当线程A抢到执行权的时候,此时A工作内存中的值是100,然后售票,进行自减 *** 作,将自己工作内存中的值变成了99。当A还没来得及将99刷回到主内存的时候,线程B进来了,此时B拿到的主内存的值还是100,然后售票,进行自减,也是99。这就出现了同一张票出售了两次的情况。所以我们会加锁加volatile保证原子性保证可见性。
2、分布式锁是什么?
上面的场景中,我们可以通过ReentrantLock或者Synchronized搞定,因为你的项目只运行在一台服务器上,只有一个JVM,所有的共享变量都加载到同一个主内存中。而分布式应用中,一个项目部署在多台服务器上,最基本的架构如下图:
比如现在server1、server2和server3读取到数据库的票数都是100,在每一个server中,我们可以用JDK的锁来保证多个用户同时访问我这台server时不会出问题。但问题是,如果client1访问到的是server1,票数是100,然后购票,还没来得及将数据库票数改为99,client2也开始访问系统购票了,client2如果访问的是server1,自然不会出问题,如果访问的是server2,这时server2读取到数据库的票数还是100,那么就出问题了,又出现了同一张票卖了两次的情况。在分布式应用中,JDK的锁机制就无法满足需求了,所以就出现了分布式锁。
3、分布式锁应该满足的条件:
- 四个一:同一个方法在同一时刻只能被一台机器的一个线程执行
- 三个具备:具备可重入特性;具备锁失效机制,防止死锁;具备非阻塞锁特性,即没获取到锁返回获取锁失败,而不是一直等待
- 两个高:高性能地获取与释放锁;高可用的获取与释放锁
4、分布式锁的实现方式:
- 基于数据库:用数据库的排他锁实现
- 基于redis:利用redis的set key value NX EX 30000;也可以用redis的第三方库比如Redisson
- 基于zookeeper:利用zookeeper的临时顺序节点实现;也可以用zookeeper的第三方库比如Curator
1、建表:
CREATE TABLE `tb_distributed_lock` ( `dl_id` INT NOT NULL auto_increment COMMENT '主键,自增', `dl_method_name` VARCHAr (64) NOT NULL DEFAULT '' COMMENT '方法名', `dl_device_info` VARCHAr (100) NOT NULL DEFAULT '' COMMENT 'ip+线程id', `dl_operate_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATe CURRENT_TIMESTAMP COMMENT '数据被 *** 作的时间', PRIMARY KEY (`dl_id`), UNIQUE KEY `uq_method_name` (`dl_method_name`) USING BTREE ) ENGINE = INNODB DEFAULT charset = utf8 COMMENT = '分布式锁表';
2、思路:
当执行一个方法的时候,我们首先尝试往表中插入一条数据。如果插入成功,则占锁成功,继续往下执行,执行完删除该记录。如果插入失败,我们再以当前方法名、当前机器ip+线程id、数据被 *** 作时间为5分钟内(5分钟表示锁失效的时间)为条件去查询,如果有记录,表示该机器的该线程在5分钟内占有过锁了,直接往下执行最后删除记录;如果没有记录,占有锁失败。
一个用户就是一个线程,所以我们可以把机器ip和用户id组合一起当成dl_device_info。
3、占有锁和释放锁:
- 占有锁:
INSERT INTO tb_distributed_lock ( dl_method_name, dl_device_info ) VALUES ('方法名', 'ip&用户id');
如果insert失败,则:
SELECT count(*) FROM tb_distributed_lock WHERe dl_method_name = '方法名' AND dl_device_info = 'ip&用户id' AND dl_operate_time < SYSDATE() - 5;
- 释放锁:
DELETe FROM tb_distributed_lock WHERe dl_method_name = '方法名' AND dl_device_info = 'ip&用户id';
4、小总结:
以上表结构可能并不是很好,只是提供了这么一个思路。下面说它的优缺点:
- 优点:成本低,不需要引入其他的技术
- 缺点:对数据库依赖性强,如果数据库挂了,那就凉凉了,所以数据库最好也是高可用的
1、原理:
基于redis的set key value nx ex 30,这条语句的意思就是如果key不存在就设置,并且过期时间为30s,如果key已经存在就会返回false。如果要以毫秒为单位,把ex换成px就好了。我们执行方法前,先将方法名当成key,执行这条语句,如果执行成功就是获取锁成功,执行失败就是获取锁失败。
2、代码实现:
- RedisUtil的部分代码:
public static boolean setIfAbsent(String key, String value, Long timeout) { return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.SECONDS); } public static String getString(String key) { return (String) redisTemplate.opsForValue().get(key); } public static boolean delKey(String key) { return redisTemplate.delete(key); }
- 业务方法中使用:
public String hello() { // 方法名当作key String key = "hello"; String value = "hellolock"; if (RedisUtil.setIfAbsent(key, value, 60 * 2L)) { System.out.println("成功获取到锁,开始执行业务逻辑……"); // 假如执行业务逻辑需要1分钟 try {TimeUnit.MINUTES.sleep(1L); } catch (Exception e) { e.printStackTrace();}; // 释放锁先校验value,避免释放错 if (value.equals(RedisUtil.getString(key))) { RedisUtil.delKey(key); System.out.println("执行完业务逻辑,释放锁成功"); } return "success"; } else { System.out.println("锁被别的线程占有,获取锁失败"); return "acquire lock failed"; } }
3、小总结:
-
优点:简单易用,一条redis命令就搞定。可以设置过期时间,避免释放锁失败造成其他线程长时间无法获取锁的问题。
-
缺点:这种做法只适合redis是单机的时候,如果redis有集群,这样做就会出问题。假如一个线程在master上获取锁成功了,在master还没来得及将数据同步到slave上的时候,master挂了,slave升级为master。第二个线程进来尝试获取锁,因为新的master上并没有这个key,所以,也能成功获取到锁。
-
解决办法:针对上面的缺点,我们可以采用redis的RedLock算法。假如集群中有n个redis,我们先从这n个redis中尝试获取锁(锁的过期时间为x),并记录获取锁的消耗的总时间t,获取锁成功数量为s,当且仅当t < x 并且 s >= (n/2 + 1)时,认为获取锁成功。
1、是什么?
官网地址:https://github.com/redisson/redisson/wiki/Table-of-Content
Redisson是一个功能十分强大的redis客户端,封装了很多分布式 *** 作,比如分布式对象、分布式集合、分布式锁等。它的分布式锁也很多,什么公平锁、可重入锁、redlock等一应俱全,下面来看看如何在springboot项目中使用它。
2、使用redisson做分布式锁:
- 添加依赖:
org.redisson redisson-spring-boot-starter3.12.3 io.netty netty-all
- application.yml:
spring: application: name: distributed-lock redis: # redis单机版的写法 host: 192.168.2.43 port: 6379 # 集群的写法 #cluster: #nodes: #- 192.168.0.106,192.168.0.107 #哨兵的写法 #sentinel: #master: 192.168.0.106 #nodes: #- 192.168.0.107,192.168.0.108
- 用法:直接注入RedissonClient,然后用它获取锁,得到锁之后就可以进行占锁和释放锁了。有阻塞式锁,也有非阻塞式锁,具体用法如下:
@Autowired private RedissonClient redisson; @GetMapping("/testLock") public String testLock() { log.info("进入testLock方法,开始获取锁"); String key = "testLock"; RLock lock = redisson.getLock(key); lock.lock(); log.info("获取锁成功,开始执行业务逻辑……"); try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();}; log.info("执行完业务逻辑,释放锁"); lock.unlock(); return "success"; } @GetMapping("/testTryLock") public String testTryLock() { log.info("进入testTryLock方法,开始获取锁"); String key = "testTryLock"; RLock lock = redisson.getLock(key); boolean res = lock.tryLock(); if (!res) { log.error("尝试获取锁失败"); return "fail"; } else { log.info("获取锁成功,开始执行业务逻辑……"); try {TimeUnit.SECONDS.sleep(30L); } catch (Exception e) { e.printStackTrace();}; log.info("执行完业务逻辑,释放锁"); lock.unlock(); return "success"; } } @GetMapping("/testLockTimeout") public String testLockTimeout() { log.info("进入testLockTimeout方法,开始获取锁"); String key = "testLockTimeout"; RLock lock = redisson.getLock(key); // 20秒后自动释放锁 lock.lock(20, TimeUnit.SECONDS); log.info("获取锁成功,开始执行业务逻辑……"); try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();}; lock.unlock(); return "success"; } @GetMapping("/testTryLockTimeout") public String testTryLockTimeout() { log.info("进入testTryLockTimeout方法,开始获取锁"); String key = "testTryLockTimeout"; RLock lock = redisson.getLock(key); boolean res = false; try { res = lock.tryLock(15, 20, TimeUnit.SECONDS); } catch (InterruptedException e1) { e1.printStackTrace(); } if (!res) { log.error("尝试获取锁失败"); return "fail"; } else { log.info("获取锁成功,开始执行业务逻辑……"); try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();}; log.info("执行完业务逻辑,释放锁"); lock.unlock(); return "success"; } }
3、小总结:
以上就是使用redisson做分布式锁的简单demo,用起来十分的方便。上面是与springboot项目集成,直接用它提供的springboot的starter就好了。用它来做分布式锁的更多用法请移步至官网:redisson分布式锁。
1、zookeeper知识点回顾:
zookeeper有四种类型的节点:
-
持久节点:默认的节点类型,客户端与zookeeper断开连接后,节点依然存在
-
持久顺序节点:首先是持久节点,顺序的意思是,zookeeper会根据节点创建的顺序编号
-
临时节点:客户端与zookeeper断开连接后节点不复存在
-
临时顺序节点:客户端与zookeeper断开连接后节点不复存在,zookeeper会根据节点创建的顺序编号
2、基于zookeeper实现分布式锁的原理:
我们正是利用了zookeeper的临时顺序节点来实现分布式锁。首先我们创建一个名为lock(节点名称随意)的持久节点。线程1获取锁时,就在lock下面创建一个名为lock1的临时顺序节点,然后查找lock下所有的节点,判断自己的lock1是不是第一个,如果是,获取锁成功,继续执行业务逻辑,执行完后删除lock1节点;如果不是第一个,获取锁失败,就watch排在自己前面一位的节点,当排在自己前一位的节点被干掉时,再检查自己是不是排第一了,如果是,获取锁成功。图解过程如下:
线程1创建了一个lock1,发现lock1的第一个节点,占锁成功;在线程1还没释放锁的时候,线程2来了,创建了一个lock2,发现lock2不是第一个,便监控lock1,线程3此时进行就监控lock2。直到自己是第一个节点时才占锁成功。假如某个线程释放锁的时候zookeeper崩了也没关系,因为是临时节点,断开连接节点就没了,其他线程还是可以正常获取锁,这就是要用临时节点的原因。
说清楚了原理,用代码实现也就不难了,可以引入zookeeper的客户端zkClient,自己写代码实现(偷个懒,自己就不写了,有兴趣的可以参考我zookeeper的文章,肯定可以自己写出来的)。不过有非常优秀的开源解决方案比如curator,下面就看看curator怎么用。
六、基于curator实现1、springboot整合curator:
- pom.xml:
org.apache.zookeeper zookeeper3.4.14 org.apache.curator curator-framework4.2.0 org.apache.curator curator-recipes4.2.0 org.seleniumhq.selenium selenium-java
- application.yml:注意,curator下面这些属性spring是没有集成的,也就是说写的时候不会有提示
curator: retryCount: 5 # 连接失败的重试次数 retryTimeInterval: 5000 # 每隔5秒重试一次 url: 192.168.2.43:2181 # zookeeper连接地址 sessionTimeout: 60000 # session超时时间1分钟 connectionTimeout: 5000 # 连接超时时间5秒钟
- 配置类:读取application.yml中的属性,创建Curatorframework实例
@Configuration public class CutatorConfig { @Value("${curator.retryCount}") private Integer retryCount; @Value("${curator.retryTimeInterval}") private Integer retryTimeInterval; @Value("${curator.url}") private String url; @Value("${curator.sessionTimeout}") private Integer sessionTimeout; @Value("${curator.connectionTimeout}") private Integer connectionTimeout; @Bean public Curatorframework curatorframework() { return CuratorframeworkFactory.newClient(url, sessionTimeout, connectionTimeout, new RetryNTimes(retryCount, retryTimeInterval)); } }
- 测试类:测试整合curator框架是否成功
@SpringBootTest(classes = {DistributedLockApplication.class}) @RunWith(SpringRunner.class) public class DistributedLockApplicationTests { @Autowired private Curatorframework curatorframework; @Test public void contextLoads() { curatorframework.start(); try { curatorframework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/zhusl", "test".getBytes()); } catch (Exception e) { e.printStackTrace(); } } }
在确保zookeeper成功启动了的情况下,执行这个单元测试,最后回到linux中,用zkCli.sh连接,查看是否成功创建节点。
2、使用Curator做分布式锁:
Curator封装了很多锁,比如可重入共享锁、不可重入共享锁、可重入读写锁、联锁等。具体可以参考官网:curator分布式锁的用法。
- ZookeeperUtil.java:工具类,封装获取锁,释放锁等方法。这里主要简单地封装了上面说的四种锁,仅供参考。
@Component @Slf4j public class ZookeeperUtil { private static Curatorframework curatorframework; private static InterProcessLock lock; private final static String ROOT_PATH = "/lock/"; private static InterProcessMutex interProcessMutex; private static InterProcessSemaphoreMutex interProcessSemaphoreMutex; private static InterProcessReadWriteLock interProcessReadWriteLock; private static InterProcessMultiLock interProcessMultiLock; @Autowired private void setCuratorframework(Curatorframework curatorframework) { ZookeeperUtil.curatorframework = curatorframework; ZookeeperUtil.curatorframework.start(); } public static boolean interProcessMutex(String lockName) { interProcessMutex = new InterProcessMutex(curatorframework, ROOT_PATH + lockName); lock = interProcessMutex; return acquireLock(lockName, lock); } public static boolean interProcessSemaphoreMutex(String lockName) { interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(curatorframework, ROOT_PATH + lockName); lock = interProcessSemaphoreMutex; return acquireLock(lockName, lock); } public static boolean interProcessReadLock(String lockName) { interProcessReadWriteLock = new InterProcessReadWriteLock(curatorframework, ROOT_PATH + lockName); lock = interProcessReadWriteLock.readLock(); return acquireLock(lockName, lock); } public static boolean interProcessWriteLock(String lockName) { interProcessReadWriteLock = new InterProcessReadWriteLock(curatorframework, ROOT_PATH + lockName); lock = interProcessReadWriteLock.writeLock(); return acquireLock(lockName, lock); } public static boolean interProcessMultiLock(ListlockNames) { if (lockNames == null || lockNames.isEmpty()) { log.error("no lockNames found"); return false; } interProcessMultiLock = new InterProcessMultiLock(curatorframework, lockNames); try { if (!interProcessMultiLock.acquire(10, TimeUnit.SECONDS)) { log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock fail"); return false; } else { log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock success"); return true; } } catch (Exception e) { log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e); return false; } } public static void releaseLock(String lockName) { try { if (lock != null && lock.isAcquiredInThisProcess()) { lock.release(); curatorframework.delete().inBackground().forPath(ROOT_PATH + lockName); log.info("Thread:" + Thread.currentThread().getId() + " release lock success"); } } catch (Exception e) { log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e); } } public static void releaseMultiLock(List lockNames) { try { if (lockNames == null || lockNames.isEmpty()) { log.error("no no lockNames found to release"); return; } if (interProcessMultiLock != null && interProcessMultiLock.isAcquiredInThisProcess()) { interProcessMultiLock.release(); for (String lockName : lockNames) { curatorframework.delete().inBackground().forPath(ROOT_PATH + lockName); } log.info("Thread:" + Thread.currentThread().getId() + " release lock success"); } } catch (Exception e) { log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e); } } private static boolean acquireLock(String lockName, InterProcessLock interProcessLock) { int flag = 0; try { while (!interProcessLock.acquire(2, TimeUnit.SECONDS)) { flag++; if (flag > 1) { break; } } } catch (Exception e) { log.error("acquire lock occured an exception = " + e); return false; } if (flag > 1) { log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock fail"); return false; } else { log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock success"); return true; } } }
- ZookeeperLockController.java:写一个接口,用Curator加锁,然后用浏览器进行访问
@RestController @RequestMapping("/zookeeper-lock") public class ZookeeperLockController { @GetMapping("/testLock") public String testLock() { // 获取锁 boolean lockResult = ZookeeperUtil.interProcessMutex("testLock"); if (lockResult) { try { // 模拟执行业务逻辑 TimeUnit.MINUTES.sleep(1L); } catch (InterruptedException e) { e.printStackTrace(); } // 释放锁 ZookeeperUtil.releaseLock("testLock"); return "success"; } else { return "fail"; } } }
打开一个浏览器窗口访问,后台打印出获取锁成功的日志,在1分钟之内,开启另一个窗口再次访问,打印出获取锁失败的日志,说明分布式锁生效了。
七、实现分布式锁的各方案比较- 基于数据库实现最简单,不需要引入第三方应用。但是因为每次加锁和解锁都要进行IO *** 作,性能不是很好。
- 基于redis实现比较均衡,性能很好,也不是很难,比较可靠。
- 基于zookeeper实现难度较大,因为需要维护一个zookeeper集群,如果项目原本没有用到zookeeper,还是用redis比较好。
本文项目地址:分布式锁
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)