消息推送模块结构图
基础消息模板
// Message 群聊消息模板
type Message struct {
Id int64 //消息Id
GroupId int64 //消息所属群组id
UserId int64 //消息所属用户id
MessageData string //消息主体
CreateTime string //消息创建时间
UpdateTime string //消息更新时间
Status int64 //消息状态
}
import (
"encoding/json"
"github.com/dpwgc/kapokmq-go-client/conn"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"strconv"
"sync"
)
//客户端连接表,把每个客户端都放进来。Key为WsKey结构体(GroupId int64, UserId int64)。Value为websocket连接
var clients = make(map[models.WsKey]*websocket.Conn)
//广播通道,用于广播推送群聊用户发送的消息(带缓冲区,提高并发速率)
var broadcast = make(chan models.UserMessage, 10000)
//锁,用于避免map并发写
var lock sync.RWMutex
InitChat 聊天室模块初始化 这里开启两个协程。 pushMessages广播协程:持续接收用户的消息并广播推送给其他用户。 insertMessage数据库连接协程:持续接收消息队列中的消息并将消息插入到数据库
// InitChat 聊天室模块初始化
func InitChat() {
//启动广播推送协程,推送消息到各个客户端
go pushMessages()
//启动消息批量处理协程,借助消息队列将消息批量插入数据库
go insertMessage()
}
ChatLink 监听模块 建立WebSocket连接并监听用户发送的消息,将用户发送的消息插入广播通道与消息队列。
func ChatLink(c *gin.Context) {
//获取连接的用户id与群组id
userId := c.Param("userId")
groupId := c.Param("groupId")
//将id转为int64类型
gid, _ := strconv.ParseInt(groupId, 10, 64)
uid, _ := strconv.ParseInt(userId, 10, 64)
//升级get请求为webSocket协议
ws, err := config.UpGrader.Upgrade(c.Writer, c.Request, nil)
//生成该websocket连接的key(以群组id和用户id为key)
wsKey := models.WsKey{
GroupId: gid,
UserId: uid,
}
//将当前连接的客户端放入客户端map(clients)中
lock.RLock()
clients[wsKey] = ws
lock.RUnlock()
if err != nil {
fmt.Println(err)
delete(clients, wsKey) //删除map中的客户端
return
}
defer ws.Close()
for {
//读取websocket发来的数据
_, message, err := ws.ReadMessage()
if err != nil {
fmt.Println(err)
delete(clients, wsKey) //删除map中的客户端
break
}
/*
略。。。
对message进行加工处理
得到userMessage
*/
//将聊天消息模板插入广播通道,交由pushMessages协程处理
broadcast <- userMessage
//将消息发送至消息队列(流量削峰,避免数据库被冲垮)
byteMsg, err := json.Marshal(userMessage)
if err == nil {
conn.ProducerSend(string(byteMsg), 0)
}
}
}
pushMessages 广播推送模块 广播推送消息到其他在线客户端,将用户消息推送给该群组(同一GroupId)里所有在线的用户。
// go pushMessages()
func pushMessages() {
for {
//读取广播通道中的消息
msg := <-broadcast
//遍历现有的websocket客户端
for key, client := range clients {
//获取该客户端的群组id(GroupId)
gid := key.GroupId
//匹配客户端,判断该客户端的GroupId是否与该消息的GroupId一致,如果相同,则将该消息投递给该客户端
if msg.GroupId == gid {
//发送消息,失败重试次数:3
for i := 0; i < 3; i++ {
//发送消息到消费者客户端
err := client.WriteJSON(msg)
//如果发送成功
if err == nil {
//结束循环
break
}
//如果到达重试次数,但仍未发送成功
if i == 2 && err != nil {
//客户端关闭
client.Close()
//删除map中的客户端
delete(clients, key)
}
}
}
}
}
}
数据库插入模块 从消息队列中取出消息,将消息插入数据库。
// go insertMessage()
func insertMessage() {
for {
//从消息队列中获取消息
message, _ := utils.GetMsg()
//开始事务
tx := mysql.Db().Begin()
/*
略。。。
将message插入数据库
更新群组与用户的信息
*/
//事务提交
tx.Commit()
}
}
消息队列连接模块 消息队列下载 https://github.com/dpwgc/kapokmq-server 消息队列源码 https://gitee.com/dpwgc/kapokmq
import (
"encoding/json"
"github.com/dpwgc/kapokmq-go-client/conf"
"github.com/dpwgc/kapokmq-go-client/conn"
"github.com/spf13/viper"
"local/models"
)
// InitMQ 与Kapokmq消息队列建立连接
func InitMQ() {
//创建生产者模板
producer := conf.Producer{
MqAddr: viper.GetString("kapokmq.mqAddr"), //消息队列服务IP地址
MqPort: viper.GetString("kapokmq.mqPort"), //消息队列服务端口号
MqProtocol: viper.GetString("kapokmq.mqProtocol"), //消息队列连接协议:ws/wss
Topic: viper.GetString("kapokmq.topic"), //生产者订阅的主题
ProducerId: viper.GetString("kapokmq.producerId"), //生产者Id
SecretKey: viper.GetString("kapokmq.secretKey"), //消息队列访问密钥
}
//生产者与消息队列建立连接
err := conn.NewProducerConn(producer)
if err != nil {
panic(err)
}
//创建消费者模板
consumer := conf.Consumer{
MqAddr: viper.GetString("kapokmq.mqAddr"), //消息队列服务IP地址
MqPort: viper.GetString("kapokmq.mqPort"), //消息队列服务端口号
MqProtocol: viper.GetString("kapokmq.mqProtocol"), //消息队列连接协议:ws/wss
Topic: viper.GetString("kapokmq.topic"), //消费者订阅的主题
ConsumerId: viper.GetString("kapokmq.consumerId"), //消费者Id
SecretKey: viper.GetString("kapokmq.secretKey"), //消息队列访问密钥
}
//消费者与消息队列建立连接
err = conn.NewConsumerConn(consumer)
if err != nil {
panic(err)
}
}
// GetMsg 从消息队列中获取消息
func GetMsg() (models.UserMessage, bool) {
//接收消息队列推送过来的消息mqMsg
mqMsg, isOk := conn.ConsumerReceive()
//取出消息队列的消息主体,将其解析到models.UserMessage
userMessage := models.UserMessage{}
if isOk {
err := json.Unmarshal([]byte(mqMsg.MessageData), &userMessage)
if err != nil {
return userMessage, false
}
return userMessage, true
}
return userMessage, false
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)