最近项目需要用到延迟队列,本来考虑使用MQ实现,但由于目前使用RocketMq,不支持灵活的延迟时间配置,最终采用redis实现延迟功能。
一. Redis ZSET实现延迟队列大体思路:使用ZSet结构,以messageId作为value,延迟时间delayTIme作为score。每次获取小于当前时间的数据。
- 推送消息
推送消息只是简单将数据以及延迟时间放入延迟队列中。
public void push(long messageId,long delayTime) { long score = System.currentTimeMillis() + delayTime; redisTemplate.boundZSetOps(DELAY_QUEUE_NAME).add(String.valueOf(messageId),score); }
- 处理消息
通过启动一个线程,循环获取队列中的数据。
@PostConstruct public void loopEvent() { // 通过启用一个线程池,管理单线程 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("push-message-pool-%d").get(); ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new linkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); singleThreadPool.execute(() -> { BoundZSetOperationsoperations = redisTemplate.boundZSetOps(DELAY_QUEUE_NAME); // 不断获取Zset中过期的队列 while (true) { try { long current = System.currentTimeMillis(); // 获取小于当前时间的消息 Set messageIdSet = redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_NAME,0, current,0,1); // 当前没有过期的消息,休息一下 if (CollectionUtils.isEmpty(messageIdSet)) { Thread.sleep(500); continue; } Iterator iterator = messageIdSet.iterator(); String messageId = iterator.next(); if (operations.remove(messageId)>0) { // 具体处理消息的业务逻辑 handleDelayMessage(messageId); } } catch (Exception e) { log.error("消息消费异常",e); } } }); }
思考下operations.remove作用是什么?
这里是避免重复消费消息,当remove>0表示删除成功,当前未被其他线程消费。
上面的结构虽然满足了基本需要,但还有几个点要清楚: - 采用redis,数据的持久化得到了一定保证 - 缺乏应答机制,这个是硬伤,获取到数据消费异常,需要业务手动进行补偿,这是对比MQ的延迟最大的缺憾 - 如果延迟队列一直为空,虽然采用了休眠,但是造成了一定延迟,且长期的存在延迟队列为空,会造成CPU资源浪费 - 上面的demo每次获取一条数据,如果多个实例竞争激烈,获取的message往往都是已失效的,造成一些无效的请求,这一步可用增大获取的消息数量得到缓解二. Redisson 实现延迟队列
基于上文的考虑,最终采用了redisson提供的封装。以前redisson使用往往是在分布式锁的场景引用,没想到其他场景使用封装的也不错。
redisson设计结构: 采用三个队列一个ZSet结构的队列timeout_queue,一个List结构的堵塞队列block_queue,还有一个记录所有数据的队列queue。
- timeout_queue队列: 将要延迟的数据添加入延迟队列
- block_queue队列: timeout_queue到期的数据加入该队列,同时移除延迟队列中的数据
- queue队列: 记录了所有数据的加入顺序等
timeout_queue队列实现正常的延迟功能,block_queue获取到期数据。如果没有到期数据,线程将会堵塞住,避免CPU的浪费,
由于采用了lua脚本,执行了获取过期数据,添加数据到堵塞队列的过程,避免了分布式场景下重复消费的问题。
- 推送消息
推送消息采用offerAsync方法,异步推送。
@Autowired private RDelayedQueuedelayedQueue; public void push(long messageId,long delayTime) { delayedQueue.offerAsync(messageId, delayTime, TimeUnit.SECONDS); }
- 处理消息
通过启动一个线程,循环获取队列中的数据。
@Autowired private RBlockingQueue三. 总结blockQueue; @PostConstruct public void loopEvent() { // 通过启用一个线程池,管理单线程 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("push-message-pool-%d").get(); ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new linkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); singleThreadPool.execute(() -> { while (true) { try { // 具体处理消息的业务逻辑,take方法会堵塞直到获取到数据 handleDelayMessage(blockQueue.take()); } catch (Exception e) { log.error("消费延迟推送消息异常", e); } } }); }
采用了Redisson的延迟队列,极大的避免了一些性能问题,但是缺乏应答机制,还是会造成一些困扰,对消息的安全性要求高的,建议还是采用MQ去处理。
下篇将讨论下,Redisson延迟队列的实现机制。关联的项目demo地址
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)