redis缓存击穿、穿透、雪崩

redis缓存击穿、穿透、雪崩,第1张

redis缓存击穿、穿透、雪崩 高并发下缓存失效问题 缓存穿透

缓存穿透:
指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义
风险:
利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃
解决:
方法一:null结果缓存,并加入短暂过期时间【可能导致redis里面全是null,黑客每次都生成一个唯一的UUID】
方法二:布隆过滤器【不放行不存在的查询 id=UUID】【过滤器存储mysql中所有的id号】

缓存雪崩

缓存雪崩:
缓存雪崩是指在我们设置缓存时key采用了相同的过期 时间,导致缓存在某一时刻同时失效,请求全部转发到 DB,DB瞬时压力过重雪崩。
解决:
原有的失效时间基础上增加一个随机值,比如1-5分钟 随机,这样每 个缓存的过期时间的重复率就会降低, 就很难引发集体失效的事件。
如果已经出现的情况:
解决方法一:熔断、降级

缓存穿透

缓存穿透:

  • 对于一些设置了过期时间的key, 如果这些key可能 会在某些时间点被超高并发地访问,是一种非常“热点”的数据。
  • 如果这个key在大量请求同时进来前正好失效,那么 所有对这个key的数据查询都落到db, 我们称为缓存击穿。

解决:
加锁
大量并发只让一个去查,其他人等待,查到以后释放锁, 其他人获取到锁,先查缓存,就会有数据,不用去db

一条数据过期了,高并发情况下导致所有请求到达DB
解决:加分布式锁
获取到锁,先查缓存,其他人就有数据,不用去DB

分布式锁

SpringBoot整合redis作为缓存
        
            org.springframework.boot
            spring-boot-starter-data-redis
        
  • springboot2.0以后默认使用lettuce作为 *** 作redis的客户端。他使用netty进行网络通信
  • lettuce的bug导致netty堆外内存溢出 -Xmx1024m;netty如果没有指定堆外内存,默认使用-Xmx1024m,跟jvm设置的一样【迟早会出异常】
    • 可以通过-Dio.netty.maxDirectMemory进行设置【仍然会异常】
      解决方案:不能使用-Dio.netty.maxDirectMemory
      • 1)升级lettuce客户端;【2.3.2已解决】【lettuce使用netty吞吐量很大】
      
            io.lettuce
            lettuce-core
            5.3.7.RELEASE
        
      
      • 2)切换使用jedis客户端【这里学习一下如何使用jedis,但是最后不选用】
      
            org.springframework.boot
            spring-boot-starter-data-redis
            
                
                    io.lettuce
                    lettuce-core
                
            
        
        
        
       
            redis.clients
            jedis
       
      





读 *** 作—分布式锁【防止所有请求到达DB,只放行一个】

版本一解决不了。要使用分布式锁
1、这里先模拟集群,在Run Dashboard中右键copy configuration
2、–server.port=10001 然后修改下服务的名字,直接启动
3、压测的时候使用nginx:gulimall.com 80 /index/catalog.json
4、找redis文档:http://www.redis.cn/commands/set.html【redis命令=》string=》set】
这个set是原子性的
演示:使用redis演示分布式锁
1、打开多个Tabby会话,每个会话都进入redis客户端
2、在Tabby里面一次性发送给多个会话窗口,这里不演示以后百度:
docker exec -it redis redis-cli
3、多个会话同时使用占锁命令【原子性的】:【NX,如果redis中不存在key=lock,则设置】【只有一个会话会返回OK,其他返回Nil】
set lock haha NX
4、设置过期时间和占坑需要是一个原子 *** 作:
set lock 1111 EX 300 NX:原子 *** 作
ttl lock:查看过期时间

BUG:
1、死锁问题:没有设置过期时间
解决:获取到锁后设置过期时间
2、死锁问题:正要设置过期时间,宕机了
解决:获取锁+设置过期时间 必须是原子 *** 作setNx
3、过期时间到自动释放锁,导致解锁 *** 作时解的不是自己的锁
解决:解锁的时候先查询判断是不是自己的锁,是揪解锁
4、获取值+解锁不是原子 *** 作,仍有可能解别人的锁
解决:使用lua脚本,而不是del命令
5、时间不够,业务还没处理完,锁就过期了
总结:
加锁+设置过期时间,与解锁都必须是原子 *** 作

自己实现版本:手写lua脚本+原子删 *** 作+原子setNX *** 作【抢占setIfAbsent】
弊端:没有自动续期功能业务 *** 作未结束,过期时间到了
@Autowired
StringRedisTemplate redisTemplate;


    @Override
    public Map> getCatalogJson() {
        //1、加入缓存机制
        String catalogJson = redisTemplate.opsForValue().get("catalogJson");
        if (StringUtils.isEmpty(catalogJson)) {
            Map> dataFromDB = getCatalogJsonFromDbWithRedisLock();
            redisTemplate.opsForValue().set("catalogJson", JSON.toJSONString(dataFromDB));
            return dataFromDB;
        }
//
        Map> result = JSON.parseObject(catalogJson, new TypeReference>>() {
        });
        return result;
    }

    
    public Map> getCatalogJsonFromDbWithRedisLock() {
        // 1、占 分布式锁。去redis占坑,同时设置过期时间
        while (true) {
            String uuid = UUID.randomUUID().toString();
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
            if (lock) {
                // 加锁成功....执行业务【内部会判断一次redis是否有值】
                System.out.println("加锁成功....执行业务");
                Map> dataFromDB = null;
                try {
                    dataFromDB = getDataFromDB();
                } finally {
                    // 2、查询UUID是否是自己,是自己的lock就删除
                    // 查询+删除 必须是原子 *** 作:lua脚本解锁
                    String luascript = "if redis.call("get",KEYS[1]) == ARGV[1]n" +
                            "thenn" +
                            "    return redis.call("del",KEYS[1])n" +
                            "elsen" +
                            "    return 0n" +
                            "end";
                    // 删除锁
                    Long lock1 = redisTemplate.execute(new DefaultRedisscript(luascript, Long.class), Arrays.asList("lock"), uuid);
                    return dataFromDB;
                }
            } else {
                // 加锁失败....重试
                System.out.println("加锁失败....重试");
                // 休眠100ms重试
//                try {
//                    Thread.sleep(100);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
            }
        }

    }

	
    public Map> getCatalogJsonFromDbWithLocalLock() {
        // TODO 本地锁:synchronized,JUC(lock),在分布式情况下,需要使用分布式锁
        synchronized (this) {
            // 得到锁以后还要检查一次,double check
            return getDataFromDB();
        }
    }



    
    public Map> getDataFromDB(){
        String catalogJSON = redisTemplate.opsForValue().get("catalogJson");
        // double check,拿到锁了还要判断下是否有数据
        if (!StringUtils.isEmpty(catalogJSON)) {
            return JSON.parseObject(catalogJSON, new TypeReference>>() {
            });
        }
        System.out.println("查数据库。。。。。。。");
        List categoryEntities = baseMapper.selectList(null);
        // 获得一级分类
        List level1Categorys = getParent_cid(categoryEntities,0L);

        Map> collect = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), level1 -> {
            // 查到当前1级分类的2级分类
            List category2level = getParent_cid(categoryEntities, level1.getCatId());
            List catalog2Vos = null;
            if (category2level != null) {
                catalog2Vos = category2level.stream().map(level12 -> {
                    // 查询当前2级分类的3级分类
                    List category3level = getParent_cid(categoryEntities, level12.getCatId());
                    List catalog3Vos = null;
                    if (category3level != null) {
                        catalog3Vos = category3level.stream().map(level13 -> {
                            return new Catalog2Vo.Catalog3Vo(level12.getCatId().toString(), level13.getCatId().toString(), level13.getName());
                        }).collect(Collectors.toList());
                    }
                    return new Catalog2Vo(level1.getCatId().toString(), catalog3Vos, level12.getCatId().toString(), level12.getName());
                }).collect(Collectors.toList());
            }
            return catalog2Vos;
        }));
        return collect;
    }

    
    private List getParent_cid(List selectList, Long parent_cid){
        return selectList.stream().filter(item -> item.getParentCid().equals(parent_cid)).collect(Collectors.toList());
    }

使用redisson作分布式锁
==================使用方式:===================
1、使用spring+@Configuration配置

   org.redisson
   redisson
   3.13.3
  
---------------------------------------------
@Configuration
public class MyRedissonConfig {
    
    @Bean(destroyMethod="shutdown")
    RedissonClient redisson() throws IOException {
        // 1、创建配置
        Config config = new Config();
        // 集群模式
//        config.useClusterServers()
//                .addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001");
        config.useSingleServer().setAddress("redis://192.168.56.10:6379");
        // 2、根据config创建出RedissonClient实例
        return Redisson.create(config);
    }
}
---------------------------------------------
    @Autowired
    RedissonClient redisson;
    
    RLock lock = redisson.getLock("catalogJson-lock");
	lock.lock();
	lock.unlock();
=============================================
2、springboot
也可使用springboot的方式:https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter
     
         org.redisson
         redisson-spring-boot-starter
         3.13.3
     
     # common spring boot settings

spring.redis.database=
spring.redis.host=
spring.redis.port=
spring.redis.password=
spring.redis.ssl=
spring.redis.timeout=
spring.redis.cluster.nodes=
spring.redis.sentinel.master=
spring.redis.sentinel.nodes=

# Redisson settings

#path to config - redisson.yaml
spring.redis.redisson.config=classpath:redisson.yaml
Redisson版本:
    
    public Map> getCatalogJsonFromDbWithRedissonLock() {
        // 1、锁的名字。锁的粒度:越细越快
        // 例:具体缓存的是某个数据,11号商品,product
        RLock lock = redisson.getLock("catalogJson-lock");
        lock.lock();

        Map> dataFromDB = null;
        try {
            // 加锁成功....执行业务【内部会再判断一次redis是否有值】
            dataFromDB = getDataFromDB();
        } finally {
            // 2、查询UUID是否是自己,是自己的lock就删除
            // 查询+删除 必须是原子 *** 作:lua脚本解锁
            lock.unlock();
        }
        return dataFromDB;
    }
Lock自动续期+API【看门狗】
1、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉
2、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动册除。
lock.Lock(10,TimeUnit.SECONDS);//16秒自动解锁,自动解锁时间一定要大于业务的执行时间。
问题:Lock.lock(10,TimeUnit.SECONDS);在锁时间到了以后,不会自动续期。
	1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
	2、如果我们未指定锁的超时时间,就使用30 *1000【LocklMatchdogTimeout看门狗的默认时间】;
只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动续期成30S
internalLockLeaseTime【看门狗时间】 / 3,10s


3、tryLock:一段时间内获取锁,如果没获取到就不获取了
公平锁和非公平锁 getFairLock
synchronized:非公平锁
getFairLock(""):公平锁,按顺序获得锁
默认是非公平锁:锁一释放抢占

读写锁 getReadWriteLock
保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁)。读锁是一个共享锁
读锁:共享锁
写锁:互斥,排他锁

写锁没释放读就必须等待
读读:相当子无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
写读:等待写锁释放
写写:阻塞方式
读写,有读锁。写也需要等待。
只要有写的存在,都必须等待

使用:
1、获取同一把锁
2、获得写锁:lock.writeLock()
2、获得读锁:lock.ReadLock()


写锁:

读锁:

读锁同时存入多个:

信号量Semphore【限流】
acquire:获取一个信号量,为0阻塞
release:释放一个信号量,+1
tryacquire:尝试获取一个信号量,一次性不阻塞

作用:限流,所有服务上来了去获取一个信号量,一个一个放行

闭锁CountDownLatch【多线程调度】
多线程任务调度的时候,多个线程都完成了某个 *** 作才算全部

写 *** 作—缓存数据一致性【必须满足最终一致性】

双写模式+失效模式

就是缓存中的数据如何与数据库保持一致
1)双写模式:DB+CACHE都写【脏数据(写库与写缓存有延迟时间,容忍度),线程1数据覆盖了线程2的最新缓存数据】【写与写产生的问题,最终一致性】
	方案1:写 *** 作加锁【完全一致性】
	方案2:设置过期时间,达到最终一致性【暂时性脏数据,过期后就是最新数据,容忍度是多少】
2)失效模式:直接删CACHE【线程1修改DB删缓存,线程2修改DB,线程3读DB,线程2删缓存(没有缓存),线程3设置缓存(脏数据,设置的是2修改前的数据,没有读到2提交后的数据)】【读与写的问题】
	方案1:加锁【但是很慢】
	
经常修改的数据就不应该放在缓存里


我们系统的一致性问题:
	1、缓存的所有数据都有过期时间,数据过期下一次查询主动更新
	2、读写数据的时候,加上分布式的读写锁【经常写的数据不写入缓存】

解决方案:

1、不需要考虑数据一致性的数据:用户维度(订单、用户数据【不会发生高并发问题】)【设置过期时间就可以解决】
2、菜单(商品介绍、基础规格)只需要最终一致性,canal订阅binlog【设置过期时间就可以解决】
3、加读写锁【碰到写了,就锁定,写完再读】

总结:
	·我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,
保证每天拿到当前最新数据即可。
	·我们不应该过度设计,增加系统的复杂性
	·遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。

缓存数据一致性-Canal
原理: *** 作数据库,canal获得binlog二进制日志,根据日志去修改缓存,不用再调用修改缓存的代码

好处:减少了代码量

弊端:增加了一个中间件

作用:canal分析 各种表的binlog,生成一个用户推荐表,所以每个人的首页都是不一样的。

总结
1、 *** 作DB的锁是在DB层,例如行锁,而不是在service层加锁
2、这里的分布式锁是为了控制热点数据只要一条发送到DB
3、分布式锁的粒度要小,例如一件商品一个锁:就像DB里面的行锁

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5684916.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存