Redis源码阅读(二)- stream与radix树

Redis源码阅读(二)- stream与radix树,第1张

Redis源码阅读(二)- stream与radix树

目录

stream介绍

stream *** 作命令

追加消息XADD

范围查询XRANGE,XREVRANGE

读取消息命令XREAD

 消费组模式相关命令

创建消费组XGROUP CREATE

 读取消费组消息XREADGROUP

全部命令

stream实现原理

radix查找和遍历

radix查找过程       

radix遍历

radix插入

radix删除

redis相关实现原理

radix相关结构定义

stream结构定义

listpack紧凑列表

消费组功能实现

总结


stream介绍

        stream是redis5.0引入的新数据类型,是消息队列的一种实现方式,实现了完整的消息队列功能。主要特点包括:

    stream 提供了消息的持久化和主备复制功能,保证消息不会丢失。阻塞 *** 作,允许消费者等待生产者向流中添加新数据。消费组功能(类似Kafka实现),允许一组客户端协作使用同一消息流的不同部分。

   

stream *** 作命令 追加消息XADD

XADD命令向stream中追加消息,如果stream不存在,则创建。消息格式:

XADD key ID field value [feild value ...]

key: stream名称,如果不存在则创建
ID: 消息ID,*表示redis自动创建ID
field value: 值,K-V格式

执行XADD命令插入内容,返回消息ID,消息ID包含两部分${时间戳}-${序列}

> XADD teststream * name redis version 6.0
"1641651260618-0"

执行XADD命令,指定消息ID

> XADD teststream 1641651260618-1 name java version 1.8
"1641651260618-1"
范围查询XRANGE,XREVRANGE

XRANGE命令实现范围查询,XREVRANGE逆序查询,命令格式:

XRANGE|XREVRANGE key start end [COUNT count]

key:straem名称
start:开始ID,- 表示最小
end:结束ID,+ 表示最大
count:返回数量

 命令演示

> XRANGE teststream - + 
1) 1) "1641651260618-0"
   2) 1) "name"
      2) "redis"
      3) "version"
      4) "6.0"
2) 1) "1641651260618-1"
   2) 1) "name"
      2) "java"
      3) "version"
      4) "1.8"

# 指定返回数量
> XRANGE teststream - + count 1
1) 1) "1641651260618-0"
   2) 1) "name"
      2) "redis"
      3) "version"
      4) "6.0"

# 指定时间参数
> XRANGE teststream 1641651260618 + 
1) 1) "1641651260618-0"
   2) 1) "name"
      2) "redis"
      3) "version"
      4) "6.0"
2) 1) "1641651260618-1"
   2) 1) "name"
      2) "java"
      3) "version"
      4) "1.8"

# 指定开始结束ID参数
> XRANGE teststream 1641651260618-0 1641651260618-0 
1) 1) "1641651260618-0"
   2) 1) "name"
      2) "redis"
      3) "version"
      4) "6.0"

# XREVRANGE命令逆序遍历
> XREVRANGE teststream + -
1) 1) "1641651260618-1"
   2) 1) "name"
      2) "java"
      3) "version"
      4) "1.8"
2) 1) "1641651260618-0"
   2) 1) "name"
      2) "redis"
      3) "version"
      4) "6.0"
读取消息命令XREAD

XREAD命令读取消息,可以阻塞或非阻塞,命令格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

COUNT :数量
BLOCK :可选,阻塞毫秒数,没有设置就是非阻塞模式
key :stream名称
ID :消息 ID,$表示stream中最大的ID,即读取最新的消息

XREAD命令演示,非阻塞方式

> XREAD STREAMS teststream 0
1) 1) "teststream"
   2) 1) 1) "1641651260618-0"
         2) 1) "name"
            2) "redis"
            3) "version"
            4) "6.0"
      2) 1) "1641651260618-1"
         2) 1) "name"
            2) "java"
            3) "version"
            4) "1.8"

XREAD命令演示,阻塞方式

> XREAD BLOCK 1000000 STREAMS teststream $
1) 1) "teststream"
   2) 1) 1) "1641653381688-0"
         2) 1) "name"
            2) "mysql"
            3) "version"
            4) "8.0"
(29.36s)

# 另开一个终端,push消息
> xadd teststream * name mysql version 8.0
"1641653381688-0"
 消费组模式相关命令 创建消费组XGROUP CREATE
XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATEConSUMER key groupname consume

key :stream名称,如果不存在就创建
groupname :消费组名
ID : 指定开始消费ID,或使用 $表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略

 命令演示

# 创建消费组,从头开始消费
> XGROUP CREATE teststream testgroup1 0
OK

# 创建消费组,消费最新消息
> XGROUP CREATE teststream testgroup2 $
OK
 读取消费组消息XREADGROUP

XREADGROUP指定消费组消费消息,与XREAD命令一样,XREADGROUP支持阻塞模式,命令格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

group :消费组名
consumer :消费者名
COUNT : 读取数量
BLOCK : 阻塞毫秒数
key : stream名称
ID : 消息 ID,特殊ID > 表示读取未消费过的消息

命令演示

> XREADGROUP GROUP testgroup1 Alice STREAMS teststream >
1) 1) "teststream"
   2) 1) 1) "1641651260618-0"
         2) 1) "name"
            2) "redis"
            3) "version"
            4) "6.0"
      2) 1) "1641651260618-1"
         2) 1) "name"
            2) "java"
            3) "version"
            4) "1.8"
      3) 1) "1641653381688-0"
         2) 1) "name"
            2) "mysql"
            3) "version"
            4) "8.0"
全部命令

由于篇幅原因,这里不对全部命令一一介绍了,有需要的可以去redis官网自行查阅

命令说明XTRIM 对流进行修剪,限制长度,ID较小的项目可能会被删除XDEL删除消息XLEN获取消息长度XRANGE | XREVRANGE范围查询XREAD读取消息消费组相关

 XGROUP [

        CREATE

        SETID

        DELCONSUMER

        DESTROY

]

消费者相关 *** 作命令 XREADGROUP GROUP读取消费组中的消息XACK将消息标记为"已处理"XPENDING显示待处理消息的相关信息XCLAIM改变待处理消息的所有权

XINFO [

        GROUPS

        STREAM

]

查看流和消费者组的相关信息 stream实现原理

        stream使用radix(基数树)作为存储结构,radix是一树形结构,适合查找和存储,radix特点:

树由叶子结点和非叶子结点组成同级结点按照顺序排列结点的key是由从根节点到结点的所有key组合而成如果结点为空且只有一个子结点,需要与子结点合并

radix查找和遍历 radix查找过程       

radix遍历

        radix树的遍历与二叉树类似,可以参考二叉树的遍历方式。

radix插入

        radix树插入的各种场景       

radix删除

        删除是插入的逆 *** 作,也需要区分场景,这里不详细介绍。

redis相关实现原理 radix相关结构定义

        rax结构定义radix树结构,结点中raxNode.data保存数据,包含key和value。

//radix根结点定义
typedef struct rax {
    raxNode *head;
    uint64_t numele;
    uint64_t numnodes;
} rax;

//
#define RAX_NODE_MAX_SIZE ((1<<29)-1)
typedef struct raxNode {
    uint32_t iskey:1;     
    uint32_t isnull:1;    
    uint32_t iscompr:1;   
    uint32_t size:29;     
    
    unsigned char data[];
} raxNode;
stream结构定义

        stream相关结构定义,内部使用rax存储数据。    

typedef struct streamID {
    uint64_t ms;        
    uint64_t seq;       
} streamID;

typedef struct stream {
    rax *rax;               
    uint64_t length;        
    streamID last_id;       
    rax *cgroups;           
} stream;


typedef struct streamIterator {
    stream *stream;         
    streamID master_id;     
    uint64_t master_fields_count;       
    unsigned char *master_fields_start; 
    unsigned char *master_fields_ptr;   
    int entry_flags;                    
    int rev;                
    uint64_t start_key[2];  
    uint64_t end_key[2];    
    raxIterator ri;         
    unsigned char *lp;      
    unsigned char *lp_ele;  
    unsigned char *lp_flags; 
    
    unsigned char field_buf[LP_INTBUF_SIZE];
    unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
listpack紧凑列表

        除了radix,stream中还有一种数据结构:紧凑列表,用于存储结点数据。从名字就可以看出,紧凑列表是一种结构非常紧凑的数据结构,由于篇幅原因这里不详细介绍了。感兴趣的同学可以研究下。

消费组功能实现

        消费组相关功能也是通过radix结构实现,其中streamCG结构保存消费组相关信息。

typedef struct streamCG {
    streamID last_id;       
    rax *pel;               
    rax *consumers;         
} streamCG;


typedef struct streamConsumer {
    mstime_t seen_time;         
    sds name;                   
    rax *pel;                   
} streamConsumer;


typedef struct streamNACK {
    mstime_t delivery_time;     
    uint64_t delivery_count;    
    streamConsumer *consumer;   
} streamNACK;
总结

        截至目前为止,stream类型应该还没有在各大厂应用,Kafka的地位仍然稳固。当然多一种选择也没什么坏处,如果你有消息队列相关需求,但又觉得Kafka有点笨重,不妨试一下stream。即插即用,简单方便。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705965.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存