查看对象编码
可以看到对象的数据类型为stream,返回的streamID 的基本结构为 "{timestamp}-{sequence}"
使用XINFO命令查看该 stream Key 的基本信息
可以看到stream中使用了一种 "radix-tree" 数据结构。
Radix树对上述单词的存储
当插入first时
相比于普通的字典树,Radix树会将单节点树枝压缩为一个节点,以节省存储空间,提高搜索的效率。在Streams结构中,默认生成的StreamID为 时间戳+自增序列号 的形式,当消息的时间分布紧凑时,这种存储结构多个StreamID复用的前缀将很长,可以将存储空间压缩得很小,这对于内存使用内存存储数据的Redis很重要。Radix树的另一个优点是解决了hash冲突的问题。对于一般的hash结构,当遇到hash冲突的情况,一般采用链地址法的方式解决冲突。这样一个hash槽中的链可能会很长,如果采用扩容的方式,执行效率一般也不是很高。采用树结构索引数据解决了hash冲突的问题。当然,Radix树在遇到新插入的数据时也会遇到数据节点拆分的问题。
stream 是一种 append-only 的数据结构,只有 del 命令可以删除对应的Key。
指定添加数据的最大长度(~表示只在节点删除时清除数据,能保证数据个数的最大长度不小于 LENGTH 个,不需要每次调整Radix树,性能相对较好)
阻塞式,获取最新1条的数据(阻塞单位:ms)
根据Streams支持的命令,要使用Streams作为多消费队列,主要有以下几点:
官网介绍
中文官网命令介绍
图解Redis数据结构,讲的比较好
知乎上基于Redis消息队列方案介绍
用户登录检测一次订单是否过期是一种常见的做法,其流程如下:1. 用户下单。当用户下单时,订单信息会保存在数据库中,并且会记录下订单的创建时间和过期时间等信息。
2. 用户登录。当用户登录时,系统会检查该用户是否有未支付的订单,如果有,则会检查订单的过期时间是否已到,如果已到,则将该订单标记为过期。
3. 提示用户。如果用户登录后发现有过期订单,系统会提示用户该订单已过期,需要重新下单。用户可以选择重新下单或者取消订单。
需要注意的是,订单的过期时间应该合理设置,一般可以根据业务需要和交付周期等因素来确定。同时,为了避免用户误解订单状态,系统应该在订单过期前提前进行提醒,以便用户及时处理。
总之,用户登录检测一次订单是否过期是一种有效的方法,可以帮助用户及时了解订单状态,避免不必要的麻烦和误解,提高用户满意度。
https://gitchat.csdn.net/activity/5c9a452004714778e37df815
批处理
可以通过multi进行批处理
语法格式为:
使用XLEN命令来获取一个Stream的条目数量:
按范围查询: XRANGE 和 XREVRANGE
要根据范围查询Stream,我们只需要提供两个ID,即start 和 end。返回的区间数据将会包括ID是start和end的元素,因此区间是完全包含的。两个特殊的ID - 和 + 分别表示可能的最小ID和最大ID。
XREVRANGE命令与XRANGE相同,但是以相反的顺序返回元素,因此XREVRANGE的实际用途是检查一个Stream中的最后一项是什么:
请注意:XREVRANGE命令以 相反的顺序 获取start 和 stop参数。
消费者组模式的支持主要由两个命令实现:
三个在同一组 mpGroup 消费者 A、B、C 在消费消息时(消费者在消费时指定即可,不用预先创建),有着 互斥原则 。
用于在消息队列 mq 上创建消费组 mpGroup,最后一个参数0,表示该组从第一条消息开始消费(意义与XREAD的0一致)。除了支持CREATE外,还支持SETID设置起始ID,DESTROY销毁组,DELCONSUMER删除组内消费者等 *** 作。
用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。
可以进行组内消费的基本原理是,STREAM 类型会为每个组记录一个最后处理(交付)的消息ID(lastdeliveredid),这样在组内消费时,就可以从这个值后面开始读取,保证 不重复消费 。
以上就是消费组的基础 *** 作。除此之外,消费组消费时,还有一个必须要考虑的问题,就是若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了。下面继续讨论解决方案。
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。
每个Pending的消息有4个属性:
读取了之后要如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成,演示如下:
有了这样一个 Pending 机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该 Pending 列表,就可以继续处理该消息了,保证消息的有序和不丢失。
此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者 Pending 的消息,转义给其他的消费者处理,就是消息转移。请继续。
消息转移的 *** 作时将某个消息转移到自己的 Pending 列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:
转移除了要指定ID外,还需要指定 IDLE,保证是长时间未处理的才被转移。 被转移的消息的IDLE 会被重置 ,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发 *** 作,设置了 IDLE,则可以使后面的转移不会成功,因为 IDLE 不满足条件。
消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上。请继续看:
如果某个消息,不能被消费者处理,也就是不能被 XACK,这是要长时间处于 Pending 列表中,即使被反复的转移给各个消费者也是如此。此时该消息的 delivery counter 就会 反复累加 ,当累加到某个我们预设的临界值时,我们就认为是坏消息,由于有了判定条件,我们将坏消息处理掉即可,删除即可。
删除一个消息,使用XDEL语法,演示如下:
本例中,并没有删除 Pending 中的消息因此你查看 Pending,消息还会在。可以执行XACK标识其处理完毕!
Stream提供了XINFO来实现对服务器信息的监控,可以查询:
至此,消息队列的基本 *** 作说明大体结束!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)