基于Go语言实现对Redis Stream消息队列客户端的封装

基于Go语言实现对Redis Stream消息队列客户端的封装,第1张

1、概述

早期,基于Redis实现轻量化的消息队列有3种实现方式,分别是基于List的LPUSH+BRPOP (BRPOPLPUSH)的实现、PUB/SUB发布订阅模式以及基于Sorted-Set实现方式,但是,这三种模式分别有其相应的缺点。

实现方式缺点
基于List的LPUSH+BRPOP做消费者确认ACK比较麻烦,不能保证消费者消费消息后是否成功处理的问题,通常需要维护一个额外的列表,且不支持重复消费和分组消费。
PUB/SUB发布订阅模式若客户端不在线时发布消息会丢失,且消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失,可见PUB/SUB模式不适合做消息存储,消息积压类的业务。
基于Sorted-Set实现由于集合的特点,不允许重复消息,而且消息ID确定有误的话会导致消息的顺序出错。

Redis5.0中发布的Stream类型,也用来实现典型的消息队列。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:

消息ID的序列化生成消息遍历消息的阻塞和非阻塞读取消息的分组消费未完成消息的处理消息队列监控 2、相关命令解释

本文基于Redis6.2版本进行说明,注意不同版本的Redis可能有些命令的部分参数会存在差异,但不影响整体使用。Stream相关的命令主要可以分为2大类,一类是与消息队列相关的命令,另一类是与消费者组相关的命令。
与消息队列相关的命令:

XADDXREADXDELXLENXRANGEXREVRANGEXTRIM

与消费者组相关的命令:

XGROUPXREADGROUPXPENDINGXACKXCLAIMXINFO 2.1、XADD


解释:XADD命令用于往某个消息队列中添加消息。

key:表示消息队列的名称,如果不存在就创建。[NOMKSTREAM]:可选参数,表示第一个参数key不存在时不创建。[MAXLEN|MINID [=|~] threshold [LIMIT count]] :可选参数,MAXLEN|MINID表示指定消息队列中消息的最大长度或者是消息ID的最小值。=|~表示设置精确的值或者是大约值,threshold 表示具体设置的值,超过threshold值以后,旧的消息将会被删掉。LIMIT count如果设置了会被当做键值对的形式保存在消息体中的第一个位置,另外设置LIMIT时MAXLEN和MINID只能使用~设置大约值(Redis6.2版本后才加入了LIMIT参数),由于队列中的消息不会主动被删除,但是在设置MAXLEN后,当消息队列长度超过MAXLEN时,会删除老的消息,保证消息队列长度不会一直累加。*|ID:表示消息ID,*表示由Redis生成(建议方案),ID表示自己指定。field value [field value …]:用于保存消息具体内容的键值对,可以传入多组键值对。 2.2、XREAD


解释:XREAD命令用于从某个消息队列中读取消息,分为阻塞模式和非阻塞模式。

[COUNT count]:可选参数,COUNT为关键字,表示指定读取消息的数量,count表示具体的值。[BLOCK milliseconds]:可选参数,BLOCK为关键字,表示设置XREAD为阻塞模式,默认是非阻塞模式,milliseconds表示具体阻塞的时间。STREAMS :关键字。key [key …]:表示消息队列的名称,可以传入多个消息队列名称。ID [ID …]:用于表示从哪个消息ID开始读取(不包含),与前面的key一一对应。0表示从第一条消息开始。在阻塞模式中,可以使用$用于表示最新的消息ID。(在非阻塞模式下$无意义)。 2.3、XDEL


解释:XDEL命令用于进行消息删除,注意XACK进行消息确认只是进行了标记,消息还是会存在消息队列中,并没有删除。使用XDEL命令才会将消息从消息队列中删除。

key:表示消息队列的名称。ID [ID …]:表示消息ID,可以传入多个消息ID。 2.4、XLEN


解释:XLEN命令用于获取消息队列的长度。

key:表示消息队列的名称。 2.5、XRANGE


解释:XRANGE命令用于获取消息队列中的消息,和XREAD有点类似,XREAD只能指定开始消息ID(不包含),XRANGE可以指定开始和结束消息ID。另外还有个XREVRANGE命令用于反向获取消息列表,与XRANGE不同的是消息ID是从大到小。

key:表示消息队列的名称。start:表示起始消息ID(包含)。end:表示结束消息ID(包含)。[COUNT count]:COUNT为关键字,表示指定读取消息的数量,count表示具体的值。同XREAD命令。 2.6 XTRIM


解释:XTRIM命令用于对消息队列进行修剪,限制长度。

key:表示消息队列的名称。MAXLEN|MINID [=|~] threshold [LIMIT count]:同XADD命令中的同名可选参数意义相同。 2.7、XGROUP

2.7.1 CREATE

解释:XGROUP CREATE命令用于创建消费者组。

CREATE:关键字,表示创建消费者组命令。key:表示消息队列的名称。groupname:表示要创建的消费者组名称。ID|$:表示该消费者组中的消费者将从哪个消息ID开始消费消息,ID表示指定的消息ID,$表示只消费新产生的消息。[MKSTREAM]:可选参数,表示在创建消费者组时,如果指定的消息队列不存在,会自动创建。但是这种方式创建的消息队列其长度为0。 2.7.2 SETID

解释:XGROUP SETID命令用于设置消费者组中下一条要读取的消息ID。

SETID:关键字,表示设置消费者组中下一条要读取的消息ID命令key:表示消息队列的名称。groupname:表示消费者组名称。ID|$:表示指定具体的消息ID,0可以表示重新开始处理消费者组中的所有消息,$表示只处理消费者组中新产生的消息。 2.7.3 DESTROY

解释:XGROUP DESTROY命令用于销毁消费者组。

DESTROY:关键字,表示销毁消费者组命令。key:表示消息队列的名称。groupname:表示要销毁的消费者组名称。 2.7.4 CREATECONSUMER

解释:XGROUP CREATECONSUMER命令用于创建消费者。

CREATECONSUMER:关键字,表示创建消费者命令。key:表示消息队列的名称。groupname:表示要创建的消费者所属的消费者组名称。consumername:表示要创建的消费者名称。 2.7.5 DELCONSUMER

解释:XGROUP DELCONSUMER命令用于删除消费者。

DELCONSUMER:关键字,表示删除消费者命令。key:表示消息队列的名称。groupname:表示要删除的消费者所属的消费者组名称。consumername:表示要删除的消费者名称。 2.8、XREADGROUP


解释:XREADGROUP命令用于分组消费消息。

GROUP:关键字。group:表示消费者组名称。consumer:表示消费者名称。[COUNT count]:可选参数,COUNT为关键字,表示指定读取消息的数量,count表示具体的值。同XREAD命令。[BLOCK milliseconds]:可选参数,可选参数,BLOCK为关键字,表示设置XREAD为阻塞模式,默认是非阻塞模式,milliseconds表示具体阻塞的时间。同XREAD命令。[NOACK]:可选参数,表示不要将消息加入到PEL队列(Pending等待队列)中,相当于在消息读取时直接进行消息确认。在可靠性要求不高和偶尔丢失消息可接受的场景下可以使用。STREAMS:关键字。key [key …]:表示消息队列的名称,可以传入多个消息队列名称。同XREAD命令。ID [ID …]:用于表示从哪个消息ID开始读取,与前面的key一一对应。0表示从第一条消息开始。在阻塞模式中,可以使用$用于表示最新的消息ID。(在非阻塞模式下$无意义)。同XREAD命令。 2.9、XPENDING


解释:XPENDING命令用于获取等待队列,等待队列中保存的是消费者组内被读取,但是还未完成处理的消息,也就是还没有ACK的消息。

key:表示消息队列的名称。group:表示消费者组名称。[IDLE min-idle-time]:可选参数,IDLE表示指定消息已读取时长,min-idle-time表示具体的值。start:表示起始消息ID(包含)。end:表示结束消息ID(包含)。count:指定读取消息的条数。[consumer]:可选参数,表示消费者名称。 2.10、XACK


解释:XACK命令用于进行消息确认。

key:表示消息队列的名称。group:表示消费者组名称。ID [ID …]:表示消息ID,可以传入多个消息ID。 2.11、XCLAIM:消息转移


解释:XCLAIM命令用于进行消息转移,当某个等待队列中的消息长时间没有被处理(没有ACK)的时候,可以用XCLAIM命令将其转移到其他消费者的等待列表中。

key:表示消息队列的名称。group:表示消费者组名称。consumer:表示消费者名称。min-idle-time:表示消息空闲时长(表示消息已经读取,但还未处理)。ID [ID …]:可选参数,表示要转移的消息的消息ID,可传入多个消息ID。[IDLE ms]:可选参数,设置消息空闲时间(上一次读取消息的时间),如果未指定,这假定IDLE为0,即每次转移消息之后重置消息空闲时间。因为如果空闲时间一直累加的话,消息会一直转移。[TIME ms-unix-time]:可选参数,与IDLE参数相同,只是它将空闲时间设置为特定的Unix时间(以毫秒为单位),而不是相对的毫秒量。这对于重写生成XCLAIM命令的AOF文件非常有用。[RETRYCOUNT count]:可选参数,设置重试计数器的值,每次消息被读取时,该计数器都会递增。一般XCLAIM命令不需要修改重试计数器的值。[FORCE]:可选参数,即使指定要转移的消息的消息ID在其他等待列表中不存在,也强制将该消息ID加入到执行消费者的等待列表中。[JUSTID]:可选参数,仅返回要转移消息的消息ID,使用此参数意味着重试计数器不会递增。 2.12、XINFO

2.12.1、CONSUMERS

解释:XINFO CONSUMERS命令用于监控消费者。

CONSUMERS:关键字,表示查看消费者信息命令。key:表示消息队列的名称。groupname:表示消费者组名称。 2.12.2、GROUPS

解释:XINFO GROUPS命令用于监控消费者组。

GROUPS:关键字,表示查看消费者组信息命令。key:表示消息队列的名称。 2.12.3、STREAM

解释:XINFO STREAM命令用于监控消息队列。

STREAM:关键字,表示查看消息队列信息命令。key:表示消息队列的名称。 2.12.4、HELP

解释:XINFO HELP 命令用于获取帮助。

HELP:关键字,表示获取帮助信息命令。 3、XADD/XREAD模式和消费者组模式 3.1、XADD/XREAD模式

普通场景下,生产者生产消息,消费者消费消息,多个消费者可以重复的消费相同的消息,比较类似常规的发布订阅模式,订阅了某个消息队列的消费者,能够获取到生产者投放的消息。当然消费者可以采用阻塞或者非阻塞的模式进行读取消息,业务处理等。一个典型的阻塞模式使用方式如下:

#Producer
127.0.0.1:6379> XADD test-mq * key1 value1
"1622605684330-0"
127.0.0.1:6379> 
127.0.0.1:6379> XADD test-mq * key2 value2
"1622605691371-0"
127.0.0.1:6379> 
127.0.0.1:6379> XADD test-mq * key3 value3
"1622605698309-0"
127.0.0.1:6379> 
127.0.0.1:6379> XADD test-mq * key4 value4
"1622605707261-0"
127.0.0.1:6379> 
127.0.0.1:6379> XADD test-mq * key5 value5 key6 value6
"1622605714081-0"
127.0.0.1:6379>
#Consumer
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605684330-0"
         2) 1) "key1"
            2) "value1"
(3.32s)
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605691371-0"
         2) 1) "key2"
            2) "value2"
(2.88s)
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605698309-0"
         2) 1) "key3"
            2) "value3"
(3.37s)
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605707261-0"
         2) 1) "key4"
            2) "value4"
(3.75s)
127.0.0.1:6379> 
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS test-mq $
1) 1) "test-mq"
   2) 1) 1) "1622605714081-0"
         2) 1) "key5"
            2) "value5"
            3) "key6"
            4) "value6"
(2.47s)
127.0.0.1:6379>

说明:使用阻塞模式的XREAD,XREAD BLOCK 10000 STREAMS test-mq $,最后一个参数$表示读取最新的消息,所以需要先启动消费者,阻塞等待消息,然后生产者添加消息,消费者接受消息完成处理。

3.2、消费者组模式

在有些场景下,我们需要多个消费者配合来消费同一个消息队列中的消息,而不是多个消费者重复的消息,以此来提高消息处理的效率。这种模式也就是消费者组模式了。消费者组模式如下图所示:

下面是Redis Stream的结构图:

上图解释:

Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符) 4、基于Go语言封装Redis Stream客户端Demo

代码结构如下:

connHandle.go

package common

import (
	"fmt"
	"github.com/gomodule/redigo/redis"
	"log"
	"time"
)

func NewClient(opt RedisConnOpt) *RedisStreamMQClient {
	return &RedisStreamMQClient{
		RedisConnOpt: opt,
		ConnPool:     newPool(opt),
	}
}

func newPool(opt RedisConnOpt) *redis.Pool{
	return &redis.Pool{
		MaxIdle: 3,
		IdleTimeout: 240*time.Second,
		MaxActive: 10,
		Wait: true,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", opt.Host, opt.Port))
			if err != nil {
				log.Fatalf("Redis.Dial: %v", err)
				return nil, err
			}
			/*
			if _, err := c.Do("AUTH", opt.Password); err != nil {
				c.Close()
				log.Fatalf("Redis.AUTH: %v", err)
				return nil, err
			}
			*/
			if _, err := c.Do("SELECT", opt.Index); err != nil {
				c.Close()
				log.Fatalf("Redis.SELECT: %v", err)
				return nil, err
			}
			return c, nil
		},
	}
}

define.go

package common

import (
	"github.com/gomodule/redigo/redis"
)

const (
	STREAM_MQ_MAX_LEN = 500000  //消息队列最大长度
	READ_MSG_AMOUNT = 1000		//每次读取消息的条数
	READ_MSG_BLOCK_SEC = 30     //阻塞读取消息时间
	TEST_STREAM_KEY = "TestStreamKey1"
)

type RedisConnOpt struct {
	Enable   bool
	Host     string
	Port     int32
	Password string
	Index    int32
	TTL      int32
}

type RedisStreamMQClient struct {
	RedisConnOpt RedisConnOpt
	ConnPool     *redis.Pool
	StreamKey    string		//stream对应的key值
	GroupName    string		//消费者组名称
	ConsumerName string		//消费者名称
}

//等待列表中的消息属性
type PendingMsgInfo struct {
	MsgId			string	//消息ID
	BelongConsumer	string	//所属消费者
	IdleTime		int		//已读取未消费时长
	ReadCount		int		//消息被读取次数
}

// 消息队列信息
type StreamMQInfo struct {
	Length			int64			// 消息队列长度
	RedixTreeKeys	int64			// 基数树key数
	RedixTreeNodes	int64			// 基数树节点数
	LastGeneratedId	string			// 最后一个生成的消息ID
	Groups			int64			// 消费组个数
	FirstEntry		*map[string]map[string]string	// 第一个消息体
	LastEntry		*map[string]map[string]string	// 最后一个消息体
}

// 消费组信息
type GroupInfo struct {
	Name			string	    // 消费组名称
	Consumers		int64		// 组内消费者个数
	Pending			int64		// 组内所有消费者的等待列表总长度
	LastDeliveredId	string		// 组内最后一条被消费的消息ID
}

// 消费者信息
type ConsumerInfo struct {
	Name			string		// 消费者名称
	Pending			int64		// 等待队列长度
	Idle			int64		// 消费者空闲时间(毫秒)
}

msgHandle.go

package common

import (
	"fmt"
	"github.com/gomodule/redigo/redis"
)

// PutMsg 添加消息
func (mqClient *RedisStreamMQClient) PutMsg(streamKey string, msgKey string, msgValue string) (strMsgId string, err error){
	conn := mqClient.ConnPool.Get()
	defer conn.Close()
	//*表示由Redis自己生成消息ID,设置MAXLEN可以保证消息队列的长度不会一直累加
	strMsgId, err = redis.String(conn.Do("XADD",
		streamKey, "MAXLEN", "=", STREAM_MQ_MAX_LEN, "*", msgKey, msgValue))
	if err != nil {
		fmt.Println("XADD failed, err: ", err)
		return "", err
	}
	//fmt.Println("Reply Msg Id:", strMsgId)
	return strMsgId, nil
}

// PutMsgBatch 批量添加消息
func (mqClient *RedisStreamMQClient) PutMsgBatch(streamKey string, msgMap map[string]string) (msgId string, err error){
	if len(msgMap) <= 0 {
		fmt.Println("msgMap len <= 0, no need put")
		return msgId, nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	vecMsg := make([]string, 0)
	for msgKey, msgValue := range msgMap {
		vecMsg = append(vecMsg, msgKey)
		vecMsg = append(vecMsg, msgValue)
	}

	msgId, err = redis.String(conn.Do("XADD",
		redis.Args{streamKey, "MAXLEN", "=", STREAM_MQ_MAX_LEN, "*"}.AddFlat(vecMsg)...))
	if err != nil {
		fmt.Println("XADD failed, err: ", err)
		return "", err
	}

	fmt.Println("Reply Msg Id:", msgId)
	return msgId, nil
}

func (mqClient *RedisStreamMQClient) ConvertVecInterface(vecReply []interface{}) (msgMap map[string]map[string][]string){
	msgMap = make(map[string]map[string][]string, 0)
	for keyIndex := 0; keyIndex < len(vecReply); keyIndex++ {
		var keyInfo = vecReply[keyIndex].([]interface{})
		var key = string(keyInfo[0].([]byte))
		var idList = keyInfo[1].([]interface{})

		//fmt.Println("StreamKey:", key)
		msgInfoMap := make(map[string][]string, 0)
		for idIndex := 0; idIndex < len(idList); idIndex++ {
			var idInfo = idList[idIndex].([]interface{})
			var id = string(idInfo[0].([]byte))

			var fieldList = idInfo[1].([]interface{})
			vecMsg := make([]string, 0)
			for msgIndex := 0; msgIndex < len(fieldList); msgIndex = msgIndex + 2 {
				var msgKey = string(fieldList[msgIndex].([]byte))
				var msgVal = string(fieldList[msgIndex+1].([]byte))
				vecMsg = append(vecMsg, msgKey)
				vecMsg = append(vecMsg, msgVal)
				//fmt.Println("MsgId:", id, "MsgKey:", msgKey, "MsgVal:", msgVal)
			}
			msgInfoMap[id] = vecMsg
		}
		msgMap[key] = msgInfoMap
	}
	return
}

// GetMsgBlock 阻塞方式读取消息
func (mqClient *RedisStreamMQClient) GetMsgBlock(blockSec int32, msgAmount int32, streamKey string) (
	msgMap map[string]map[string][]string, err error){
	conn := mqClient.ConnPool.Get()
	defer conn.Close()
	//在阻塞模式中,可以使用$,表示最新的消息ID(在非阻塞模式下$无意义)
	reply, err := redis.Values(conn.Do("XREAD",
		"COUNT", msgAmount, "BLOCK", blockSec*1000, "STREAMS", streamKey, "$"))
	if err != nil && err != redis.ErrNil{
		fmt.Println("BLOCK XREAD failed, err: ", err)
		return nil, err
	}

	//返回消息转换
	msgMap = mqClient.ConvertVecInterface(reply)
	fmt.Println("MsgMap:", msgMap)
	return msgMap, nil
}

// GetMsg 非阻塞方式读取消息
func (mqClient *RedisStreamMQClient) GetMsg(msgAmount int32, streamKey string, beginMsgId string) (
	msgMap map[string]map[string][]string, err error) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()
	//从消息ID=beginMsgId往后开始读取,不包含beginMsgId的消息
	reply, err := redis.Values(conn.Do("XREAD", "COUNT", msgAmount, "STREAMS", streamKey, beginMsgId))
	if err != nil && err != redis.ErrNil{
		fmt.Println("XREAD failed, err: ", err)
		return nil, err
	}

	//返回消息转换
	msgMap = mqClient.ConvertVecInterface(reply)
	fmt.Println("MsgMap:", msgMap)
	return msgMap, nil
}

// DelMsg 删除消息
func (mqClient *RedisStreamMQClient) DelMsg(streamKey string, vecMsgId []string) (err error){
	if len(vecMsgId) <= 0 {
		fmt.Println("vecMsgId len <= 0, no need del")
		return nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	for _, msgId := range vecMsgId {
		_, err := redis.Int(conn.Do("XDEL", streamKey, msgId))
		if err != nil {
			fmt.Println("XDEL failed, msgId:",msgId, "err:", err)
		}
	}
	return nil
}

// ReplyAck 返回ACK
func (mqClient *RedisStreamMQClient) ReplyAck(streamKey string, groupName string, vecMsgId []string) error {
	if len(vecMsgId) <= 0 {
		fmt.Println("vecMsgId len <= 0, no need ack")
		return nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	//fmt.Println("Start ReplyAck, vecMsgId:", vecMsgId)
	_, err := redis.Int(conn.Do("XACK", redis.Args{streamKey, groupName}.AddFlat(vecMsgId)...))
	if err != nil {
		fmt.Println("XACK failed, msgId:",vecMsgId, "err:", err)
		return err
	}
	//fmt.Println("ReplyAck Success")
	return nil
}

// CreateConsumerGroup 创建消费者组
func (mqClient *RedisStreamMQClient) CreateConsumerGroup(streamKey string, groupName string, beginMsgId string) error {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()
	//最后一个参数表示该组从消息ID=beginMsgId往后开始消费,不包含beginMsgId的消息
	_, err := redis.String(conn.Do("XGROUP", "CREATE", streamKey, groupName, beginMsgId))
	if err != nil {
		fmt.Println("XGROUP CREATE Failed. err:", err)
		return err
	}
	return nil
}

// DestroyConsumerGroup 销毁消费者组
func (mqClient *RedisStreamMQClient) DestroyConsumerGroup(streamKey string, groupName string) error {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	_, err := redis.String(conn.Do("XGROUP", "DESTROY", streamKey, groupName))
	if err != nil {
		fmt.Println("XGROUP DESTROY Failed. err:", err)
		return err
	}
	return nil
}

// GetMsgByGroupConsumer 组内消息分配 *** 作,组内每个消费者消费多少消息
func (mqClient *RedisStreamMQClient) GetMsgByGroupConsumer(streamKey string, groupName string,
	consumerName string, msgAmount int32)(msgMap map[string]map[string][]string, err error) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	//>代表当前消费者还没读取的消息
	reply, err := redis.Values(conn.Do("XREADGROUP",
		"GROUP", groupName, consumerName, "COUNT", msgAmount, "STREAMS", streamKey, ">"))
	if err != nil && err != redis.ErrNil{
		fmt.Println("XREADGROUP failed, err: ", err)
		return nil, err
	}

	//返回消息转换
	msgMap = mqClient.ConvertVecInterface(reply)
	//fmt.Println("MsgMap:", msgMap)
	return msgMap, nil
}

// CreateConsumer 创建消费者
func (mqClient *RedisStreamMQClient) CreateConsumer(streamKey string, groupName string, consumerName string) error {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	_, err := redis.String(conn.Do("XGROUP", "CREATECONSUMER", streamKey, groupName, consumerName))
	if err != nil {
		fmt.Println("XGROUP CREATECONSUMER Failed. err:", err)
		return err
	}
	return nil
}

// DelConsumer 删除消费者
func (mqClient *RedisStreamMQClient) DelConsumer(streamKey string, groupName string, consumerName string) error {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	_, err := redis.String(conn.Do("XGROUP", "DELCONSUMER", streamKey, groupName, consumerName))
	if err != nil {
		fmt.Println("XGROUP DELCONSUMER Failed. err:", err)
		return err
	}
	return nil
}

// GetMsgByGroupConsumer 组内消息分配 *** 作,组内每个消费者消费多少消息
func (mqClient *RedisStreamMQClient) GetMsgBlockByGroupConsumer(blockSec int32, streamKey string, groupName string,
	consumerName string, msgAmount int32)(msgMap map[string]map[string][]string, err error) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	//>代表当前消费者还没读取的消息
	reply, err := redis.Values(conn.Do("XREADGROUP", "GROUP", groupName,
		consumerName, "COUNT", msgAmount, "BLOCK", blockSec*1000, "STREAMS", streamKey, ">"))
	if err != nil && err != redis.ErrNil{
		fmt.Println("BLOCK XREADGROUP failed, err: ", err)
		return nil, err
	}

	//返回消息转换
	msgMap = mqClient.ConvertVecInterface(reply)
	fmt.Println("MsgMap:", msgMap)
	return msgMap, nil
}

// GetPendingList 获取等待列表(读取但还未消费的消息)
func (mqClient *RedisStreamMQClient) GetPendingList(streamKey string, groupName string, consumerName string, msgAmount int32)(
	vecPendingMsg []*PendingMsgInfo, err error) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Values(conn.Do("XPENDING", streamKey, groupName, "-", "+", msgAmount, consumerName))
	if err != nil {
		fmt.Println("XPENDING failed, err: ", err)
		return nil, err
	}

	for iIndex := 0; iIndex < len(reply); iIndex++ {

		var msgInfo = reply[iIndex].([]interface{})
		var msgId = string(msgInfo[0].([]byte))
		var belongConsumer = string(msgInfo[1].([]byte))
		var idleTime = msgInfo[2].(int64)
		var readCount = msgInfo[3].(int64)

		pendingMsg := &PendingMsgInfo{msgId, belongConsumer, int(idleTime), int(readCount)}
		vecPendingMsg = append(vecPendingMsg, pendingMsg)
	}

	return vecPendingMsg, nil
}

// MoveMsg 转移消息到其他等待列表中
func (mqClient *RedisStreamMQClient) MoveMsg(streamKey string, groupName string,
	consumerName string, idleTime int, vecMsgId []string) error {
	if len(vecMsgId) <= 0 {
		fmt.Println("vecMsgId len <= 0, no need move")
		return nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	_, err := redis.Values(conn.Do("XCLAIM", redis.Args{streamKey, groupName, consumerName, idleTime*1000}.AddFlat(vecMsgId)...))
	if err != nil {
		fmt.Println("XCLAIM failed, msgId:",vecMsgId, "err:", err)
		return err
	}
	return nil
}

// DelDeadMsg 删除不能被消费者处理,也就是不能被 XACK,长时间处于 Pending 列表中的消息
func (mqClient *RedisStreamMQClient) DelDeadMsg(streamKey string, groupName string, vecMsgId []string) error {
	if len(vecMsgId) <= 0 {
		fmt.Println("vecMsgId len <= 0, no need del")
		return nil
	}

	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	// 删除消息
	_, err := redis.Int(conn.Do("XDEL", redis.Args{streamKey}.AddFlat(vecMsgId)...))
	if err != nil {
		fmt.Println("XDEL failed, msgId:",vecMsgId, "err:", err)
		return err
	}
	// 设置ACK,否则消息还会存在pending list中
	_, err = redis.Int(conn.Do("XACK", redis.Args{streamKey, groupName}.AddFlat(vecMsgId)...))
	if err != nil {
		fmt.Println("XACK failed, groupName:", groupName, "msgId:",vecMsgId, "err:", err)
		return err
	}
	return nil
}

// GetStreamsLen 获取消息队列的长度,消息消费之后会做标记,不会删除
func (mqClient *RedisStreamMQClient) GetStreamsLen(streamKey string) int {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Int(conn.Do("XLEN", streamKey))
	if err != nil {
		fmt.Println("XLEN failed, err:", err)
		return -1
	}
	return reply
}

// MonitorMqInfo 监控服务器队列信息
func (mqClient *RedisStreamMQClient) MonitorMqInfo(streamKey string) (streamMQInfo *StreamMQInfo){
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Values(conn.Do("XINFO", "STREAM", streamKey))
	if err != nil || len(reply) <= 0{
		fmt.Println("XINFO STREAM failed, err:", err)
		return nil
	}
	fmt.Println("reply len:", len(reply))

	streamMQInfo = &StreamMQInfo{}
	streamMQInfo.Length = reply[1].(int64)
	streamMQInfo.RedixTreeKeys = reply[3].(int64)
	streamMQInfo.RedixTreeNodes = reply[5].(int64)
	streamMQInfo.LastGeneratedId = string(reply[7].([]byte))
	streamMQInfo.Groups, _ = reply[9].(int64)

	firstEntryInfo := reply[11].([]interface{})
	firstEntryMsgId := string(firstEntryInfo[0].([]byte))
	vecFirstEntryMsg := firstEntryInfo[1].([]interface{})
	firstMsgMap := make(map[string]string, 0)
	for iIndex := 0; iIndex < len(vecFirstEntryMsg); iIndex = iIndex + 2 {
		msgKey := string(vecFirstEntryMsg[iIndex].([]byte))
		msgVal := string(vecFirstEntryMsg[iIndex+1].([]byte))
		firstMsgMap[msgKey] = msgVal
	}
	firstEntry := map[string]map[string]string{
		firstEntryMsgId : firstMsgMap,
	}
	streamMQInfo.FirstEntry = &firstEntry

	lastEntryInfo := reply[13].([]interface{})
	lastEntryMsgId := string(lastEntryInfo[0].([]byte))
	vecLastEntryMsg := lastEntryInfo[1].([]interface{})
	lastMsgMap := make(map[string]string, 0)
	for iIndex := 0; iIndex < len(vecLastEntryMsg); iIndex = iIndex + 2 {
		msgKey := string(vecLastEntryMsg[iIndex].([]byte))
		msgVal := string(vecLastEntryMsg[iIndex+1].([]byte))
		lastMsgMap[msgKey] = msgVal
	}
	lastEntry := map[string]map[string]string{
		lastEntryMsgId : lastMsgMap,
	}
	streamMQInfo.LastEntry = &lastEntry
	return
}

// MonitorConsumerGroupInfo 监控消费者组信息
func (mqClient *RedisStreamMQClient) MonitorConsumerGroupInfo(streamKey string) (groupInfo *GroupInfo) {
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Values(conn.Do("XINFO", "GROUPS", streamKey))
	if err != nil || len(reply) <= 0{
		fmt.Println("XINFO GROUPS failed, err:", err)
		return nil
	}
	fmt.Println("reply len:", len(reply))

	oGroupInfo := reply[0].([]interface{})
	name := string(oGroupInfo[1].([]byte))
	consumers := oGroupInfo[3].(int64)
	pending := oGroupInfo[5].(int64)
	lastDeliveredId := string(oGroupInfo[7].([]byte))
	groupInfo = &GroupInfo{name, consumers, pending, lastDeliveredId}

	return
}

// MonitorConsumerInfo 监控消费者信息
func (mqClient *RedisStreamMQClient) MonitorConsumerInfo(streamKey string, groupName string) (vecConsumerInfo []*ConsumerInfo){
	conn := mqClient.ConnPool.Get()
	defer conn.Close()

	reply, err := redis.Values(conn.Do("XINFO", "CONSUMERS", streamKey, groupName))
	if err != nil {
		fmt.Println("XINFO CONSUMERS failed, err:", err)
		return nil
	}
	fmt.Println("reply len:", len(reply))

	for iIndex := 0; iIndex < len(reply); iIndex++ {
		oConsumerInfo := reply[iIndex].([]interface{})
		name := string(oConsumerInfo[1].([]byte))
		pending := oConsumerInfo[3].(int64)
		idle := oConsumerInfo[5].(int64)
		vecConsumerInfo = append(vecConsumerInfo, &ConsumerInfo{name, pending, idle})
	}
	return
}

testProduceClient.go

package main

import (
	"RedisStreamMqDemo/common"
	"fmt"
	"time"
)

func testPutMsg(redisCli *common.RedisStreamMQClient, textKey string, textVal string, msgCount int) {

	startTime := time.Now()
	fmt.Println("Start Test Function testPutMsg")
	for i := 0; i < msgCount; i++ {
		var strMsgKey string = fmt.Sprintf("%s-%d", textKey, i+1)
		var strMsgVal string = fmt.Sprintf("%s-%d", textVal, i+1)
		_, err := redisCli.PutMsg(common.TEST_STREAM_KEY, strMsgKey, strMsgVal)
		if err != nil {
			fmt.Println("PutMsg Failed. err:", err)
		}
	}
	fmt.Println("End Test Function testPutMsg")

	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

func testPutMsgBatch(redisCli *common.RedisStreamMQClient, textKey string, textVal string, msgCount int) {

	startTime := time.Now()
	fmt.Println("Start Test Function PutMsgBatch")
	msgMap := make(map[string]string,0)
	for i := 0; i < msgCount; i++ {
		var strMsgKey string = fmt.Sprintf("%s-%d", textKey, i+1)
		var strMsgVal string = fmt.Sprintf("%s-%d", textVal, i+1)
		msgMap[strMsgKey] = strMsgVal
	}
	vecMsgId, err2 := redisCli.PutMsgBatch(common.TEST_STREAM_KEY, msgMap)
	if err2 != nil {
		fmt.Println("PutMsgBatch Failed. err:", err2)
	}
	fmt.Println("Reply Msg Id:", vecMsgId)
	fmt.Println("End Test Function PutMsgBatch")

	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

func main() {

	fmt.Println("test redis stream mq producer")
	redisOpt := common.RedisConnOpt{
		Enable: true,
		Host:   "127.0.0.1",
		Port:   6379,
		TTL:    240,
	}

	redisCli := common.NewClient(redisOpt)
	fmt.Println("Test Redis Producer Client Host:", redisCli.RedisConnOpt.Host,
		", Port:", redisCli.RedisConnOpt.Port, ", DB:", redisCli.RedisConnOpt.Index)

	// 单条生产消息
	var textKey string = "DEMO-TEST-STREAM-MSG-KEY"
	var textVal string = "DEMO-TEST-STREAM-MSG-VAL"
	var msgCount int = 100
	testPutMsg(redisCli, textKey, textVal, msgCount)

	// 批量生产消息
	//var textKey2 string = "DEMO-TEST-STREAM-MSG-KEY"
	//var textVal2 string = "DEMO-TEST-STREAM-MSG-VAL"
	//var msgCount2 int = 500
	//testPutMsgBatch(redisCli, textKey2, textVal2, msgCount2)
	//testPutMsgBatch(redisCli, textKey2, textVal2, msgCount2)
	//testPutMsgBatch(redisCli, textKey2, textVal2, msgCount2)

	return
}

testConsumerClient.go

package main

import (
	"RedisStreamMqDemo/common"
	"fmt"
	"time"
)

func PrintMsgMap(msgMap map[string]map[string][]string) (key2msgIds map[string][]string, msgCount int32){
	key2msgIds = make(map[string][]string, 0)
	msgCount = 0
	for streamKey, val := range msgMap {
		//fmt.Println("StreamKey:", streamKey)
		vecMsgId := make([]string, 0)
		for msgId, msgList := range val {
			//fmt.Println("MsgId:", msgId)
			vecMsgId = append(vecMsgId, msgId)
			for msgIndex := 0; msgIndex < len(msgList); msgIndex = msgIndex + 2 {
				//var msgKey = msgList[msgIndex]
				//var msgVal = msgList[msgIndex+1]
				msgCount++
				//fmt.Println("MsgKey:", msgKey, "MsgVal:", msgVal)
			}
		}
		key2msgIds[streamKey] = vecMsgId
	}
	return
}

// 非阻塞无消费组的情况
func testNoGroup(redisCli *common.RedisStreamMQClient) {

	startTime := time.Now()
	fmt.Println("Start Test Function GetMsg")
	msgMap, err2 := redisCli.GetMsg(common.READ_MSG_AMOUNT, common.TEST_STREAM_KEY, "0")
	if err2 != nil {
		fmt.Println("GetMsg Failed. streamKey:", common.TEST_STREAM_KEY, "err:", err2)
		return
	}
	_, msgCount := PrintMsgMap(msgMap)
	fmt.Println("streamKey:", common.TEST_STREAM_KEY, "Ack Msg Count:", msgCount)
	fmt.Println("End Test Function GetMsg")

	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

// 阻塞模式无消费组的情况
func testNoGroupBlock(redisCli *common.RedisStreamMQClient) {

	startTime := time.Now()
	fmt.Println("Start Test Function GetMsgBlock")
	msgMap, err := redisCli.GetMsgBlock(common.READ_MSG_BLOCK_SEC, common.READ_MSG_AMOUNT, common.TEST_STREAM_KEY)
	if err != nil {
		fmt.Println("GetMsg Failed. streamKey:", common.TEST_STREAM_KEY, "err:", err)
		return
	}
	_, msgCount := PrintMsgMap(msgMap)
	fmt.Println("streamKey:", common.TEST_STREAM_KEY, "Ack Msg Count:", msgCount)
	fmt.Println("End Test Function GetMsgBlock")

	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

// 非阻塞消费
func testGroupConsumer(redisCli *common.RedisStreamMQClient, groupName string, consumerName string, msgAmount int32){

	/*
	fmt.Println("Start Test Function CreateConsumerGroup")
	err3 := redisCli.CreateConsumerGroup(common.TEST_STREAM_KEY, groupName, "0")
	if err3 != nil {
		fmt.Println("CreateConsumerGroup Failed. err:", err3)
		return
	}
	fmt.Println("End Start Test Function CreateConsumerGroup")
	*/
	startTime := time.Now()
	fmt.Println("Start Test Function GetMsgByGroupConsumer")
	msgMap3, err3 := redisCli.GetMsgByGroupConsumer(common.TEST_STREAM_KEY, groupName, consumerName, msgAmount)

	if err3 != nil {
		fmt.Println("GetMsgByGroupConsumer Failed. err:", err3)
		return
	}
	fmt.Println("End Test Function GetMsgByGroupConsumer")

	fmt.Println("Start Test Function ReplyAck")
	key2msgIds3, _ := PrintMsgMap(msgMap3)
	for streamKey, vecMsgId := range key2msgIds3 {
		//fmt.Println("streamKey:", streamKey, "groupName:", groupName, "consumerName:", consumerName, "Ack Msg Count:", msgCount)
		err3 = redisCli.ReplyAck(streamKey, groupName, vecMsgId)
		if err3 != nil {
			fmt.Println("ReplyAck Failed. err:", err3)
		}
	}

	fmt.Println("End Test Function ReplyAck")
	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

// 阻塞消费
func testGroupConsumerBlock(redisCli *common.RedisStreamMQClient, groupName string, consumerName string, msgAmount int32){

	/*
	fmt.Println("Start Test Function CreateConsumerGroup")
	err3 := redisCli.CreateConsumerGroup(common.TEST_STREAM_KEY, groupName, "0")
	if err3 != nil {
		fmt.Println("CreateConsumerGroup Failed. err:", err3)
		return
	}
	fmt.Println("End Start Test Function CreateConsumerGroup")
	*/

	startTime := time.Now()
	fmt.Println("Start Test Function GetMsgBlockByGroupConsumer")
	msgMap6, err3 := redisCli.GetMsgBlockByGroupConsumer(common.READ_MSG_BLOCK_SEC,
		common.TEST_STREAM_KEY, groupName, "ConsumerName1-A", msgAmount)

	if err3 != nil {
		fmt.Println("GetMsgBlockByGroupConsumer Failed. err:", err3)
		return
	}
	fmt.Println("End Test Function GetMsgBlockByGroupConsumer")

	fmt.Println("Start Test Function ReplyAck")
	key2msgIds6, msgCount := PrintMsgMap(msgMap6)
	for streamKey, vecMsgId := range key2msgIds6 {
		fmt.Println("streamKey:", streamKey, "groupName:", groupName, "consumerName:", consumerName, "Ack Msg Count:", msgCount)
		err3 = redisCli.ReplyAck(streamKey, groupName, vecMsgId)
		if err3 != nil {
			fmt.Println("ReplyAck Failed. err:", err3)
		}
	}
	fmt.Println("End Test Function ReplyAck")
	costTime := time.Since(startTime)
	fmt.Println("=========:START TIME:", startTime)
	fmt.Println("=========:COST TIME:", costTime)
}

func testPendingList(redisCli *common.RedisStreamMQClient, groupName string, consumerName string){

	fmt.Println("Start Test Function GetPendingList")
	vecPendingMsg, _ := redisCli.GetPendingList(common.TEST_STREAM_KEY, groupName, consumerName, common.READ_MSG_AMOUNT)

	vecMsgId := make([]string, 0)
	for _, pendingMsg := range vecPendingMsg {
		vecMsgId = append(vecMsgId, pendingMsg.MsgId)
	}
	_ = redisCli.ReplyAck(common.TEST_STREAM_KEY, groupName, vecMsgId)
	fmt.Println("Start Test Function GetPendingList")
}

func testXinfo(redisCli *common.RedisStreamMQClient, streamKey string, groupName string) {
	fmt.Println("Start Test Function MonitorMqInfo")
	streamMqInfo := redisCli.MonitorMqInfo(streamKey)
	fmt.Println("streamMqInfo:{")
	fmt.Println("Length:", streamMqInfo.Length)
	fmt.Println("RedixTreeKeys:", streamMqInfo.RedixTreeKeys)
	fmt.Println("RedixTreeNodes:",streamMqInfo.RedixTreeNodes)
	fmt.Println("LastGeneratedId:", streamMqInfo.LastGeneratedId)
	fmt.Println("Groups:",streamMqInfo.Groups)
	fmt.Println("FirstEntry:", streamMqInfo.FirstEntry)
	fmt.Println("LastEntry:", streamMqInfo.LastEntry)
	fmt.Println("}")
	fmt.Println("End Test Function MonitorMqInfo")

	fmt.Println("Start Test Function MonitorConsumerGroupInfo")
	groupInfo := redisCli.MonitorConsumerGroupInfo(streamKey)
	if groupInfo != nil {
		fmt.Println("groupInfo:{")
		fmt.Println("Name:", groupInfo.Name)
		fmt.Println("Consumers:", groupInfo.Consumers)
		fmt.Println("Pending:", groupInfo.Pending)
		fmt.Println("LastDeliveredId:", groupInfo.LastDeliveredId)
		fmt.Println("}")
	}
	fmt.Println("End Test Function MonitorConsumerGroupInfo")

	fmt.Println("Start Test Function MonitorConsumerInfo")
	vecConsumerInfo := redisCli.MonitorConsumerInfo(streamKey, groupName)
	fmt.Println("groupInfo:{")
	for _, consumerInfo := range vecConsumerInfo {
		fmt.Println("Name:", consumerInfo.Name)
		fmt.Println("Pending:", consumerInfo.Pending)
		fmt.Println("Idle:", consumerInfo.Idle)
		fmt.Println("===========================")
	}
	fmt.Println("}")
	fmt.Println("End Test Function MonitorConsumerInfo")
}

func main() {

	fmt.Println("test redis stream mq consumer")
	redisOpt := common.RedisConnOpt{
		Enable: true,
		Host:   "127.0.0.1",
		Port:   6379,
		TTL:    240,
	}

	redisCli := common.NewClient(redisOpt)
	fmt.Println("Test Redis Consumer Client Host:", redisCli.RedisConnOpt.Host,
		", Port:", redisCli.RedisConnOpt.Port, ", DB:", redisCli.RedisConnOpt.Index)

	//无消费者组,所有消费者都能消费所有消息
	//testNoGroup(redisCli)

	//无消费者组,所有消费者都能消费所有消息(阻塞模式)
	//testNoGroupBlock(redisCli)

	//有消费者组,所有消费者都不能重复消费组内的消息
	var groupName1 string = "GroupName1"
	var consumerName1 string = "ConsumerName1-" + time.Now().String()
	var msgAmount1 int32 = 50000
	testGroupConsumer(redisCli, groupName1, consumerName1, msgAmount1)
	testPendingList(redisCli, groupName1, consumerName1)

	//有消费者组,所有消费者都不能重复消费组内的消息(阻塞模式)
	//var groupName2 string = "GroupName2"
	//var consumerName2 string = "ConsumerName2-" + time.Now().String()
	//var msgAmount2 int32 = 50000
	//testGroupConsumerBlock(redisCli, groupName2, consumerName2, msgAmount2)
	//testPendingList(redisCli, groupName2, consumerName2)

	//XINFO测试
	//var groupName1 string = "testgroupname"
	//var streamKey string = "test-mq"
	//testXinfo(redisCli, streamKey, groupName1)

	return
}

这里给出完整代码下载地址,下载可直接运行。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/995401.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存