锁:就是对代码块背后的资源进行锁定,在同一时间段只允许有一个线程访问修改
常用的线程安全机制:
1、sychronized jvm 自带的锁, 可以重入,没有超时时间,不能被外部释放jdk 8对sychronized 优化,性能更加友好
线程间通信 sychronized 和 wait() notify() 配合使用 2、lock(ReentrantLock可重入锁) lock 是基于java 代码实现的乐观锁 ,底层用到cas + aqs
可以重入,由超时机制,可以被外部释放 使用起来更加灵活,可以避免死锁
线程间通信 使用 Condition 中的 await() signal() 两个程序配合使用 3、cas
1.读取a 得值(旧值)
2.比较并替换 cas(旧值,要修改的值)
如果旧值 和 此时此刻 的值 相等 ,则将 要修改的值 赋值给该变量a
3.修改变量a 的值 为要修改的值
如果旧值和当前值不相等,说明被其他线程修改过
原子类: 线程的安全的自增 自减 底层就是 cas 机制
AutomicInteger
AutomicFloat
Automic*
4、ThreadLocal ThreadLocal 就是线程副本,会为每一个线程对该变量存储一个副本,每个线程都访问自己的变量
5、volatile:不可以保证线程安全 只能保证 数据的可见性 ,禁止指令重排序
接下来咱们在一个例子中体会分布式锁
2.例子秒杀例子
秒杀商品:
1.判断库存是否足够
2.库存充裕 则
生成订单,扣减库存
3.返回结构,生成订单
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.xml.ws.soap.Addressing; @RestController public class GoodsController { @Autowired private StringRedisTemplate redisTemplate; @RequestMapping("/init") public String init(String goods){ // 初始化库存 redisTemplate.opsForValue().set("stack_"+goods, 1000+""); // 初始化订单 redisTemplate.opsForValue().set("order_"+goods, 0+""); return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+ "--订单:"+ redisTemplate.opsForValue().get("order_"+goods); } @RequestMapping("/killGoods") public synchronized String killGoods(String goods){ String stackStr = redisTemplate.opsForValue().get("stack_"+goods); int stack = Integer.valueOf(stackStr); // 1.判断库存 if (stack<1){// 说明没有库存 return "很遗憾商品已售罄"+goods; } // 生成订单 订单加1 自增1 redisTemplate.opsForValue().increment("order_"+goods); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+""); return "当前抢购商品成功"+goods; } @GetMapping("/getState") public String getState(String goods){ return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+ "--订单:"+ redisTemplate.opsForValue().get("order_"+goods); } }
获取订单状态
秒杀
使用工具进行秒杀
下载ab压力测试
ab -n 请求数 -c 并发数 访问的路径 ab -n 5000 -c 20 http://localhost:8080/killGoods?goods=p50
可以使用 synchronized 解决线程安全
多台机器同时访问
synchronized无法解决多应用线程问题
3.使用分布式锁解决线程安全 使用redis 解决分布式线程安全问题 1.引入redis 锁
导入依赖
application.properties的创建org.springframework.boot spring-boot-starter-parent2.2.4.RELEASE org.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-actuatororg.springframework.boot spring-boot-starter-testtest org.springframework.boot spring-boot-starter-data-redis
spring.redis.host=8.130.166.101 spring.redis.port=6379
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Component public class RedisLock { @Autowired public StringRedisTemplate redisTemplate; public boolean lock(String key,String value,int time){ // 通过 setnx 来判断key 的标记位是否存在 // 如果存在说明别人已经 持有锁 // 不存在 创建,并获得锁 return redisTemplate.opsForValue().setIfAbsent(key,value,time, TimeUnit.SECONDS); } public boolean unlock(String key){ return redisTemplate.delete(key); } }
2.使用
@Autowired private RedisLock redisLock; @RequestMapping("/redisKillGoods") public synchronized String redisKillGoods(String goods){ if (redisLock.lock("lock_"+goods,"0",5)){// 拿到锁 String stackStr = redisTemplate.opsForValue().get("stack_"+goods); int stack = Integer.valueOf(stackStr); // 1.判断库存 if (stack<1){// 说明没有库存 // 释放锁 redisLock.unlock("lock_"+goods); return "很遗憾商品已售罄"+goods; } // 生成订单 订单加1 自增1 redisTemplate.opsForValue().increment("order_"+goods); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+""); // 释放锁 redisLock.unlock("lock_"+goods); return "当前抢购商品成功"+goods; }else { return "很遗憾没有抢到宝贝"+goods; } }
使用redis setnx 完成分布式锁缺陷
1.setnx 本身是两条命令 a.set值 b.配置其过期时间
2.如果redis 是主从模式,有可能造成 主从数据延迟问题
3.setnx 有时间限制,如果在使用锁期间,key时间过期了,也会造成安全问题?
解决方案:
1.使用 redis 调用lua 脚本 *** 作 setnx 保证原子性 *** 作复杂
2.我们使用 redlock (红锁) 我们通过多个key 保证一个分布式锁,通过投票决定是否获取锁,配置的key还拥有看门狗机制(可以查看key是否即将过期,如果快要过期,可以续命)
解决 遇到到 1,2,3 问题
1.引入zk 依赖
2.容器加入zk org.apache.zookeeper zookeeper3.6.0 slf4j-api org.slf4j slf4j-log4j12 org.slf4j log4j log4j org.apache.curator curator-recipes4.0.1 slf4j-api org.slf4j
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ZkConfig { @Bean public Curatorframework getCuratorframework(){ // 当客户端 和 服务端 连接异常时,会发起重试 每隔2s 重试1次,总共试 3次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(2000,3); Curatorframework curatorframework = CuratorframeworkFactory.builder() .retryPolicy(retryPolicy).connectString("192.168.12.145:2181;192.168.12.145:2182;192.168.12.145:2183").build(); curatorframework.start(); return curatorframework; } }
3.使用zk 分布式锁
@Autowired private Curatorframework curatorframework; @RequestMapping("/zkKillGoods") public synchronized String zkKillGoods(String goods) throws Exception { // 封装zk 的分布式锁 相当于 jvm synchronized 中的监视器 InterProcessMutex interProcessMutex = new InterProcessMutex(curatorframework,"/"+goods); // interProcessMutex.acquire(5, TimeUnit.SECONDS) 去获取锁,若果没有获取到,就等待5s,还没有则放弃 if (interProcessMutex.acquire(5, TimeUnit.SECONDS)){// 拿到锁 String stackStr = redisTemplate.opsForValue().get("stack_"+goods); int stack = Integer.valueOf(stackStr); // 1.判断库存 if (stack<1){// 说明没有库存 // 释放锁 interProcessMutex.release(); return "很遗憾商品已售罄"+goods; } // 生成订单 订单加1 自增1 redisTemplate.opsForValue().increment("order_"+goods); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+""); // 释放锁 interProcessMutex.release(); return "当前抢购商品成功"+goods; }else { return "很遗憾没有抢到宝贝"+goods; } }
总结:
使用zk 作为分布锁,比较可靠,但是效率太低,一般适用于并发量不大的场合 如果并发量太高,zk 就是最大的性能瓶颈欢迎分享,转载请注明来源:内存溢出
评论列表(0条)