GIN试玩:GIN开启WEBSOCKET

GIN试玩:GIN开启WEBSOCKET,第1张

安装依赖
go get "github.com/gorilla/websocket"
代码实现
package Socket

import (
	"encoding/json"
	"flag"
	"fmt"
	"github.com/gorilla/websocket"
	"log"
	"net/http"
)

// ConnClients socket客户
type ConnClients struct {
	conn *websocket.Conn // 连接对象
	send chan []byte // 发送消息通道
}
// ConnServer socket容器
type ConnServer struct {
	broadcast chan []byte // 广播消息通道
	register chan *ConnClients // 注册的客户
	unregister chan *ConnClients // 未注册的客户
	clients map[*ConnClients]*ConnClients // 客户集合
}
// 定义全局控制器
var connServer = &ConnServer{
	broadcast:  make(chan []byte),
	register:   make(chan *ConnClients),
	unregister: make(chan *ConnClients),
	clients: make(map[*ConnClients]*ConnClients),
}

// 升级配置
var socketUpgrade = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

func websocketServer(w http.ResponseWriter, r *http.Request) {
	// 将http协议升级为websocket协议
	conn, err := socketUpgrade.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	// 新客户
	connClient := &ConnClients{
		conn: conn,
		send: make(chan []byte, 256),
	}
	// 加入注册列表
	connServer.register <- connClient

	// 开启单独线程用于收发消息
	go connClient.receiveMsg()
	go connClient.sendMsg()
}
// StartWebSocket 启动函数
func StartWebSocket() {
	var addr = flag.String("socketAddr", ":4433", "http service address")
	// 开启线程监听socket控制器
	go connServer.run()
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		websocketServer(w, r)
	})
	go http.ListenAndServe(*addr, nil)
	fmt.Println("socket server started")
}
type WebsocketData struct {
	Action string
}
func (c *ConnClients)receiveMsg()  {
	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", err)
			}
			break
		}
		var msg WebsocketData
		err = json.Unmarshal(message, &msg)
		// 解析socket消息,如果是close,则注销当前客户,如果是心跳信息ping,则返回pong
		if msg.Action == "close" {
			connServer.unregister <- c
			break
		}
		if msg.Action == "ping" {
			j, _ := json.Marshal(&WebsocketData{Action: "pong"})
			c.send <- j
		}
	}
}
func (c *ConnClients)sendMsg()  {
	defer func() {
		c.conn.Close()
	}()
	for {
		select {
		// 循环遍历发送消息通道
		case message, ok := <-c.send:
			if !ok {
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			c.conn.WriteMessage(websocket.TextMessage, message)
		}
	}
}

func (m *ConnServer) run()  {
	for {
		select {
		case client := <-m.register: // 新增客户
			m.clients[client] = client
		case client := <-m.unregister: // 注销
			if _, ok := m.clients[client]; ok {
				delete(m.clients, client)
				close(client.send)
			}
		case message := <-m.broadcast: // 广播消息到每个客户
			for client := range m.clients {
				client.send <- message
			}
		}
	}
}
// AddMsg 公用方法 推送消息给客户端
func AddMsg(t []byte)  {
	connServer.broadcast <- t
}

思路: 作为服务器,必然是一对多的关系,这里使用订阅者模式

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994393.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存