在开始使用消息分组之前,我们必须手动创建分组才行,以下是几个和 Stream 分组有关的命令,我们先来学习一下它的使用。
消息分组命令创建消费者群组
127.0.0.1:6379> xgroup create mq group1 0-0 OK
相关语法:
xgroup create stream-key group-key ID
其中:
- mq 为 Stream 的 key;
- group1 为分组的名称;
- 0-0 表示从第一条消息开始读取。
如果要从当前最后一条消息向后读取,使用 $ 即可,命令如下:
127.0.0.1:6379> xgroup create mq group2 $ OK
读取消息
127.0.0.1:6379> xreadgroup group group1 c1 count 1 streams mq > 1) 1) "mq" 2) 1) 1) "1580959593553-0" 2) 1) "name" 2) "redis" 3) "age" 4) "10"
相关语法:
xreadgroup group group-key consumer-key streams stream-key
其中:
- > 表示读取下一条消息;
- group1 表示分组名称;
- c1 表示 consumer(消费者)名称。
xreadgroup 命令和 xread 使用类似,也可以设置阻塞读取,命令如下:
127.0.0.1:6379> xreadgroup group group1 c2 streams mq > 1) 1) "mq" 2) 1) 1) "1580959606181-0" 2) 1) "name" 2) "java" 3) "age" 4) "20" 127.0.0.1:6379> xreadgroup group group1 c2 streams mq > (nil) #队列中的消息已经被读取完 127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq > #阻塞读取
此时打开另一个命令行创建使用 xadd 添加一条消息,阻塞命令执行结果如下:
127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq > 1) 1) "mq" 2) 1) 1) "1580961475368-0" 2) 1) "name" 2) "sql" 3) "age" 4) "20" (86.14s)
消息消费确认
接收到消息之后,我们要手动确认一下(ack),命令如下:
127.0.0.1:6379> xack mq group1 1580959593553-0 (integer) 1
相关语法:
xack key group-key ID [ID ...]
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:
查询未确认的消费队列
127.0.0.1:6379> xpending mq group1 1) (integer) 1 #未确认(ack)的消息数量为 1 条 2) "1580994063971-0" 3) "1580994063971-0" 4) 1) 1) "c1" 2) "1" 127.0.0.1:6379> xack mq group1 1580994063971-0 #消费确认 (integer) 1 127.0.0.1:6379> xpending mq group1 1) (integer) 0 #没有未确认的消息 2) (nil) 3) (nil) 4) (nil)
xinfo 查询相关命令
1. 查询流信息
127.0.0.1:6379> xinfo stream mq 1) "length" 2) (integer) 2 #队列中有两个消息 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "groups" 8) (integer) 1 #一个消费分组 9) "last-generated-id" 10) "1580959606181-0" 11) "first-entry" 12) 1) "1580959593553-0" 2) 1) "name" 2) "redis" 3) "age" 4) "10" 13) "last-entry" 14) 1) "1580959606181-0" 2) 1) "name" 2) "java" 3) "age" 4) "20"
相关语法:
xinfo stream stream-key
2. 查询消费组消息
127.0.0.1:6379> xinfo groups mq 1) 1) "name" 2) "group1" #消息分组名称 3) "consumers" 4) (integer) 1 #一个消费者客户端 5) "pending" 6) (integer) 1 #一个未确认消息 7) "last-delivered-id" 8) "1580959593553-0" #读取的最后一条消息 ID
相关语法:
xinfo groups stream-key
3. 查看消费者组成员信息
127.0.0.1:6379> xinfo consumers mq group1 1) 1) "name" 2) "c1" #消费者名称 3) "pending" 4) (integer) 0 #未确认消息 5) "idle" 6) (integer) 481855
相关语法:
xinfo consumers stream group-key
删除消费者
127.0.0.1:6379> xgroup delconsumer mq group1 c1 (integer) 1
相关语法:
xgroup delconsumer stream-key group-key consumer-key
删除消费组
127.0.0.1:6379> xgroup destroy mq group1 (integer) 1
相关语法:
xgroup destroy stream-key group-key代码实战
接下来我们使用 python 来实现 Stream 分组消息队列,代码如下:
# 生产者 from redis import StrictRedis redis_cli = StrictRedis(host="", port=xx, password="xx", db=xx, decode_responses=True) def pub(): # if redis_cli.exists("mq_group"): # redis_cli.delete("mq_group") # return for i in range(1000): stream_id = redis_cli.xadd("mq_group", {"name": "mrli", "province": "beijing", "flag": i}) print("%s add success! " % stream_id) redis_cli.xgroup_create("mq_group", "gb") print("create group gb success!")
结果(截取一小部分)
1640856840606-0 add success!
1640856840656-0 add success!
1640856840715-0 add success!
1640856840778-0 add success!
1640856840841-0 add success!
1640856840908-0 add success!
1640856840974-0 add success!
1640856841048-0 add success!
1640856841122-0 add success!
# 消费者 from redis import StrictRedis redis_cli = StrictRedis(host="", port=xx, password="xx", db=xx, decode_responses=True) import time def sub1(): while True: msg = redis_cli.xreadgroup("gb", "consumer1", {"mq_group": ">"}, count=2) print("sub1>>>>>> msg is %s: " % msg) time.sleep(3) def sub2(): while True: msg = redis_cli.xreadgroup("gb", "consumer2", {"mq_group": ">"}, count=2) print("sub2>>>>>> msg is %s: " % msg) time.sleep(3)
结果:(一部分)
sub2>>>>>> msg is [['mq_group', [('1640856814714-0', {'name': 'mrli', 'province': 'beijing', 'flag': '687'}), ('1640856815004-0', {'name': 'mrli', 'province': 'beijing', 'flag': '688'})]]]:
sub1>>>>>> msg is [['mq_group', [('1640856814632-0', {'name': 'mrli', 'province': 'beijing', 'flag': '685'}), ('1640856814672-0', {'name': 'mrli', 'province': 'beijing', 'flag': '686'})]]]:
sub2>>>>>> msg is [['mq_group', [('1640856815045-0', {'name': 'mrli', 'province': 'beijing', 'flag': '689'}), ('1640856815100-0', {'name': 'mrli', 'province': 'beijing', 'flag': '690'})]]]:
sub1>>>>>> msg is [['mq_group', [('1640856815386-0', {'name': 'mrli', 'province': 'beijing', 'flag': '691'}), ('1640856815444-0', {'name': 'mrli', 'province': 'beijing', 'flag': '692'})]]]:
可以看出,同一个分组内的多个 consumer 会读取到不同消息,不同的 consumer 不会读取到分组内的同一条消息。
小结本文我们介绍了 Stream 分组的相关知,通过本文我们也知道了,一个分组内的多个 consumer 会轮询收到消息队列的消息,并且不会出现一个消息被多个 consumer 读取的情况。
如果你看了本文的知识还是觉得没看懂,那是因为你没有结合实践去理解,所以如果对本文还有疑问,跟着本文一步一步实践起来吧。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)