Leaf框架

Leaf框架,第1张

文章目录 Leaf的模块机制协议源码分析

Leaf的模块机制

一个Leaf开发的游戏服务器由多个模块组成,模块有一下特点:
1、每个模块运行在一个单独的goroutine中
2、模块间通过一套轻量的RPC机制通讯(leaf/charpc)
游戏服务器在启动时进行模块的注册,例如:

leaf.Run(
	game.Module,
	gate.Module,
	login.Module,
)

这里按顺序注册了game、gate、login三个模块。每个模块都需要实现接口:

type Module interface {
	OnInit()
	OnDestroy()
	Run(closeSig chan bool)
}

login模块的module:

var (
	skeleton = base.NewSkeleton()
	ChanRPC  = skeleton.ChanRPCServer
)
type Module struct {
	*module.Skeleton //Skeleton实现了Run方法
}
func (m *Module) OnInit() {
	m.Skeleton = skeleton
}
func (m *Module) OnDestroy() {
	log.Release("login module destroy")
}
协议源码分析

一个请求协议设置和处理的流程。
leaf框架简单服务器:https://github.com/name5566/leaf/blob/master/TUTORIAL_ZH.md
按照链接的例子分析。
当我们定义一个协议的时候先要注册到信息处理器。

package msg

import (
	"github.com/name5566/leaf/network/json"
)
// 使用默认的 JSON 消息处理器(默认还提供了 protobuf 消息处理器)
var Processor = json.NewProcessor()
func init() {
	// 这里我们注册了一个 JSON 消息 Hello
	Processor.Register(&Hello{})
}
// 一个结构体定义了一个 JSON 消息的格式
// 消息协议名为 Hello
type Hello struct {
	Name string
}

客服端发送到游戏服务器的消息需要通过gate模块,简而言之,gate模块决定了某个消息具体交给内部的哪个模块来处理。这里我们将Hello消息路由到game模块中。

package gate
import (
	"server/game"
	"server/msg"
)
func init() {
	// 这里指定消息 Hello 路由到 game 模块
	// 模块间使用 ChanRPC 通讯,消息路由也不例外
	msg.Processor.SetRouter(&msg.Hello{}, game.ChanRPC)
}

消息路由到了game模块,那么就需要有一个处理器或者说方法来接受这个协议。

package internal

import (
	"github.com/name5566/leaf/log"
	"github.com/name5566/leaf/gate"
	"reflect"
	"server/msg"
)
func init() {
	// 向当前模块(game 模块)注册 Hello 消息的消息处理函数 handleHello
	handler(&msg.Hello{}, handleHello)
}
func handler(m interface{}, h interface{}) {
	skeleton.RegisterChanRPC(reflect.TypeOf(m), h)
}
func handleHello(args []interface{}) {
	// 收到的 Hello 消息
	m := args[0].(*msg.Hello)
	// 消息的发送者
	a := args[1].(gate.Agent)

	// 输出收到的消息的内容
	log.Debug("hello %v", m.Name)

	// 给发送者回应一个 Hello 消息
	a.WriteMsg(&msg.Hello{
		Name: "client",
	})
}

客户端代码:

package main
import (
	"encoding/binary"
	"net"
)
func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:3563")
	if err != nil {
		panic(err)
	}

	// Hello 消息(JSON 格式)
	// 对应游戏服务器 Hello 消息结构体
	data := []byte(`{
		"Hello": {
			"Name": "leaf"
		}
	}`)
	// len + data
	m := make([]byte, 2+len(data))
	// 默认使用大端序
	binary.BigEndian.PutUint16(m, uint16(len(data)))
	copy(m[2:], data)
	// 发送消息
	conn.Write(m)
}

通过源码分析一波整个流程。
设置消息到消息处理器

 Processor.Register(&Hello{})

//将信息结构体类型存入到消息处理器中,在game模块使用
func (p *Processor) Register(msg interface{}) string {
	msgType := reflect.TypeOf(msg) //获取msg的类型
	if msgType == nil || msgType.Kind() != reflect.Ptr {
		log.Fatal("json message pointer required")
	}
	msgID := msgType.Elem().Name() //获取协议的名称
	if msgID == "" {
		log.Fatal("unnamed json message")
	}
	if _, ok := p.msgInfo[msgID]; ok {
		log.Fatal("message %v is already registered", msgID)
	}

	i := new(MsgInfo) //创建一个MsgInfo对象,协议处理器其实就是一个以协议名称为key以MsgInfo为value的map集合
	i.msgType = msgType //放入方法的类型
	p.msgInfo[msgID] = i //协议名对应着协议类型
	return msgID
}

设置路由信息

msg.Processor.SetRouter(&msg.Hello{}, game.ChanRPC)

//添加路由信息,在gate模块使用
func (p *Processor) SetRouter(msg interface{}, msgRouter *chanrpc.Server) {
	msgType := reflect.TypeOf(msg) //获取协议的类型
	if msgType == nil || msgType.Kind() != reflect.Ptr {
		log.Fatal("json message pointer required")
	}
	msgID := msgType.Elem().Name() //获取协议的名称
	i, ok := p.msgInfo[msgID] //从协议处理器中通过协议名获取msgInfo
	if !ok {
		log.Fatal("message %v not registered", msgID)
	}

	i.msgRouter = msgRouter //将路由协议添加到msgInfo中
}

设置协议对应的处理方法

handler(&msg.Hello{}, handleHello)

func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {
	if s.ChanRPCServer == nil {
		panic("invalid ChanRPCServer")
	}

	s.server.Register(id, f)
}

func (s *Server) Register(id interface{}, f interface{}) {
	switch f.(type) { //判断方法的类型
	case func([]interface{}):
	case func([]interface{}) interface{}:
	case func([]interface{}) []interface{}:
	default:
		panic(fmt.Sprintf("function id %v: definition of function is invalid", id))
	}

	if _, ok := s.functions[id]; ok {
		panic(fmt.Sprintf("function id %v: already registered", id))
	}

	s.functions[id] = f //协议名对应的函数存入server的functions中
}

type Server struct {
	functions map[interface{}]interface{} //保存了所有协议对应的函数
	ChanCall  chan *CallInfo
}

服务器接收客服端的链接

//src/github.com/name5566/leaf/network/tcp_server.go:62
func (server *TCPServer) run() { //负责客户端请求的链接
...  //省略代码
for {
		conn, err := server.ln.Accept()
		... //省略代码
		tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser)
		agent := server.NewAgent(tcpConn)
		...//省略代码
} 

请求协议的接收

//src/github.com/name5566/leaf/gate/gate.go:93
func (a *agent) Run() {
	for {
		data, err := a.conn.ReadMsg() //读取客服端发送来的信息
		if err != nil {
			log.Debug("read message: %v", err)
			break
		}

		if a.gate.Processor != nil {
			msg, err := a.gate.Processor.Unmarshal(data)
			if err != nil {
				log.Debug("unmarshal message error: %v", err)
				break
			}
			err = a.gate.Processor.Route(msg, a) //在gate模块中调用协议处理器
			if err != nil {
				log.Debug("route message error: %v", err)
				break
			}
		}
	}
}

//将消息路由到对应的模块
func (p *Processor) Route(msg interface{}, userData interface{}) error {
	// raw
	if msgRaw, ok := msg.(MsgRaw); ok {
		i, ok := p.msgInfo[msgRaw.msgID]
		if !ok {
			return fmt.Errorf("message %v not registered", msgRaw.msgID)
		}
		if i.msgRawHandler != nil {
			i.msgRawHandler([]interface{}{msgRaw.msgID, msgRaw.msgRawData, userData})
		}
		return nil
	}

	// json
	msgType := reflect.TypeOf(msg) //获取协议类型
	if msgType == nil || msgType.Kind() != reflect.Ptr {
		return errors.New("json message pointer required")
	}
	msgID := msgType.Elem().Name() //协议名
	i, ok := p.msgInfo[msgID] //通过协议名获取msgInfo,这个就是前面func (p *Processor) Register(msg interface{}) string {}存进去的
	if !ok {
		return fmt.Errorf("message %v not registered", msgID)
	}
	if i.msgHandler != nil {
		i.msgHandler([]interface{}{msg, userData})
	}
	if i.msgRouter != nil { //msgInfo中的路由,这个是前面func (p *Processor) SetRouter(msg interface{}, msgRouter *chanrpc.Server) {}存进去的
		i.msgRouter.Go(msgType, msg, userData)
	}
	return nil
}

//将协议和函数封装存入chan
func (s *Server) Go(id interface{}, args ...interface{}) {
	f := s.functions[id] //通过协议名获取函数,这个是func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {}存进去的
	if f == nil {
		return
	}

	defer func() {
		recover()
	}()

	s.ChanCall <- &CallInfo{ //将函数和参数封装成结构体存入到chan中,等待其他协程调用
		f:    f,
		args: args,
	}
}

方法的调用

//src/github.com/name5566/leaf/module/skeleton.go:45
func (s *Skeleton) Run(closeSig chan bool) {
	for {
		select {
		case <-closeSig:
			s.commandServer.Close()
			s.server.Close()
			for !s.g.Idle() || !s.client.Idle() {
				s.g.Close()
				s.client.Close()
			}
			return
		case ri := <-s.client.ChanAsynRet:
			s.client.Cb(ri)
		case ci := <-s.server.ChanCall: //封装的协议和函数在这里拿出来处理
			s.server.Exec(ci) //调用CallInfo结构体中函数的
		case ci := <-s.commandServer.ChanCall:
			s.commandServer.Exec(ci) 
		case cb := <-s.g.ChanCb:
			s.g.Cb(cb)
		case t := <-s.dispatcher.ChanTimer:
			t.Cb()
		}
	}
}

上面这个函数最后就调用了handleHello()这个我们自己写的业务逻辑方法了,然后将返回值存入到chanRet中,等待其他协程消费返回给客户端。整个请求过程就结束了。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/990780.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存