早期,基于Redis实现轻量化的消息队列有3种实现方式,分别是基于List的LPUSH+BRPOP (BRPOPLPUSH)的实现、PUB/SUB发布订阅模式以及基于Sorted-Set实现方式,但是,这三种模式分别有其相应的缺点。
实现方式 | 缺点 |
---|---|
基于List的LPUSH+BRPOP | 做消费者确认ACK比较麻烦,不能保证消费者消费消息后是否成功处理的问题,通常需要维护一个额外的列表,且不支持重复消费和分组消费。 |
PUB/SUB发布订阅模式 | 若客户端不在线时发布消息会丢失,且消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失,可见PUB/SUB模式不适合做消息存储,消息积压类的业务。 |
基于Sorted-Set实现 | 由于集合的特点,不允许重复消息,而且消息ID确定有误的话会导致消息的顺序出错。 |
Redis5.0中发布的Stream类型,也用来实现典型的消息队列。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:
消息ID的序列化生成消息遍历消息的阻塞和非阻塞读取消息的分组消费未完成消息的处理消息队列监控 2、相关命令解释本文基于Redis6.2版本进行说明,注意不同版本的Redis可能有些命令的部分参数会存在差异,但不影响整体使用。Stream相关的命令主要可以分为2大类,一类是与消息队列相关的命令,另一类是与消费者组相关的命令。
与消息队列相关的命令:
与消费者组相关的命令:
XGROUPXREADGROUPXPENDINGXACKXCLAIMXINFO 2.1、XADD
解释:XADD命令用于往某个消息队列中添加消息。
解释:XREAD命令用于从某个消息队列中读取消息,分为阻塞模式和非阻塞模式。
解释:XDEL命令用于进行消息删除,注意XACK进行消息确认只是进行了标记,消息还是会存在消息队列中,并没有删除。使用XDEL命令才会将消息从消息队列中删除。
解释:XLEN命令用于获取消息队列的长度。
解释:XRANGE命令用于获取消息队列中的消息,和XREAD有点类似,XREAD只能指定开始消息ID(不包含),XRANGE可以指定开始和结束消息ID。另外还有个XREVRANGE命令用于反向获取消息列表,与XRANGE不同的是消息ID是从大到小。
解释:XTRIM命令用于对消息队列进行修剪,限制长度。
解释:XGROUP CREATE命令用于创建消费者组。
CREATE:关键字,表示创建消费者组命令。key:表示消息队列的名称。groupname:表示要创建的消费者组名称。ID|$:表示该消费者组中的消费者将从哪个消息ID开始消费消息,ID表示指定的消息ID,$表示只消费新产生的消息。[MKSTREAM]:可选参数,表示在创建消费者组时,如果指定的消息队列不存在,会自动创建。但是这种方式创建的消息队列其长度为0。 2.7.2 SETID解释:XGROUP SETID命令用于设置消费者组中下一条要读取的消息ID。
SETID:关键字,表示设置消费者组中下一条要读取的消息ID命令key:表示消息队列的名称。groupname:表示消费者组名称。ID|$:表示指定具体的消息ID,0可以表示重新开始处理消费者组中的所有消息,$表示只处理消费者组中新产生的消息。 2.7.3 DESTROY解释:XGROUP DESTROY命令用于销毁消费者组。
DESTROY:关键字,表示销毁消费者组命令。key:表示消息队列的名称。groupname:表示要销毁的消费者组名称。 2.7.4 CREATECONSUMER解释:XGROUP CREATECONSUMER命令用于创建消费者。
CREATECONSUMER:关键字,表示创建消费者命令。key:表示消息队列的名称。groupname:表示要创建的消费者所属的消费者组名称。consumername:表示要创建的消费者名称。 2.7.5 DELCONSUMER解释:XGROUP DELCONSUMER命令用于删除消费者。
DELCONSUMER:关键字,表示删除消费者命令。key:表示消息队列的名称。groupname:表示要删除的消费者所属的消费者组名称。consumername:表示要删除的消费者名称。 2.8、XREADGROUP
解释:XREADGROUP命令用于分组消费消息。
解释:XPENDING命令用于获取等待队列,等待队列中保存的是消费者组内被读取,但是还未完成处理的消息,也就是还没有ACK的消息。
解释:XACK命令用于进行消息确认。
解释:XCLAIM命令用于进行消息转移,当某个等待队列中的消息长时间没有被处理(没有ACK)的时候,可以用XCLAIM命令将其转移到其他消费者的等待列表中。
解释:XINFO CONSUMERS命令用于监控消费者。
CONSUMERS:关键字,表示查看消费者信息命令。key:表示消息队列的名称。groupname:表示消费者组名称。 2.12.2、GROUPS解释:XINFO GROUPS命令用于监控消费者组。
GROUPS:关键字,表示查看消费者组信息命令。key:表示消息队列的名称。 2.12.3、STREAM解释:XINFO STREAM命令用于监控消息队列。
STREAM:关键字,表示查看消息队列信息命令。key:表示消息队列的名称。 2.12.4、HELP解释:XINFO HELP 命令用于获取帮助。
HELP:关键字,表示获取帮助信息命令。 3、XADD/XREAD模式和消费者组模式 3.1、XADD/XREAD模式普通场景下,生产者生产消息,消费者消费消息,多个消费者可以重复的消费相同的消息,比较类似常规的发布订阅模式,订阅了某个消息队列的消费者,能够获取到生产者投放的消息。当然消费者可以采用阻塞或者非阻塞的模式进行读取消息,业务处理等。一个典型的阻塞模式使用方式如下:
#Producer
127.0.0.1:6379> XADD test-mq * key1 value1
"1622605684330-0"
127.0.0.1:6379>
127.0.0.1:6379> XADD test-mq * key2 value2
"1622605691371-0"
127.0.0.1:6379>
127.0.0.1:6379> XADD test-mq * key3 value3
"1622605698309-0"
127.0.0.1:6379>
127.0.0.1:6379> XADD test-mq * key4 value4
"1622605707261-0"
127.0.0.1:6379>
127.0.0.1:6379> XADD test-mq * key5 value5 key6 value6
"1622605714081-0"
127.0.0.1:6379>
#Consumer
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
2) 1) 1) "1622605684330-0"
2) 1) "key1"
2) "value1"
(3.32s)
127.0.0.1:6379>
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
2) 1) 1) "1622605691371-0"
2) 1) "key2"
2) "value2"
(2.88s)
127.0.0.1:6379>
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
2) 1) 1) "1622605698309-0"
2) 1) "key3"
2) "value3"
(3.37s)
127.0.0.1:6379>
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
2) 1) 1) "1622605707261-0"
2) 1) "key4"
2) "value4"
(3.75s)
127.0.0.1:6379>
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
2) 1) 1) "1622605714081-0"
2) 1) "key5"
2) "value5"
3) "key6"
4) "value6"
(2.47s)
127.0.0.1:6379>
说明:使用阻塞模式的XREAD,XREAD BLOCK 10000 STREAMS test-mq $,最后一个参数$表示读取最新的消息,所以需要先启动消费者,阻塞等待消息,然后生产者添加消息,消费者接受消息完成处理。
3.2、消费者组模式在有些场景下,我们需要多个消费者配合来消费同一个消息队列中的消息,而不是多个消费者重复的消息,以此来提高消息处理的效率。这种模式也就是消费者组模式了。消费者组模式如下图所示:
下面是Redis Stream的结构图:
上图解释:
代码结构如下:
connHandle.go
package common
import (
"fmt"
"github.com/gomodule/redigo/redis"
"log"
"time"
)
func NewClient(opt RedisConnOpt) *RedisStreamMQClient {
return &RedisStreamMQClient{
RedisConnOpt: opt,
ConnPool: newPool(opt),
}
}
func newPool(opt RedisConnOpt) *redis.Pool{
return &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240*time.Second,
MaxActive: 10,
Wait: true,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", opt.Host, opt.Port))
if err != nil {
log.Fatalf("Redis.Dial: %v", err)
return nil, err
}
/*
if _, err := c.Do("AUTH", opt.Password); err != nil {
c.Close()
log.Fatalf("Redis.AUTH: %v", err)
return nil, err
}
*/
if _, err := c.Do("SELECT", opt.Index); err != nil {
c.Close()
log.Fatalf("Redis.SELECT: %v", err)
return nil, err
}
return c, nil
},
}
}
define.go
package common
import (
"github.com/gomodule/redigo/redis"
)
const (
STREAM_MQ_MAX_LEN = 500000 //消息队列最大长度
READ_MSG_AMOUNT = 1000 //每次读取消息的条数
READ_MSG_BLOCK_SEC = 30 //阻塞读取消息时间
TEST_STREAM_KEY = "TestStreamKey1"
)
type RedisConnOpt struct {
Enable bool
Host string
Port int32
Password string
Index int32
TTL int32
}
type RedisStreamMQClient struct {
RedisConnOpt RedisConnOpt
ConnPool *redis.Pool
StreamKey string //stream对应的key值
GroupName string //消费者组名称
ConsumerName string //消费者名称
}
//等待列表中的消息属性
type PendingMsgInfo struct {
MsgId string //消息ID
BelongConsumer string //所属消费者
IdleTime int //已读取未消费时长
ReadCount int //消息被读取次数
}
// 消息队列信息
type StreamMQInfo struct {
Length int64 // 消息队列长度
RedixTreeKeys int64 // 基数树key数
RedixTreeNodes int64 // 基数树节点数
LastGeneratedId string // 最后一个生成的消息ID
Groups int64 // 消费组个数
FirstEntry *map[string]map[string]string // 第一个消息体
LastEntry *map[string]map[string]string // 最后一个消息体
}
// 消费组信息
type GroupInfo struct {
Name string // 消费组名称
Consumers int64 // 组内消费者个数
Pending int64 // 组内所有消费者的等待列表总长度
LastDeliveredId string // 组内最后一条被消费的消息ID
}
// 消费者信息
type ConsumerInfo struct {
Name string // 消费者名称
Pending int64 // 等待队列长度
Idle int64 // 消费者空闲时间(毫秒)
}
msgHandle.go
package common
import (
"fmt"
"github.com/gomodule/redigo/redis"
)
// PutMsg 添加消息
func (mqClient *RedisStreamMQClient) PutMsg(streamKey string, msgKey string, msgValue string) (strMsgId string, err error){
conn := mqClient.ConnPool.Get()
defer conn.Close()
//*表示由Redis自己生成消息ID,设置MAXLEN可以保证消息队列的长度不会一直累加
strMsgId, err = redis.String(conn.Do("XADD",
streamKey, "MAXLEN", "=", STREAM_MQ_MAX_LEN, "*", msgKey, msgValue))
if err != nil {
fmt.Println("XADD failed, err: ", err)
return "", err
}
//fmt.Println("Reply Msg Id:", strMsgId)
return strMsgId, nil
}
// PutMsgBatch 批量添加消息
func (mqClient *RedisStreamMQClient) PutMsgBatch(streamKey string, msgMap map[string]string) (msgId string, err error){
if len(msgMap) <= 0 {
fmt.Println("msgMap len <= 0, no need put")
return msgId, nil
}
conn := mqClient.ConnPool.Get()
defer conn.Close()
vecMsg := make([]string, 0)
for msgKey, msgValue := range msgMap {
vecMsg = append(vecMsg, msgKey)
vecMsg = append(vecMsg, msgValue)
}
msgId, err = redis.String(conn.Do("XADD",
redis.Args{streamKey, "MAXLEN", "=", STREAM_MQ_MAX_LEN, "*"}.AddFlat(vecMsg)...))
if err != nil {
fmt.Println("XADD failed, err: ", err)
return "", err
}
fmt.Println("Reply Msg Id:", msgId)
return msgId, nil
}
func (mqClient *RedisStreamMQClient) ConvertVecInterface(vecReply []interface{}) (msgMap map[string]map[string][]string){
msgMap = make(map[string]map[string][]string, 0)
for keyIndex := 0; keyIndex < len(vecReply); keyIndex++ {
var keyInfo = vecReply[keyIndex].([]interface{})
var key = string(keyInfo[0].([]byte))
var idList = keyInfo[1].([]interface{})
//fmt.Println("StreamKey:", key)
msgInfoMap := make(map[string][]string, 0)
for idIndex := 0; idIndex < len(idList); idIndex++ {
var idInfo = idList[idIndex].([]interface{})
var id = string(idInfo[0].([]byte))
var fieldList = idInfo[1].([]interface{})
vecMsg := make([]string, 0)
for msgIndex := 0; msgIndex < len(fieldList); msgIndex = msgIndex + 2 {
var msgKey = string(fieldList[msgIndex].([]byte))
var msgVal = string(fieldList[msgIndex+1].([]byte))
vecMsg = append(vecMsg, msgKey)
vecMsg = append(vecMsg, msgVal)
//fmt.Println("MsgId:", id, "MsgKey:", msgKey, "MsgVal:", msgVal)
}
msgInfoMap[id] = vecMsg
}
msgMap[key] = msgInfoMap
}
return
}
// GetMsgBlock 阻塞方式读取消息
func (mqClient *RedisStreamMQClient) GetMsgBlock(blockSec int32, msgAmount int32, streamKey string) (
msgMap map[string]map[string][]string, err error){
conn := mqClient.ConnPool.Get()
defer conn.Close()
//在阻塞模式中,可以使用$,表示最新的消息ID(在非阻塞模式下$无意义)
reply, err := redis.Values(conn.Do("XREAD",
"COUNT", msgAmount, "BLOCK", blockSec*1000, "STREAMS", streamKey, "$"))
if err != nil && err != redis.ErrNil{
fmt.Println("BLOCK XREAD failed, err: ", err)
return nil, err
}
//返回消息转换
msgMap = mqClient.ConvertVecInterface(reply)
fmt.Println("MsgMap:", msgMap)
return msgMap, nil
}
// GetMsg 非阻塞方式读取消息
func (mqClient *RedisStreamMQClient) GetMsg(msgAmount int32, streamKey string, beginMsgId string) (
msgMap map[string]map[string][]string, err error) {
conn := mqClient.ConnPool.Get()
defer conn.Close()
//从消息ID=beginMsgId往后开始读取,不包含beginMsgId的消息
reply, err := redis.Values(conn.Do("XREAD", "COUNT", msgAmount, "STREAMS", streamKey, beginMsgId))
if err != nil && err != redis.ErrNil{
fmt.Println("XREAD failed, err: ", err)
return nil, err
}
//返回消息转换
msgMap = mqClient.ConvertVecInterface(reply)
fmt.Println("MsgMap:", msgMap)
return msgMap, nil
}
// DelMsg 删除消息
func (mqClient *RedisStreamMQClient) DelMsg(streamKey string, vecMsgId []string) (err error){
if len(vecMsgId) <= 0 {
fmt.Println("vecMsgId len <= 0, no need del")
return nil
}
conn := mqClient.ConnPool.Get()
defer conn.Close()
for _, msgId := range vecMsgId {
_, err := redis.Int(conn.Do("XDEL", streamKey, msgId))
if err != nil {
fmt.Println("XDEL failed, msgId:",msgId, "err:", err)
}
}
return nil
}
// ReplyAck 返回ACK
func (mqClient *RedisStreamMQClient) ReplyAck(streamKey string, groupName string, vecMsgId []string) error {
if len(vecMsgId) <= 0 {
fmt.Println("vecMsgId len <= 0, no need ack")
return nil
}
conn := mqClient.ConnPool.Get()
defer conn.Close()
//fmt.Println("Start ReplyAck, vecMsgId:", vecMsgId)
_, err := redis.Int(conn.Do("XACK", redis.Args{streamKey, groupName}.AddFlat(vecMsgId)...))
if err != nil {
fmt.Println("XACK failed, msgId:",vecMsgId, "err:", err)
return err
}
//fmt.Println("ReplyAck Success")
return nil
}
// CreateConsumerGroup 创建消费者组
func (mqClient *RedisStreamMQClient) CreateConsumerGroup(streamKey string, groupName string, beginMsgId string) error {
conn := mqClient.ConnPool.Get()
defer conn.Close()
//最后一个参数表示该组从消息ID=beginMsgId往后开始消费,不包含beginMsgId的消息
_, err := redis.String(conn.Do("XGROUP", "CREATE", streamKey, groupName, beginMsgId))
if err != nil {
fmt.Println("XGROUP CREATE Failed. err:", err)
return err
}
return nil
}
// DestroyConsumerGroup 销毁消费者组
func (mqClient *RedisStreamMQClient) DestroyConsumerGroup(streamKey string, groupName string) error {
conn := mqClient.ConnPool.Get()
defer conn.Close()
_, err := redis.String(conn.Do("XGROUP", "DESTROY", streamKey, groupName))
if err != nil {
fmt.Println("XGROUP DESTROY Failed. err:", err)
return err
}
return nil
}
// GetMsgByGroupConsumer 组内消息分配 *** 作,组内每个消费者消费多少消息
func (mqClient *RedisStreamMQClient) GetMsgByGroupConsumer(streamKey string, groupName string,
consumerName string, msgAmount int32)(msgMap map[string]map[string][]string, err error) {
conn := mqClient.ConnPool.Get()
defer conn.Close()
//>代表当前消费者还没读取的消息
reply, err := redis.Values(conn.Do("XREADGROUP",
"GROUP", groupName, consumerName, "COUNT", msgAmount, "STREAMS", streamKey, ">"))
if err != nil && err != redis.ErrNil{
fmt.Println("XREADGROUP failed, err: ", err)
return nil, err
}
//返回消息转换
msgMap = mqClient.ConvertVecInterface(reply)
//fmt.Println("MsgMap:", msgMap)
return msgMap, nil
}
// CreateConsumer 创建消费者
func (mqClient *RedisStreamMQClient) CreateConsumer(streamKey string, groupName string, consumerName string) error {
conn := mqClient.ConnPool.Get()
defer conn.Close()
_, err := redis.String(conn.Do("XGROUP", "CREATECONSUMER", streamKey, groupName, consumerName))
if err != nil {
fmt.Println("XGROUP CREATECONSUMER Failed. err:", err)
return err
}
return nil
}
// DelConsumer 删除消费者
func (mqClient *RedisStreamMQClient) DelConsumer(streamKey string, groupName string, consumerName string) error {
conn := mqClient.ConnPool.Get()
defer conn.Close()
_, err := redis.String(conn.Do("XGROUP", "DELCONSUMER", streamKey, groupName, consumerName))
if err != nil {
fmt.Println("XGROUP DELCONSUMER Failed. err:", err)
return err
}
return nil
}
// GetMsgByGroupConsumer 组内消息分配 *** 作,组内每个消费者消费多少消息
func (mqClient *RedisStreamMQClient) GetMsgBlockByGroupConsumer(blockSec int32, streamKey string, groupName string,
consumerName string, msgAmount int32)(msgMap map[string]map[string][]string, err error) {
conn := mqClient.ConnPool.Get()
defer conn.Close()
//>代表当前消费者还没读取的消息
reply, err := redis.Values(conn.Do("XREADGROUP", "GROUP", groupName,
consumerName, "COUNT", msgAmount, "BLOCK", blockSec*1000, "STREAMS", streamKey, ">"))
if err != nil && err != redis.ErrNil{
fmt.Println("BLOCK XREADGROUP failed, err: ", err)
return nil, err
}
//返回消息转换
msgMap = mqClient.ConvertVecInterface(reply)
fmt.Println("MsgMap:", msgMap)
return msgMap, nil
}
// GetPendingList 获取等待列表(读取但还未消费的消息)
func (mqClient *RedisStreamMQClient) GetPendingList(streamKey string, groupName string, consumerName string, msgAmount int32)(
vecPendingMsg []*PendingMsgInfo, err error) {
conn := mqClient.ConnPool.Get()
defer conn.Close()
reply, err := redis.Values(conn.Do("XPENDING", streamKey, groupName, "-", "+", msgAmount, consumerName))
if err != nil {
fmt.Println("XPENDING failed, err: ", err)
return nil, err
}
for iIndex := 0; iIndex < len(reply); iIndex++ {
var msgInfo = reply[iIndex].([]interface{})
var msgId = string(msgInfo[0].([]byte))
var belongConsumer = string(msgInfo[1].([]byte))
var idleTime = msgInfo[2].(int64)
var readCount = msgInfo[3].(int64)
pendingMsg := &PendingMsgInfo{msgId, belongConsumer, int(idleTime), int(readCount)}
vecPendingMsg = append(vecPendingMsg, pendingMsg)
}
return vecPendingMsg, nil
}
// MoveMsg 转移消息到其他等待列表中
func (mqClient *RedisStreamMQClient) MoveMsg(streamKey string, groupName string,
consumerName string, idleTime int, vecMsgId []string) error {
if len(vecMsgId) <= 0 {
fmt.Println("vecMsgId len <= 0, no need move")
return nil
}
conn := mqClient.ConnPool.Get()
defer conn.Close()
_, err := redis.Values(conn.Do("XCLAIM", redis.Args{streamKey, groupName, consumerName, idleTime*1000}.AddFlat(vecMsgId)...))
if err != nil {
fmt.Println("XCLAIM failed, msgId:",vecMsgId, "err:", err)
return err
}
return nil
}
// DelDeadMsg 删除不能被消费者处理,也就是不能被 XACK,长时间处于 Pending 列表中的消息
func (mqClient *RedisStreamMQClient) DelDeadMsg(streamKey string, groupName string, vecMsgId []string) error {
if len(vecMsgId) <= 0 {
fmt.Println("vecMsgId len <= 0, no need del")
return nil
}
conn := mqClient.ConnPool.Get()
defer conn.Close()
// 删除消息
_, err := redis.Int(conn.Do("XDEL", redis.Args{streamKey}.AddFlat(vecMsgId)...))
if err != nil {
fmt.Println("XDEL failed, msgId:",vecMsgId, "err:", err)
return err
}
// 设置ACK,否则消息还会存在pending list中
_, err = redis.Int(conn.Do("XACK", redis.Args{streamKey, groupName}.AddFlat(vecMsgId)...))
if err != nil {
fmt.Println("XACK failed, groupName:", groupName, "msgId:",vecMsgId, "err:", err)
return err
}
return nil
}
// GetStreamsLen 获取消息队列的长度,消息消费之后会做标记,不会删除
func (mqClient *RedisStreamMQClient) GetStreamsLen(streamKey string) int {
conn := mqClient.ConnPool.Get()
defer conn.Close()
reply, err := redis.Int(conn.Do("XLEN", streamKey))
if err != nil {
fmt.Println("XLEN failed, err:", err)
return -1
}
return reply
}
// MonitorMqInfo 监控服务器队列信息
func (mqClient *RedisStreamMQClient) MonitorMqInfo(streamKey string) (streamMQInfo *StreamMQInfo){
conn := mqClient.ConnPool.Get()
defer conn.Close()
reply, err := redis.Values(conn.Do("XINFO", "STREAM", streamKey))
if err != nil || len(reply) <= 0{
fmt.Println("XINFO STREAM failed, err:", err)
return nil
}
fmt.Println("reply len:", len(reply))
streamMQInfo = &StreamMQInfo{}
streamMQInfo.Length = reply[1].(int64)
streamMQInfo.RedixTreeKeys = reply[3].(int64)
streamMQInfo.RedixTreeNodes = reply[5].(int64)
streamMQInfo.LastGeneratedId = string(reply[7].([]byte))
streamMQInfo.Groups, _ = reply[9].(int64)
firstEntryInfo := reply[11].([]interface{})
firstEntryMsgId := string(firstEntryInfo[0].([]byte))
vecFirstEntryMsg := firstEntryInfo[1].([]interface{})
firstMsgMap := make(map[string]string, 0)
for iIndex := 0; iIndex < len(vecFirstEntryMsg); iIndex = iIndex + 2 {
msgKey := string(vecFirstEntryMsg[iIndex].([]byte))
msgVal := string(vecFirstEntryMsg[iIndex+1].([]byte))
firstMsgMap[msgKey] = msgVal
}
firstEntry := map[string]map[string]string{
firstEntryMsgId : firstMsgMap,
}
streamMQInfo.FirstEntry = &firstEntry
lastEntryInfo := reply[13].([]interface{})
lastEntryMsgId := string(lastEntryInfo[0].([]byte))
vecLastEntryMsg := lastEntryInfo[1].([]interface{})
lastMsgMap := make(map[string]string, 0)
for iIndex := 0; iIndex < len(vecLastEntryMsg); iIndex = iIndex + 2 {
msgKey := string(vecLastEntryMsg[iIndex].([]byte))
msgVal := string(vecLastEntryMsg[iIndex+1].([]byte))
lastMsgMap[msgKey] = msgVal
}
lastEntry := map[string]map[string]string{
lastEntryMsgId : lastMsgMap,
}
streamMQInfo.LastEntry = &lastEntry
return
}
// MonitorConsumerGroupInfo 监控消费者组信息
func (mqClient *RedisStreamMQClient) MonitorConsumerGroupInfo(streamKey string) (groupInfo *GroupInfo) {
conn := mqClient.ConnPool.Get()
defer conn.Close()
reply, err := redis.Values(conn.Do("XINFO", "GROUPS", streamKey))
if err != nil || len(reply) <= 0{
fmt.Println("XINFO GROUPS failed, err:", err)
return nil
}
fmt.Println("reply len:", len(reply))
oGroupInfo := reply[0].([]interface{})
name := string(oGroupInfo[1].([]byte))
consumers := oGroupInfo[3].(int64)
pending := oGroupInfo[5].(int64)
lastDeliveredId := string(oGroupInfo[7].([]byte))
groupInfo = &GroupInfo{name, consumers, pending, lastDeliveredId}
return
}
// MonitorConsumerInfo 监控消费者信息
func (mqClient *RedisStreamMQClient) MonitorConsumerInfo(streamKey string, groupName string) (vecConsumerInfo []*ConsumerInfo){
conn := mqClient.ConnPool.Get()
defer conn.Close()
reply, err := redis.Values(conn.Do("XINFO", "CONSUMERS", streamKey, groupName))
if err != nil {
fmt.Println("XINFO CONSUMERS failed, err:", err)
return nil
}
fmt.Println("reply len:", len(reply))
for iIndex := 0; iIndex < len(reply); iIndex++ {
oConsumerInfo := reply[iIndex].([]interface{})
name := string(oConsumerInfo[1].([]byte))
pending := oConsumerInfo[3].(int64)
idle := oConsumerInfo[5].(int64)
vecConsumerInfo = append(vecConsumerInfo, &ConsumerInfo{name, pending, idle})
}
return
}
testProduceClient.go
package main
import (
"RedisStreamMqDemo/common"
"fmt"
"time"
)
func testPutMsg(redisCli *common.RedisStreamMQClient, textKey string, textVal string, msgCount int) {
startTime := time.Now()
fmt.Println("Start Test Function testPutMsg")
for i := 0; i < msgCount; i++ {
var strMsgKey string = fmt.Sprintf("%s-%d", textKey, i+1)
var strMsgVal string = fmt.Sprintf("%s-%d", textVal, i+1)
_, err := redisCli.PutMsg(common.TEST_STREAM_KEY, strMsgKey, strMsgVal)
if err != nil {
fmt.Println("PutMsg Failed. err:", err)
}
}
fmt.Println("End Test Function testPutMsg")
costTime := time.Since(startTime)
fmt.Println("=========:START TIME:", startTime)
fmt.Println("=========:COST TIME:", costTime)
}
func testPutMsgBatch(redisCli *common.RedisStreamMQClient, textKey string, textVal string, msgCount int) {
startTime := time.Now()
fmt.Println("Start Test Function PutMsgBatch")
msgMap := make(map[string]string,0)
for i := 0; i < msgCount; i++ {
var strMsgKey string = fmt.Sprintf("%s-%d", textKey, i+1)
var strMsgVal string = fmt.Sprintf("%s-%d", textVal, i+1)
msgMap[strMsgKey] = strMsgVal
}
vecMsgId, err2 := redisCli.PutMsgBatch(common.TEST_STREAM_KEY, msgMap)
if err2 != nil {
fmt.Println("PutMsgBatch Failed. err:", err2)
}
fmt.Println("Reply Msg Id:", vecMsgId)
fmt.Println("End Test Function PutMsgBatch")
costTime := time.Since(startTime)
fmt.Println("=========:START TIME:", startTime)
fmt.Println("=========:COST TIME:", costTime)
}
func main() {
fmt.Println("test redis stream mq producer")
redisOpt := common.RedisConnOpt{
Enable: true,
Host: "127.0.0.1",
Port: 6379,
TTL: 240,
}
redisCli := common.NewClient(redisOpt)
fmt.Println("Test Redis Producer Client Host:", redisCli.RedisConnOpt.Host,
", Port:", redisCli.RedisConnOpt.Port, ", DB:", redisCli.RedisConnOpt.Index)
// 单条生产消息
var textKey string = "DEMO-TEST-STREAM-MSG-KEY"
var textVal string = "DEMO-TEST-STREAM-MSG-VAL"
var msgCount int = 100
testPutMsg(redisCli, textKey, textVal, msgCount)
// 批量生产消息
//var textKey2 string = "DEMO-TEST-STREAM-MSG-KEY"
//var textVal2 string = "DEMO-TEST-STREAM-MSG-VAL"
//var msgCount2 int = 500
//testPutMsgBatch(redisCli, textKey2, textVal2, msgCount2)
//testPutMsgBatch(redisCli, textKey2, textVal2, msgCount2)
//testPutMsgBatch(redisCli, textKey2, textVal2, msgCount2)
return
}
testConsumerClient.go
package main
import (
"RedisStreamMqDemo/common"
"fmt"
"time"
)
func PrintMsgMap(msgMap map[string]map[string][]string) (key2msgIds map[string][]string, msgCount int32){
key2msgIds = make(map[string][]string, 0)
msgCount = 0
for streamKey, val := range msgMap {
//fmt.Println("StreamKey:", streamKey)
vecMsgId := make([]string, 0)
for msgId, msgList := range val {
//fmt.Println("MsgId:", msgId)
vecMsgId = append(vecMsgId, msgId)
for msgIndex := 0; msgIndex < len(msgList); msgIndex = msgIndex + 2 {
//var msgKey = msgList[msgIndex]
//var msgVal = msgList[msgIndex+1]
msgCount++
//fmt.Println("MsgKey:", msgKey, "MsgVal:", msgVal)
}
}
key2msgIds[streamKey] = vecMsgId
}
return
}
// 非阻塞无消费组的情况
func testNoGroup(redisCli *common.RedisStreamMQClient) {
startTime := time.Now()
fmt.Println("Start Test Function GetMsg")
msgMap, err2 := redisCli.GetMsg(common.READ_MSG_AMOUNT, common.TEST_STREAM_KEY, "0")
if err2 != nil {
fmt.Println("GetMsg Failed. streamKey:", common.TEST_STREAM_KEY, "err:", err2)
return
}
_, msgCount := PrintMsgMap(msgMap)
fmt.Println("streamKey:", common.TEST_STREAM_KEY, "Ack Msg Count:", msgCount)
fmt.Println("End Test Function GetMsg")
costTime := time.Since(startTime)
fmt.Println("=========:START TIME:", startTime)
fmt.Println("=========:COST TIME:", costTime)
}
// 阻塞模式无消费组的情况
func testNoGroupBlock(redisCli *common.RedisStreamMQClient) {
startTime := time.Now()
fmt.Println("Start Test Function GetMsgBlock")
msgMap, err := redisCli.GetMsgBlock(common.READ_MSG_BLOCK_SEC, common.READ_MSG_AMOUNT, common.TEST_STREAM_KEY)
if err != nil {
fmt.Println("GetMsg Failed. streamKey:", common.TEST_STREAM_KEY, "err:", err)
return
}
_, msgCount := PrintMsgMap(msgMap)
fmt.Println("streamKey:", common.TEST_STREAM_KEY, "Ack Msg Count:", msgCount)
fmt.Println("End Test Function GetMsgBlock")
costTime := time.Since(startTime)
fmt.Println("=========:START TIME:", startTime)
fmt.Println("=========:COST TIME:", costTime)
}
// 非阻塞消费
func testGroupConsumer(redisCli *common.RedisStreamMQClient, groupName string, consumerName string, msgAmount int32){
/*
fmt.Println("Start Test Function CreateConsumerGroup")
err3 := redisCli.CreateConsumerGroup(common.TEST_STREAM_KEY, groupName, "0")
if err3 != nil {
fmt.Println("CreateConsumerGroup Failed. err:", err3)
return
}
fmt.Println("End Start Test Function CreateConsumerGroup")
*/
startTime := time.Now()
fmt.Println("Start Test Function GetMsgByGroupConsumer")
msgMap3, err3 := redisCli.GetMsgByGroupConsumer(common.TEST_STREAM_KEY, groupName, consumerName, msgAmount)
if err3 != nil {
fmt.Println("GetMsgByGroupConsumer Failed. err:", err3)
return
}
fmt.Println("End Test Function GetMsgByGroupConsumer")
fmt.Println("Start Test Function ReplyAck")
key2msgIds3, _ := PrintMsgMap(msgMap3)
for streamKey, vecMsgId := range key2msgIds3 {
//fmt.Println("streamKey:", streamKey, "groupName:", groupName, "consumerName:", consumerName, "Ack Msg Count:", msgCount)
err3 = redisCli.ReplyAck(streamKey, groupName, vecMsgId)
if err3 != nil {
fmt.Println("ReplyAck Failed. err:", err3)
}
}
fmt.Println("End Test Function ReplyAck")
costTime := time.Since(startTime)
fmt.Println("=========:START TIME:", startTime)
fmt.Println("=========:COST TIME:", costTime)
}
// 阻塞消费
func testGroupConsumerBlock(redisCli *common.RedisStreamMQClient, groupName string, consumerName string, msgAmount int32){
/*
fmt.Println("Start Test Function CreateConsumerGroup")
err3 := redisCli.CreateConsumerGroup(common.TEST_STREAM_KEY, groupName, "0")
if err3 != nil {
fmt.Println("CreateConsumerGroup Failed. err:", err3)
return
}
fmt.Println("End Start Test Function CreateConsumerGroup")
*/
startTime := time.Now()
fmt.Println("Start Test Function GetMsgBlockByGroupConsumer")
msgMap6, err3 := redisCli.GetMsgBlockByGroupConsumer(common.READ_MSG_BLOCK_SEC,
common.TEST_STREAM_KEY, groupName, "ConsumerName1-A", msgAmount)
if err3 != nil {
fmt.Println("GetMsgBlockByGroupConsumer Failed. err:", err3)
return
}
fmt.Println("End Test Function GetMsgBlockByGroupConsumer")
fmt.Println("Start Test Function ReplyAck")
key2msgIds6, msgCount := PrintMsgMap(msgMap6)
for streamKey, vecMsgId := range key2msgIds6 {
fmt.Println("streamKey:", streamKey, "groupName:", groupName, "consumerName:", consumerName, "Ack Msg Count:", msgCount)
err3 = redisCli.ReplyAck(streamKey, groupName, vecMsgId)
if err3 != nil {
fmt.Println("ReplyAck Failed. err:", err3)
}
}
fmt.Println("End Test Function ReplyAck")
costTime := time.Since(startTime)
fmt.Println("=========:START TIME:", startTime)
fmt.Println("=========:COST TIME:", costTime)
}
func testPendingList(redisCli *common.RedisStreamMQClient, groupName string, consumerName string){
fmt.Println("Start Test Function GetPendingList")
vecPendingMsg, _ := redisCli.GetPendingList(common.TEST_STREAM_KEY, groupName, consumerName, common.READ_MSG_AMOUNT)
vecMsgId := make([]string, 0)
for _, pendingMsg := range vecPendingMsg {
vecMsgId = append(vecMsgId, pendingMsg.MsgId)
}
_ = redisCli.ReplyAck(common.TEST_STREAM_KEY, groupName, vecMsgId)
fmt.Println("Start Test Function GetPendingList")
}
func testXinfo(redisCli *common.RedisStreamMQClient, streamKey string, groupName string) {
fmt.Println("Start Test Function MonitorMqInfo")
streamMqInfo := redisCli.MonitorMqInfo(streamKey)
fmt.Println("streamMqInfo:{")
fmt.Println("Length:", streamMqInfo.Length)
fmt.Println("RedixTreeKeys:", streamMqInfo.RedixTreeKeys)
fmt.Println("RedixTreeNodes:",streamMqInfo.RedixTreeNodes)
fmt.Println("LastGeneratedId:", streamMqInfo.LastGeneratedId)
fmt.Println("Groups:",streamMqInfo.Groups)
fmt.Println("FirstEntry:", streamMqInfo.FirstEntry)
fmt.Println("LastEntry:", streamMqInfo.LastEntry)
fmt.Println("}")
fmt.Println("End Test Function MonitorMqInfo")
fmt.Println("Start Test Function MonitorConsumerGroupInfo")
groupInfo := redisCli.MonitorConsumerGroupInfo(streamKey)
if groupInfo != nil {
fmt.Println("groupInfo:{")
fmt.Println("Name:", groupInfo.Name)
fmt.Println("Consumers:", groupInfo.Consumers)
fmt.Println("Pending:", groupInfo.Pending)
fmt.Println("LastDeliveredId:", groupInfo.LastDeliveredId)
fmt.Println("}")
}
fmt.Println("End Test Function MonitorConsumerGroupInfo")
fmt.Println("Start Test Function MonitorConsumerInfo")
vecConsumerInfo := redisCli.MonitorConsumerInfo(streamKey, groupName)
fmt.Println("groupInfo:{")
for _, consumerInfo := range vecConsumerInfo {
fmt.Println("Name:", consumerInfo.Name)
fmt.Println("Pending:", consumerInfo.Pending)
fmt.Println("Idle:", consumerInfo.Idle)
fmt.Println("===========================")
}
fmt.Println("}")
fmt.Println("End Test Function MonitorConsumerInfo")
}
func main() {
fmt.Println("test redis stream mq consumer")
redisOpt := common.RedisConnOpt{
Enable: true,
Host: "127.0.0.1",
Port: 6379,
TTL: 240,
}
redisCli := common.NewClient(redisOpt)
fmt.Println("Test Redis Consumer Client Host:", redisCli.RedisConnOpt.Host,
", Port:", redisCli.RedisConnOpt.Port, ", DB:", redisCli.RedisConnOpt.Index)
//无消费者组,所有消费者都能消费所有消息
//testNoGroup(redisCli)
//无消费者组,所有消费者都能消费所有消息(阻塞模式)
//testNoGroupBlock(redisCli)
//有消费者组,所有消费者都不能重复消费组内的消息
var groupName1 string = "GroupName1"
var consumerName1 string = "ConsumerName1-" + time.Now().String()
var msgAmount1 int32 = 50000
testGroupConsumer(redisCli, groupName1, consumerName1, msgAmount1)
testPendingList(redisCli, groupName1, consumerName1)
//有消费者组,所有消费者都不能重复消费组内的消息(阻塞模式)
//var groupName2 string = "GroupName2"
//var consumerName2 string = "ConsumerName2-" + time.Now().String()
//var msgAmount2 int32 = 50000
//testGroupConsumerBlock(redisCli, groupName2, consumerName2, msgAmount2)
//testPendingList(redisCli, groupName2, consumerName2)
//XINFO测试
//var groupName1 string = "testgroupname"
//var streamKey string = "test-mq"
//testXinfo(redisCli, streamKey, groupName1)
return
}
这里给出完整代码下载地址,下载可直接运行。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)