目录
stream介绍
stream *** 作命令
追加消息XADD
范围查询XRANGE,XREVRANGE
读取消息命令XREAD
消费组模式相关命令
创建消费组XGROUP CREATE
读取消费组消息XREADGROUP
全部命令
stream实现原理
radix查找和遍历
radix查找过程
radix遍历
radix插入
radix删除
redis相关实现原理
radix相关结构定义
stream结构定义
listpack紧凑列表
消费组功能实现
总结
stream介绍
stream是redis5.0引入的新数据类型,是消息队列的一种实现方式,实现了完整的消息队列功能。主要特点包括:
- stream 提供了消息的持久化和主备复制功能,保证消息不会丢失。阻塞 *** 作,允许消费者等待生产者向流中添加新数据。消费组功能(类似Kafka实现),允许一组客户端协作使用同一消息流的不同部分。
stream *** 作命令 追加消息XADD
XADD命令向stream中追加消息,如果stream不存在,则创建。消息格式:
XADD key ID field value [feild value ...] key: stream名称,如果不存在则创建 ID: 消息ID,*表示redis自动创建ID field value: 值,K-V格式
执行XADD命令插入内容,返回消息ID,消息ID包含两部分${时间戳}-${序列}
> XADD teststream * name redis version 6.0 "1641651260618-0"
执行XADD命令,指定消息ID
> XADD teststream 1641651260618-1 name java version 1.8 "1641651260618-1"范围查询XRANGE,XREVRANGE
XRANGE命令实现范围查询,XREVRANGE逆序查询,命令格式:
XRANGE|XREVRANGE key start end [COUNT count] key:straem名称 start:开始ID,- 表示最小 end:结束ID,+ 表示最大 count:返回数量
命令演示
> XRANGE teststream - + 1) 1) "1641651260618-0" 2) 1) "name" 2) "redis" 3) "version" 4) "6.0" 2) 1) "1641651260618-1" 2) 1) "name" 2) "java" 3) "version" 4) "1.8" # 指定返回数量 > XRANGE teststream - + count 1 1) 1) "1641651260618-0" 2) 1) "name" 2) "redis" 3) "version" 4) "6.0" # 指定时间参数 > XRANGE teststream 1641651260618 + 1) 1) "1641651260618-0" 2) 1) "name" 2) "redis" 3) "version" 4) "6.0" 2) 1) "1641651260618-1" 2) 1) "name" 2) "java" 3) "version" 4) "1.8" # 指定开始结束ID参数 > XRANGE teststream 1641651260618-0 1641651260618-0 1) 1) "1641651260618-0" 2) 1) "name" 2) "redis" 3) "version" 4) "6.0" # XREVRANGE命令逆序遍历 > XREVRANGE teststream + - 1) 1) "1641651260618-1" 2) 1) "name" 2) "java" 3) "version" 4) "1.8" 2) 1) "1641651260618-0" 2) 1) "name" 2) "redis" 3) "version" 4) "6.0"读取消息命令XREAD
XREAD命令读取消息,可以阻塞或非阻塞,命令格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] COUNT :数量 BLOCK :可选,阻塞毫秒数,没有设置就是非阻塞模式 key :stream名称 ID :消息 ID,$表示stream中最大的ID,即读取最新的消息
XREAD命令演示,非阻塞方式
> XREAD STREAMS teststream 0 1) 1) "teststream" 2) 1) 1) "1641651260618-0" 2) 1) "name" 2) "redis" 3) "version" 4) "6.0" 2) 1) "1641651260618-1" 2) 1) "name" 2) "java" 3) "version" 4) "1.8"
XREAD命令演示,阻塞方式
> XREAD BLOCK 1000000 STREAMS teststream $ 1) 1) "teststream" 2) 1) 1) "1641653381688-0" 2) 1) "name" 2) "mysql" 3) "version" 4) "8.0" (29.36s) # 另开一个终端,push消息 > xadd teststream * name mysql version 8.0 "1641653381688-0"消费组模式相关命令 创建消费组XGROUP CREATE
XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATEConSUMER key groupname consume key :stream名称,如果不存在就创建 groupname :消费组名 ID : 指定开始消费ID,或使用 $表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
命令演示
# 创建消费组,从头开始消费 > XGROUP CREATE teststream testgroup1 0 OK # 创建消费组,消费最新消息 > XGROUP CREATE teststream testgroup2 $ OK读取消费组消息XREADGROUP
XREADGROUP指定消费组消费消息,与XREAD命令一样,XREADGROUP支持阻塞模式,命令格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] group :消费组名 consumer :消费者名 COUNT : 读取数量 BLOCK : 阻塞毫秒数 key : stream名称 ID : 消息 ID,特殊ID > 表示读取未消费过的消息
命令演示
> XREADGROUP GROUP testgroup1 Alice STREAMS teststream > 1) 1) "teststream" 2) 1) 1) "1641651260618-0" 2) 1) "name" 2) "redis" 3) "version" 4) "6.0" 2) 1) "1641651260618-1" 2) 1) "name" 2) "java" 3) "version" 4) "1.8" 3) 1) "1641653381688-0" 2) 1) "name" 2) "mysql" 3) "version" 4) "8.0"全部命令
由于篇幅原因,这里不对全部命令一一介绍了,有需要的可以去redis官网自行查阅
XGROUP [
CREATE
SETID
DELCONSUMER
DESTROY
]
消费者相关 *** 作命令XINFO [
GROUPS
STREAM
]
查看流和消费者组的相关信息stream使用radix(基数树)作为存储结构,radix是一树形结构,适合查找和存储,radix特点:
树由叶子结点和非叶子结点组成同级结点按照顺序排列结点的key是由从根节点到结点的所有key组合而成如果结点为空且只有一个子结点,需要与子结点合并
radix查找和遍历 radix查找过程 radix遍历radix树的遍历与二叉树类似,可以参考二叉树的遍历方式。
radix插入radix树插入的各种场景
radix删除删除是插入的逆 *** 作,也需要区分场景,这里不详细介绍。
redis相关实现原理 radix相关结构定义rax结构定义radix树结构,结点中raxNode.data保存数据,包含key和value。
//radix根结点定义 typedef struct rax { raxNode *head; uint64_t numele; uint64_t numnodes; } rax; // #define RAX_NODE_MAX_SIZE ((1<<29)-1) typedef struct raxNode { uint32_t iskey:1; uint32_t isnull:1; uint32_t iscompr:1; uint32_t size:29; unsigned char data[]; } raxNode;stream结构定义
stream相关结构定义,内部使用rax存储数据。
typedef struct streamID { uint64_t ms; uint64_t seq; } streamID; typedef struct stream { rax *rax; uint64_t length; streamID last_id; rax *cgroups; } stream; typedef struct streamIterator { stream *stream; streamID master_id; uint64_t master_fields_count; unsigned char *master_fields_start; unsigned char *master_fields_ptr; int entry_flags; int rev; uint64_t start_key[2]; uint64_t end_key[2]; raxIterator ri; unsigned char *lp; unsigned char *lp_ele; unsigned char *lp_flags; unsigned char field_buf[LP_INTBUF_SIZE]; unsigned char value_buf[LP_INTBUF_SIZE]; } streamIterator;listpack紧凑列表
除了radix,stream中还有一种数据结构:紧凑列表,用于存储结点数据。从名字就可以看出,紧凑列表是一种结构非常紧凑的数据结构,由于篇幅原因这里不详细介绍了。感兴趣的同学可以研究下。
消费组功能实现消费组相关功能也是通过radix结构实现,其中streamCG结构保存消费组相关信息。
typedef struct streamCG { streamID last_id; rax *pel; rax *consumers; } streamCG; typedef struct streamConsumer { mstime_t seen_time; sds name; rax *pel; } streamConsumer; typedef struct streamNACK { mstime_t delivery_time; uint64_t delivery_count; streamConsumer *consumer; } streamNACK;总结
截至目前为止,stream类型应该还没有在各大厂应用,Kafka的地位仍然稳固。当然多一种选择也没什么坏处,如果你有消息队列相关需求,但又觉得Kafka有点笨重,不妨试一下stream。即插即用,简单方便。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)