日常设计-- 延迟队列

日常设计-- 延迟队列,第1张

日常设计-- 延迟队列

最近项目需要用到延迟队列,本来考虑使用MQ实现,但由于目前使用RocketMq,不支持灵活的延迟时间配置,最终采用redis实现延迟功能。

一. Redis ZSET实现延迟队列

大体思路:使用ZSet结构,以messageId作为value,延迟时间delayTIme作为score。每次获取小于当前时间的数据

  1. 推送消息
    推送消息只是简单将数据以及延迟时间放入延迟队列中。
 public void push(long messageId,long delayTime) {
        long score = System.currentTimeMillis() + delayTime;
        redisTemplate.boundZSetOps(DELAY_QUEUE_NAME).add(String.valueOf(messageId),score);
    }
  1. 处理消息
    通过启动一个线程,循环获取队列中的数据。
 	@PostConstruct
    public void loopEvent() {
    	// 通过启用一个线程池,管理单线程
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("push-message-pool-%d").get();
        ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new linkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        singleThreadPool.execute(() -> {
            BoundZSetOperations operations = redisTemplate.boundZSetOps(DELAY_QUEUE_NAME);
            // 不断获取Zset中过期的队列
            while (true) {
                try {
                    long current = System.currentTimeMillis();
                    // 获取小于当前时间的消息
                    Set messageIdSet = redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_NAME,0, current,0,1);
                    // 当前没有过期的消息,休息一下
                    if (CollectionUtils.isEmpty(messageIdSet)) {
                        Thread.sleep(500);
                        continue;
                    }
                    Iterator iterator = messageIdSet.iterator();
                    String messageId = iterator.next();
                    if (operations.remove(messageId)>0) {
                    	// 具体处理消息的业务逻辑
                        handleDelayMessage(messageId);
                    }
                } catch (Exception e) {
                    log.error("消息消费异常",e);
                }
            }
        });
    }

思考下operations.remove作用是什么?
这里是避免重复消费消息,当remove>0表示删除成功,当前未被其他线程消费。

上面的结构虽然满足了基本需要,但还有几个点要清楚:
- 采用redis,数据的持久化得到了一定保证
- 缺乏应答机制,这个是硬伤,获取到数据消费异常,需要业务手动进行补偿,这是对比MQ的延迟最大的缺憾
- 如果延迟队列一直为空,虽然采用了休眠,但是造成了一定延迟,且长期的存在延迟队列为空,会造成CPU资源浪费
- 上面的demo每次获取一条数据,如果多个实例竞争激烈,获取的message往往都是已失效的,造成一些无效的请求,这一步可用增大获取的消息数量得到缓解
二. Redisson 实现延迟队列

基于上文的考虑,最终采用了redisson提供的封装。以前redisson使用往往是在分布式锁的场景引用,没想到其他场景使用封装的也不错。

redisson设计结构: 采用三个队列一个ZSet结构的队列timeout_queue,一个List结构的堵塞队列block_queue,还有一个记录所有数据的队列queue。

  • timeout_queue队列: 将要延迟的数据添加入延迟队列
  • block_queue队列: timeout_queue到期的数据加入该队列,同时移除延迟队列中的数据
  • queue队列: 记录了所有数据的加入顺序等

timeout_queue队列实现正常的延迟功能,block_queue获取到期数据。如果没有到期数据,线程将会堵塞住,避免CPU的浪费,
由于采用了lua脚本,执行了获取过期数据,添加数据到堵塞队列的过程,避免了分布式场景下重复消费的问题。

  1. 推送消息
    推送消息采用offerAsync方法,异步推送。
     @Autowired
    private RDelayedQueue delayedQueue;
    
	public void push(long messageId,long delayTime) {
        delayedQueue.offerAsync(messageId, delayTime, TimeUnit.SECONDS);
    }
  1. 处理消息
    通过启动一个线程,循环获取队列中的数据。
	@Autowired
    private RBlockingQueue blockQueue;
 	@PostConstruct
    public void loopEvent() {
    	// 通过启用一个线程池,管理单线程
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("push-message-pool-%d").get();
        ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new linkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        singleThreadPool.execute(() -> {
            while (true) {
                try {
                	// 具体处理消息的业务逻辑,take方法会堵塞直到获取到数据
                    handleDelayMessage(blockQueue.take());
                } catch (Exception e) {
                    log.error("消费延迟推送消息异常", e);
                }
            }
        });
    }
三. 总结

采用了Redisson的延迟队列,极大的避免了一些性能问题,但是缺乏应答机制,还是会造成一些困扰,对消息的安全性要求高的,建议还是采用MQ去处理。
下篇将讨论下,Redisson延迟队列的实现机制。关联的项目demo地址

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5583155.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存