基于redis实现延时任务

基于redis实现延时任务,第1张

redis的zset是有序集合,默认根据score升序排序。并且可以根据scope范围查询,因此可以启动一个线程循环执行范围查询,获取当前时间之前的数据,即要执行任务,(因为不是严格按照时间匹配的,因此可能会有一点时间偏差,但一般情况下不会有影响),处理完后删除缓存。考虑到线程有可能会异常退出(比如redis连接异常等),因此使用监听者模式设计了线程重启方案,监听者会监听线程,当线程出现异常时监听者会重启线程。下面是具体代码。

@Configuration
public class RedisTemplateConfig {

    @Bean(name = "myredis")
    public RedisTemplate redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        // 使用StringRedisSerializer来序列化和反序列化redis的key值
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        // 使用GenericFastJsonRedisSerializer 来序列化和反序列化redis的value值
        GenericFastJsonRedisSerializer genericFastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
        redisTemplate.setValueSerializer(genericFastJsonRedisSerializer);
        redisTemplate.setHashValueSerializer(genericFastJsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
public class DelayTaskConsumerThread extends Observable implements Runnable {

    @Override
    public void run() {
        System.out.println("线程启动:"+Thread.currentThread().getId());
        while (true){
            try {
                long time = System.currentTimeMillis();
                consumeMsg("testDelayTask",time);
            } catch (Exception e) {
                e.printStackTrace();

                System.out.println("线程异常:"+Thread.currentThread().getId());
                doBusiness();
                break;
            }
        }
    }
    private void consumeMsg(String key,long time){
        RedisTemplate redisTemplate = (RedisTemplate) SpringContextUtil.getBean("myredis");
        Set set = redisTemplate.opsForZSet().rangeByScore(key,0l, time);
        if (set.size() > 0){
            System.out.println("线程:"+Thread.currentThread().getId()+","+"获取到["+set.size()+"]条数据,[time="+time+"]");
            Iterator iterator = set.iterator();
            String next="";
            while (iterator.hasNext()){
                try {
                    next = (String) iterator.next();
                    System.out.println("线程:"+Thread.currentThread().getId()+","+"消费消息:[value="+ next +"]");
                }catch (Exception e){
                    //这里可以对消费异常的数据做进一步处理
                    throw e;
                }finally {
                    redisTemplate.opsForZSet().remove(key,next);
                }
            }
        }else {
            System.out.println("线程:"+Thread.currentThread().getId()+","+"没有消息[time="+time+"]");
        }
    }

    public void doBusiness(){
        if(true){
            super.setChanged();
        }
        notifyObservers();
    }
}
public class DelayTaskConsumerListener implements Observer {
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("DelayTaskConsumerThread线程异常退出,重新启动");
        DelayTaskConsumerThread run = new DelayTaskConsumerThread();
        run.addObserver(this);
        ThreadPoolUtil.execute(run);
        System.out.println("DelayTaskConsumerThread线程已重启");
    }
}
@Component
public class DelayTaskConsumerRunner implements ApplicationRunner {
    @Autowired
    @Qualifier(value = "myredis")
    private RedisTemplate redisTemplate;

    @Override
    public void run(ApplicationArguments args) {
        redisTemplate.delete("testDelayTask");
        DelayTaskConsumerThread taskConsumerThread = new DelayTaskConsumerThread();
        DelayTaskConsumerListener taskConsumerListener = new DelayTaskConsumerListener();
        taskConsumerThread.addObserver(taskConsumerListener);

        ThreadPoolUtil.execute(taskConsumerThread);
    }
}
@Component
public class SpringContextUtil implements ApplicationContextAware {
    /**
     * 上下文对象实例
     */
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtil.applicationContext = applicationContext;
    }

    /**
     * 获取applicationContext
     *
     * @return
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通过name获取 Bean.
     *
     * @param name
     * @return
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * 通过class获取Bean.
     *
     * @param clazz
     * @param 
     * @return
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     *
     * @param name
     * @param clazz
     * @param 
     * @return
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

}
@GetMapping(value = "/test", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public CommonResponse test() {
        produceMsg("testDelayTask", UUID.randomUUID().toString(),System.currentTimeMillis());//这里的时间戳实际是需要从页面选择的任务执行时间,这里为了方便直接使用了系统当前时间戳
        return new CommonResponse();
    }

    /**
     *
     * @param key
     * @param value
     * @param score 设置的任务执行时间
     */
    private void produceMsg(String key,String value,long score){
        Boolean add = redisTemplate.opsForZSet().add(key, value, score);
        if (add){
            System.out.println("发送消息:[value="+value+",score="+score+"]");
        }else {
            System.out.println("消息发送失败:[value="+value+",score="+score+"]");
        }
    }

缺点:
1、需要一直频繁的访问redis,就算redis中没有要执行的任务了也会一直访问,造成资源浪费
2、时间控制不精准

优化思路:
可以借鉴zookeeper中的节点监听机制,当某个数据节点被删除时,由服务端主动通知监听此节点的客户端,这样就不需要客户端一直轮询访问了。但zookeeper没有redis中的超时删除功能,因此不合适。幸运的是,etcd相比于zookeeper,增加了超时删除功能,另外相比于redis,增加了回调功能,因此可以考虑采用etcd或相关思想进行优化。

知识点:
1、redis的zset数据结构
2、java监听者模式

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/741344.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-28
下一篇 2022-04-28

发表评论

登录后才能评论

评论列表(0条)

保存