基于Go Gin+WebSocket+消息队列实现即时通讯多人群聊功能模块

基于Go Gin+WebSocket+消息队列实现即时通讯多人群聊功能模块,第1张

聊天室模块源码 https://gitee.com/dpwgc/dpwgc_im_go_ws 消息队列 https://gitee.com/dpwgc/kapokmq (这里用的是自己写的消息队列,也可以换成别的RabbitMQ或RocketMQ之类的)
消息推送模块结构图


基础消息模板
// Message 群聊消息模板
type Message struct {
	Id          int64  	//消息Id
	GroupId     int64  	//消息所属群组id
	UserId      int64  	//消息所属用户id
	MessageData string 	//消息主体
	CreateTime  string 	//消息创建时间
	UpdateTime  string 	//消息更新时间
	Status      int64  	//消息状态
}

import (
	"encoding/json"
	"github.com/dpwgc/kapokmq-go-client/conn"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	"strconv"
	"sync"
)

//客户端连接表,把每个客户端都放进来。Key为WsKey结构体(GroupId int64, UserId int64)。Value为websocket连接
var clients = make(map[models.WsKey]*websocket.Conn)

//广播通道,用于广播推送群聊用户发送的消息(带缓冲区,提高并发速率)
var broadcast = make(chan models.UserMessage, 10000)

//锁,用于避免map并发写
var lock sync.RWMutex

InitChat 聊天室模块初始化 这里开启两个协程。 pushMessages广播协程:持续接收用户的消息并广播推送给其他用户。 insertMessage数据库连接协程:持续接收消息队列中的消息并将消息插入到数据库

// InitChat 聊天室模块初始化
func InitChat() {

	//启动广播推送协程,推送消息到各个客户端
	go pushMessages()

	//启动消息批量处理协程,借助消息队列将消息批量插入数据库
	go insertMessage()
}

ChatLink 监听模块

建立WebSocket连接并监听用户发送的消息,将用户发送的消息插入广播通道与消息队列。
func ChatLink(c *gin.Context) {

	//获取连接的用户id与群组id
	userId := c.Param("userId")
	groupId := c.Param("groupId")

	//将id转为int64类型
	gid, _ := strconv.ParseInt(groupId, 10, 64)
	uid, _ := strconv.ParseInt(userId, 10, 64)

	//升级get请求为webSocket协议
	ws, err := config.UpGrader.Upgrade(c.Writer, c.Request, nil)
	
	//生成该websocket连接的key(以群组id和用户id为key)
	wsKey := models.WsKey{
		GroupId: gid,
		UserId:  uid,
	}
	
	//将当前连接的客户端放入客户端map(clients)中
	lock.RLock()
	clients[wsKey] = ws
	lock.RUnlock()

	if err != nil {
		fmt.Println(err)
		delete(clients, wsKey) //删除map中的客户端
		return
	}
	defer ws.Close()

	for {
		//读取websocket发来的数据
		_, message, err := ws.ReadMessage()
		if err != nil {
			fmt.Println(err)
			delete(clients, wsKey) //删除map中的客户端
			break
		}
		
		/*
			略。。。
			对message进行加工处理
			得到userMessage
		*/
		
		//将聊天消息模板插入广播通道,交由pushMessages协程处理
		broadcast <- userMessage

		//将消息发送至消息队列(流量削峰,避免数据库被冲垮)
		byteMsg, err := json.Marshal(userMessage)
		if err == nil {
			conn.ProducerSend(string(byteMsg), 0)
		}
	}
}

pushMessages 广播推送模块

广播推送消息到其他在线客户端,将用户消息推送给该群组(同一GroupId)里所有在线的用户。
// go pushMessages()
func pushMessages() {
	for {
		//读取广播通道中的消息
		msg := <-broadcast

		//遍历现有的websocket客户端
		for key, client := range clients {

			//获取该客户端的群组id(GroupId)
			gid := key.GroupId

			//匹配客户端,判断该客户端的GroupId是否与该消息的GroupId一致,如果相同,则将该消息投递给该客户端
			if msg.GroupId == gid  {
				//发送消息,失败重试次数:3
				for i := 0; i < 3; i++ {
					//发送消息到消费者客户端
					err := client.WriteJSON(msg)
					//如果发送成功
					if err == nil {
						//结束循环
						break
					}
					//如果到达重试次数,但仍未发送成功
					if i == 2 && err != nil {
						//客户端关闭
						client.Close()
						//删除map中的客户端
						delete(clients, key)
					}
				}
			}
		}
	}
}

数据库插入模块 从消息队列中取出消息,将消息插入数据库。
// go insertMessage()
func insertMessage() {
	for {

		//从消息队列中获取消息
		message, _ := utils.GetMsg()

		//开始事务
		tx := mysql.Db().Begin()

		/*
			略。。。
			将message插入数据库
			更新群组与用户的信息
		*/

		//事务提交
		tx.Commit()
	}
}

消息队列连接模块 消息队列下载 https://github.com/dpwgc/kapokmq-server 消息队列源码 https://gitee.com/dpwgc/kapokmq
import (
	"encoding/json"
	"github.com/dpwgc/kapokmq-go-client/conf"
	"github.com/dpwgc/kapokmq-go-client/conn"
	"github.com/spf13/viper"
	"local/models"
)

// InitMQ 与Kapokmq消息队列建立连接
func InitMQ() {
	//创建生产者模板
	producer := conf.Producer{
		MqAddr:     viper.GetString("kapokmq.mqAddr"),     //消息队列服务IP地址
		MqPort:     viper.GetString("kapokmq.mqPort"),     //消息队列服务端口号
		MqProtocol: viper.GetString("kapokmq.mqProtocol"), //消息队列连接协议:ws/wss
		Topic:      viper.GetString("kapokmq.topic"),      //生产者订阅的主题
		ProducerId: viper.GetString("kapokmq.producerId"), //生产者Id
		SecretKey:  viper.GetString("kapokmq.secretKey"),  //消息队列访问密钥
	}

	//生产者与消息队列建立连接
	err := conn.NewProducerConn(producer)
	if err != nil {
		panic(err)
	}

	//创建消费者模板
	consumer := conf.Consumer{
		MqAddr:     viper.GetString("kapokmq.mqAddr"),     //消息队列服务IP地址
		MqPort:     viper.GetString("kapokmq.mqPort"),     //消息队列服务端口号
		MqProtocol: viper.GetString("kapokmq.mqProtocol"), //消息队列连接协议:ws/wss
		Topic:      viper.GetString("kapokmq.topic"),      //消费者订阅的主题
		ConsumerId: viper.GetString("kapokmq.consumerId"), //消费者Id
		SecretKey:  viper.GetString("kapokmq.secretKey"),  //消息队列访问密钥
	}

	//消费者与消息队列建立连接
	err = conn.NewConsumerConn(consumer)
	if err != nil {
		panic(err)
	}
}

// GetMsg 从消息队列中获取消息
func GetMsg() (models.UserMessage, bool) {

	//接收消息队列推送过来的消息mqMsg
	mqMsg, isOk := conn.ConsumerReceive()

	//取出消息队列的消息主体,将其解析到models.UserMessage
	userMessage := models.UserMessage{}
	if isOk {
		err := json.Unmarshal([]byte(mqMsg.MessageData), &userMessage)
		if err != nil {
			return userMessage, false
		}
		return userMessage, true
	}
	return userMessage, false
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994198.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存