Redis、ZooKeeper实现分布式锁

Redis、ZooKeeper实现分布式锁,第1张

Redis、ZooKeeper实现分布式锁 1分布式锁

锁:就是对代码块背后的资源进行锁定,在同一时间段只允许有一个线程访问修改

常用的线程安全机制:

1、sychronized    jvm 自带的锁, 可以重入,没有超时时间,不能被外部释放
  jdk 8对sychronized  优化,性能更加友好
  线程间通信  sychronized  和 wait() notify() 配合使用 2、lock(ReentrantLock可重入锁) lock 是基于java 代码实现的乐观锁 ,底层用到cas + aqs  
可以重入,由超时机制,可以被外部释放 使用起来更加灵活,可以避免死锁
线程间通信 使用 Condition  中的 await() signal() 两个程序配合使用  3、cas
1.读取a 得值(旧值)
2.比较并替换   cas(旧值,要修改的值)  
    如果旧值 和 此时此刻 的值 相等 ,则将 要修改的值 赋值给该变量a
3.修改变量a  的值 为要修改的值  
如果旧值和当前值不相等,说明被其他线程修改过

原子类: 线程的安全的自增 自减  底层就是 cas 机制
AutomicInteger
AutomicFloat
Automic*
    4、ThreadLocal ThreadLocal 就是线程副本,会为每一个线程对该变量存储一个副本,每个线程都访问自己的变量

5、volatile:不可以保证线程安全 只能保证 数据的可见性 ,禁止指令重排序

接下来咱们在一个例子中体会分布式锁

2.例子

秒杀例子

秒杀商品:

1.判断库存是否足够

2.库存充裕 则

生成订单,扣减库存

3.返回结构,生成订单

​
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
​
import javax.xml.ws.soap.Addressing;
​
@RestController
public class GoodsController {
​
​
    @Autowired
    private StringRedisTemplate redisTemplate;
​
    
    @RequestMapping("/init")
    public String init(String goods){
​
        // 初始化库存
        redisTemplate.opsForValue().set("stack_"+goods, 1000+"");
​
        // 初始化订单
        redisTemplate.opsForValue().set("order_"+goods, 0+"");
​
​
        return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+
                "--订单:"+ redisTemplate.opsForValue().get("order_"+goods);
    }
​
​
    
    @RequestMapping("/killGoods")
    public synchronized String killGoods(String goods){
​
        String stackStr = redisTemplate.opsForValue().get("stack_"+goods);
​
        int stack = Integer.valueOf(stackStr);
​
        // 1.判断库存
        if (stack<1){// 说明没有库存
​
            return "很遗憾商品已售罄"+goods;
        }
​
        // 生成订单 订单加1  自增1
        redisTemplate.opsForValue().increment("order_"+goods);
​
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
​
        redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+"");
​
​
        return "当前抢购商品成功"+goods;
​
    }
​
​
    
    @GetMapping("/getState")
    public String getState(String goods){
        return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+
                "--订单:"+ redisTemplate.opsForValue().get("order_"+goods);
​
​
    }
}

 

获取订单状态

 

秒杀

 

使用工具进行秒杀

下载ab压力测试

ab -n 请求数 -c 并发数 访问的路径
ab   -n  5000 -c 20  http://localhost:8080/killGoods?goods=p50

 

 

可以使用 synchronized 解决线程安全

 

 

多台机器同时访问

synchronized无法解决多应用线程问题

 

 

3.使用分布式锁解决线程安全

使用redis 解决分布式线程安全问题 1.引入redis 锁

导入依赖


        org.springframework.boot
        spring-boot-starter-parent
        2.2.4.RELEASE
    

    
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
        
            org.springframework.boot
            spring-boot-starter-data-redis
        
    

​application.properties的创建
spring.redis.host=8.130.166.101
spring.redis.port=6379

 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
​
import java.util.concurrent.TimeUnit;
​
@Component
public class RedisLock {
​
​
    @Autowired
    public StringRedisTemplate redisTemplate;
​
    
    public boolean lock(String key,String value,int time){
​
      // 通过 setnx  来判断key 的标记位是否存在
      // 如果存在说明别人已经 持有锁
      //     不存在 创建,并获得锁
      return   redisTemplate.opsForValue().setIfAbsent(key,value,time, TimeUnit.SECONDS);
    }
​
​
    
    public boolean unlock(String key){
​
     return    redisTemplate.delete(key);
    }
​
}
2.使用
   @Autowired
    private RedisLock redisLock;
​
    @RequestMapping("/redisKillGoods")
    public synchronized String redisKillGoods(String goods){
​
​
        if (redisLock.lock("lock_"+goods,"0",5)){// 拿到锁
​
            String stackStr = redisTemplate.opsForValue().get("stack_"+goods);
            int stack = Integer.valueOf(stackStr);
​
            // 1.判断库存
            if (stack<1){// 说明没有库存
​
                // 释放锁
                redisLock.unlock("lock_"+goods);
​
                return "很遗憾商品已售罄"+goods;
            }
​
            // 生成订单 订单加1  自增1
            redisTemplate.opsForValue().increment("order_"+goods);
​
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
​
            redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+"");
​
            // 释放锁
            redisLock.unlock("lock_"+goods);
​
            return "当前抢购商品成功"+goods;
        }else {
​
            return "很遗憾没有抢到宝贝"+goods;
        }
​
​
    }

使用redis setnx 完成分布式锁缺陷

1.setnx 本身是两条命令 a.set值 b.配置其过期时间

2.如果redis 是主从模式,有可能造成 主从数据延迟问题

3.setnx 有时间限制,如果在使用锁期间,key时间过期了,也会造成安全问题?

解决方案:

1.使用 redis 调用lua 脚本 *** 作 setnx 保证原子性 *** 作复杂

2.我们使用 redlock (红锁) 我们通过多个key 保证一个分布式锁,通过投票决定是否获取锁,配置的key还拥有看门狗机制(可以查看key是否即将过期,如果快要过期,可以续命)
解决 遇到到 1,2,3 问题

使用zookeeper 实现分布式锁

1.引入zk 依赖

​
      
  
            org.apache.zookeeper
            zookeeper
            3.6.0
            
                
                    slf4j-api
                    org.slf4j
                
                
                    slf4j-log4j12
                    org.slf4j
                
                
                    log4j
                    log4j
                
            
        
​
        
            org.apache.curator
            curator-recipes
            4.0.1
            
                
                    slf4j-api
                    org.slf4j
                
            
        
2.容器加入zk
​
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
public class ZkConfig {
​
​
    
    @Bean
    public  Curatorframework getCuratorframework(){
​
        // 当客户端 和 服务端 连接异常时,会发起重试 每隔2s 重试1次,总共试 3次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(2000,3);
​
        Curatorframework curatorframework = CuratorframeworkFactory.builder()
                .retryPolicy(retryPolicy).connectString("192.168.12.145:2181;192.168.12.145:2182;192.168.12.145:2183").build();
​
​
        curatorframework.start();
​
        return curatorframework;
    }
​
​
​
}
3.使用zk 分布式锁
    @Autowired
    private Curatorframework curatorframework;
​
    
    @RequestMapping("/zkKillGoods")
    public synchronized String zkKillGoods(String goods) throws Exception {
​
        // 封装zk 的分布式锁 相当于 jvm synchronized 中的监视器
        InterProcessMutex interProcessMutex = new InterProcessMutex(curatorframework,"/"+goods);
​
        // interProcessMutex.acquire(5, TimeUnit.SECONDS) 去获取锁,若果没有获取到,就等待5s,还没有则放弃
        if (interProcessMutex.acquire(5, TimeUnit.SECONDS)){// 拿到锁
​
            String stackStr = redisTemplate.opsForValue().get("stack_"+goods);
            int stack = Integer.valueOf(stackStr);
​
            // 1.判断库存
            if (stack<1){// 说明没有库存
​
                // 释放锁
               interProcessMutex.release();
​
                return "很遗憾商品已售罄"+goods;
            }
​
            // 生成订单 订单加1  自增1
            redisTemplate.opsForValue().increment("order_"+goods);
​
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
​
            redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+"");
​
            // 释放锁
            interProcessMutex.release();
​
            return "当前抢购商品成功"+goods;
        }else {
​
            return "很遗憾没有抢到宝贝"+goods;
        }
​
​
    }

 

总结:

使用zk 作为分布锁,比较可靠,但是效率太低,一般适用于并发量不大的场合 如果并发量太高,zk 就是最大的性能瓶颈

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5605161.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存