延时队列的几种实现方式(只有原理,并没有源码)

延时队列的几种实现方式(只有原理,并没有源码),第1张

延时队列的几种实现方式(只有原理,并没有源码) 延时队列 需求描述 场景一

在淘宝下了订单,过半个小时未支付就取消订单

场景二

还是淘宝(别问,问就是淘宝资深剁手党),发货后超过15天未确认就自动收货


需求分析

​ 本质上都是超过xxx时间,就异步去做一件事。说到异步那基本上就是搞个定时任务去轮询或者消息队列+轮询。基本上有几种实现方式,挨个看一下。


实现方式 DelayQueue+Delayed

Java的并发包java.util.concurrent下提供了延时队列DelayQueue,它内部维护了一个优先级队列PriorityQueue来维护任务顺序,方便取出到时间的任务。PriorityQueue是个二叉堆,这就意味着它的插入、删除的时间复杂度都是O(logn)。

定时任务Quartz

Quartz是一个任务调度框架,不过它有一定的周期性,可能很多单子已经超时,但还没到达触发执行的时间点,那么就会造成订单处理的不够及时。如果对超时的时间精度要求没那么高的情况下可以使用。

Redis Redis的Zset

Redis有种数据类型Zset,它利用score属性为集合内元素维护一个顺序,通过Zset就可以实现延时队列。

做法大概是:

任务插入的时候key是你的单号或者唯一ID,score是时间戳异步去redis用zrange批量取出超时的单号,然后进行处理 Redis的发布订阅功能

Redis也提供了发布订阅功能,可以修改配置文件redis.conf中的:notify-keyspace-events Ex,监听超时的key,然后编写监听器处理。

RocketMQ RocketMQ的延迟消息机制

RocketMQ提供的延迟消息机制。如果往RocketMQ发送了一条延迟消息,它不会立刻对消费者可见,而是在指定的时间后再投递给消费者。那么我们可以给RocketMQ投递延迟消息,然后到时间去消费,检查订单是否已经支付,如果未支付就取消订单,如果支付就继续走后面的业务逻辑。

RocketMQ事务消息的反查机制

如果RocketMQ在生成订单的时候用上了事务消息,那么可以用事务消息的状态回查机制来替代定时任务。在下单时,给 Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。我们只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时 间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。

RabbitMQ的TTL+DXL实现延时队列

RabbitMQ可以通过消息存活时间TTL+死信队列的Exchange(DXL)来实现延时队列。

Time To Live(TTL):

TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

RabbitMQ 可以从两种维度设置消息过期时间,分别是队列和消息本身:

设置队列过期时间,那么队列中所有消息都具有相同的过期时间。设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。

如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。

Dead Letter Exchanges(DLX):

DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange和x-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。

x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机)。

x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。

队列出现Dead Letter的情况有:

消息或者队列的TTL过期队列达到最大长度消息被消费端拒绝(basic.reject or basic.nack)

所以RabbitMQ的原理大概是:

通过设置ttl+dlx,消息到指定时间后,投递到另外一个Exchange中去消费,去检查是否已经支付,未支付就取消,支付了就继续走后序的业务流程。

时间轮 时间轮介绍

时间轮有简单时间轮(Simple Timing Wheel)和分层时间轮(Hierarchical Timing Wheel)两类。两者各有利弊,也都有各自的使用场景。Kafka 采用的是分层时间轮。

分层时间轮

分层时间轮可以简单的认为,一圈时间轮8格,每个格子1s,那么第九秒怎么办?那就再往高抽出层的概念,用第二层+第一格来表示。当然Kafka的分层时间轮比这复杂的多。再举个例子:想想我们生活中的手表。手表由时针、分针和秒针组成,它们各自有独立的刻度,但又彼此相关:秒针转动一圈,分针会向前推进一格;分针转动一圈,时针会向前推进一格。这就是典型的分层时间轮。

和手表不太一样的是,Kafka 自己有专门的术语。

在 Kafka 中,手表中的“一格”叫“一 个桶(Bucket)”,而“推进”对应于 Kafka 中的“滴答”,也就是 tick。

除此之外,每个 Bucket 下也不是白板一块,它实际上是一个双向循环链表(Doubly linked Cyclic List),里面保存了一组延时请求。

在 Kafka 源码中,时间轮对应 utils.timer 包下的 TimingWheel 类,每个 Bucket 下的链 表对应 TimerTaskList 类,链表元素对应 TimerTaskEntry 类,而每个链表元素里面保存的 延时任务对应 TimerTask。 在这些类中,TimerTaskEntry 与 TimerTask 是 1 对 1 的关系,TimerTaskList 下包含多 个 TimerTaskEntry,TimingWheel 包含多个 TimerTaskList。

Kafka 延时请求实模块现

​ 延迟请求模块属于 Kafka 的冷门组件,Kafka通过延时模块来异步循环 *** 作和管理定时任务。内部是基于时间轮算法实现的。

​ 举个例子:比如配置了 acks=all 的生产者发送的请求可能不会立刻完成,要等 ISR 中的所有副本都要成功才会响应这次写入。只有满足了条件或发生了超时,Kafka 才会把该请求标记为完成状态。

​ 这种请求在Kafka内部是通过TimingWheel类建模时间轮模型,SystemTimer封装了底层的时间轮,由 DelayedOperation 调用, 再通过DelayedOperationPurgatory 管理 DelayedOperation。它们共同实现了 Broker 端对于延迟请求的处理,基本思想就是,能立即完成的请求马上完成,否则就放入到缓冲区,再由DelayedOperationPurgatory 类的方法会自动地处理这些延迟请求。

分层时间轮体系

TimerTask 类:建模 Kafka 延时请求。它是一个 Runnable 类,Kafka 使用一个单独线 程异步添加延时请求到时间轮。

TimerTaskEntry 类:建模时间轮 Bucket 下延时请求链表的元素类型,封装了 TimerTask 对象和定时任务的过期时间戳信息。 TimerTaskList 类:建模时间轮 Bucket 下的延时请求双向循环链表,提供 O(1) 时间复 杂度的请求插入和删除。

TimingWheel 类:建模时间轮类型,统一管理下辖的所有 Bucket 以及定时任务。

分层时间轮的上层组件

包括 Timer 接口及其实现类 SystemTimer、DelayedOperation 类以及 DelayedOperationPurgatory 类:

SystemTimer 类:Kafka 定义的定时器类,封装了底层分层时间轮,实现了时间轮 Bucket 的管理以及时钟向前推进功能。它是实现延迟请求后续被自动处理的基础。

DelayedOperation 类:延迟请求的高阶抽象类,提供了完成请求以及请求完成和过期 后的回调逻辑实现。

DelayedOperationPurgatory 类:Purgatory 实现类,该类定义了 WatcherList 对象 以及对 WatcherList 的 *** 作方法,而 WatcherList 是实现延迟请求后续自动处理的关键 数据结构。

基于kafka实现延迟队列

基于Kafka怎么实现延迟队列?看下网上大神的描述:

    在发送延迟消息时不直接发送到目标topic,而是发送到一个用于处理延迟消息的topic,例如delay-minutes-1写一段代码拉取delay-minutes-1中的消息,将满足条件的消息发送到真正的目标主题里。

​ 可以看出,实际上如果非要用Kafka去做延时队列,可以,但是基本上跟自己实现也差不多了。并且自己实现的时候,拉取消息的时候还要去做特殊的处理,不能发现消息没有过期直接不处理。因为如果你没在指定的时间(max.poll.interval.ms参数配置)去处理,kafka会认为你这个消费者挂掉了,然后去做reblance处理把你踢掉。而reblance可以参考之前整理的博客,并不是很友好的 *** 作,跟JVMGC导致的STW差不多。

总结

列举出了几种延时队列的实践方式,各有优劣:(自己总结,如果有不正确的地方请指正)

DelayQueue+异步:底层的数据结构是二叉堆,插入、删除的时间复杂度是O(logn),需要定时扫描很庞大的一个订单信息,数据量很大的时候不是特别的友好,而且宕机数据会丢失定时任务Quartz,分布式情况下可能会有问题Redis的Zset+异步:可以批量处理,不过准确性取决于你异步处理的频率Redis的发布订阅:实际上就是订阅每个key过期的事件,意味着不能批量处理,当数据量过大的时候非常不友好RocketMQ的延迟消息:数据量大的时候能否处理的过来要依赖于RocketMQ的延迟消息处理能力;是否及时处理也要依赖于消费者那边的处理能力;RocketMQ的事务消息反查机制:实际上是一条一条的处理,数据量大的时候不是特别的友好;RabbitMQ的TTL+DXL:如果数据量过大,消费者消费死信队列数据的时候处理不及时导致消息积压,RabbitMQ的性能会急剧下降。这是RabbitMQ本身特性造成的;

​ 当然也可以参考Kafka中的时间轮算法,自己去实现延时队列;个人认为基于时间轮算法实现的延时队列添加、查找时间复杂度都是

O(1),单看添加、查找的时间复杂度是最好的。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5701740.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存