一个Leaf开发的游戏服务器由多个模块组成,模块有一下特点:
1、每个模块运行在一个单独的goroutine中
2、模块间通过一套轻量的RPC机制通讯(leaf/charpc)
游戏服务器在启动时进行模块的注册,例如:
leaf.Run(
game.Module,
gate.Module,
login.Module,
)
这里按顺序注册了game、gate、login三个模块。每个模块都需要实现接口:
type Module interface {
OnInit()
OnDestroy()
Run(closeSig chan bool)
}
login模块的module:
var (
skeleton = base.NewSkeleton()
ChanRPC = skeleton.ChanRPCServer
)
type Module struct {
*module.Skeleton //Skeleton实现了Run方法
}
func (m *Module) OnInit() {
m.Skeleton = skeleton
}
func (m *Module) OnDestroy() {
log.Release("login module destroy")
}
协议源码分析
一个请求协议设置和处理的流程。
leaf框架简单服务器:https://github.com/name5566/leaf/blob/master/TUTORIAL_ZH.md
按照链接的例子分析。
当我们定义一个协议的时候先要注册到信息处理器。
package msg
import (
"github.com/name5566/leaf/network/json"
)
// 使用默认的 JSON 消息处理器(默认还提供了 protobuf 消息处理器)
var Processor = json.NewProcessor()
func init() {
// 这里我们注册了一个 JSON 消息 Hello
Processor.Register(&Hello{})
}
// 一个结构体定义了一个 JSON 消息的格式
// 消息协议名为 Hello
type Hello struct {
Name string
}
客服端发送到游戏服务器的消息需要通过gate模块,简而言之,gate模块决定了某个消息具体交给内部的哪个模块来处理。这里我们将Hello消息路由到game模块中。
package gate
import (
"server/game"
"server/msg"
)
func init() {
// 这里指定消息 Hello 路由到 game 模块
// 模块间使用 ChanRPC 通讯,消息路由也不例外
msg.Processor.SetRouter(&msg.Hello{}, game.ChanRPC)
}
消息路由到了game模块,那么就需要有一个处理器或者说方法来接受这个协议。
package internal
import (
"github.com/name5566/leaf/log"
"github.com/name5566/leaf/gate"
"reflect"
"server/msg"
)
func init() {
// 向当前模块(game 模块)注册 Hello 消息的消息处理函数 handleHello
handler(&msg.Hello{}, handleHello)
}
func handler(m interface{}, h interface{}) {
skeleton.RegisterChanRPC(reflect.TypeOf(m), h)
}
func handleHello(args []interface{}) {
// 收到的 Hello 消息
m := args[0].(*msg.Hello)
// 消息的发送者
a := args[1].(gate.Agent)
// 输出收到的消息的内容
log.Debug("hello %v", m.Name)
// 给发送者回应一个 Hello 消息
a.WriteMsg(&msg.Hello{
Name: "client",
})
}
客户端代码:
package main
import (
"encoding/binary"
"net"
)
func main() {
conn, err := net.Dial("tcp", "127.0.0.1:3563")
if err != nil {
panic(err)
}
// Hello 消息(JSON 格式)
// 对应游戏服务器 Hello 消息结构体
data := []byte(`{
"Hello": {
"Name": "leaf"
}
}`)
// len + data
m := make([]byte, 2+len(data))
// 默认使用大端序
binary.BigEndian.PutUint16(m, uint16(len(data)))
copy(m[2:], data)
// 发送消息
conn.Write(m)
}
通过源码分析一波整个流程。
设置消息到消息处理器
Processor.Register(&Hello{})
//将信息结构体类型存入到消息处理器中,在game模块使用
func (p *Processor) Register(msg interface{}) string {
msgType := reflect.TypeOf(msg) //获取msg的类型
if msgType == nil || msgType.Kind() != reflect.Ptr {
log.Fatal("json message pointer required")
}
msgID := msgType.Elem().Name() //获取协议的名称
if msgID == "" {
log.Fatal("unnamed json message")
}
if _, ok := p.msgInfo[msgID]; ok {
log.Fatal("message %v is already registered", msgID)
}
i := new(MsgInfo) //创建一个MsgInfo对象,协议处理器其实就是一个以协议名称为key以MsgInfo为value的map集合
i.msgType = msgType //放入方法的类型
p.msgInfo[msgID] = i //协议名对应着协议类型
return msgID
}
设置路由信息
msg.Processor.SetRouter(&msg.Hello{}, game.ChanRPC)
//添加路由信息,在gate模块使用
func (p *Processor) SetRouter(msg interface{}, msgRouter *chanrpc.Server) {
msgType := reflect.TypeOf(msg) //获取协议的类型
if msgType == nil || msgType.Kind() != reflect.Ptr {
log.Fatal("json message pointer required")
}
msgID := msgType.Elem().Name() //获取协议的名称
i, ok := p.msgInfo[msgID] //从协议处理器中通过协议名获取msgInfo
if !ok {
log.Fatal("message %v not registered", msgID)
}
i.msgRouter = msgRouter //将路由协议添加到msgInfo中
}
设置协议对应的处理方法
handler(&msg.Hello{}, handleHello)
func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {
if s.ChanRPCServer == nil {
panic("invalid ChanRPCServer")
}
s.server.Register(id, f)
}
func (s *Server) Register(id interface{}, f interface{}) {
switch f.(type) { //判断方法的类型
case func([]interface{}):
case func([]interface{}) interface{}:
case func([]interface{}) []interface{}:
default:
panic(fmt.Sprintf("function id %v: definition of function is invalid", id))
}
if _, ok := s.functions[id]; ok {
panic(fmt.Sprintf("function id %v: already registered", id))
}
s.functions[id] = f //协议名对应的函数存入server的functions中
}
type Server struct {
functions map[interface{}]interface{} //保存了所有协议对应的函数
ChanCall chan *CallInfo
}
服务器接收客服端的链接
//src/github.com/name5566/leaf/network/tcp_server.go:62
func (server *TCPServer) run() { //负责客户端请求的链接
... //省略代码
for {
conn, err := server.ln.Accept()
... //省略代码
tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser)
agent := server.NewAgent(tcpConn)
...//省略代码
}
请求协议的接收
//src/github.com/name5566/leaf/gate/gate.go:93
func (a *agent) Run() {
for {
data, err := a.conn.ReadMsg() //读取客服端发送来的信息
if err != nil {
log.Debug("read message: %v", err)
break
}
if a.gate.Processor != nil {
msg, err := a.gate.Processor.Unmarshal(data)
if err != nil {
log.Debug("unmarshal message error: %v", err)
break
}
err = a.gate.Processor.Route(msg, a) //在gate模块中调用协议处理器
if err != nil {
log.Debug("route message error: %v", err)
break
}
}
}
}
//将消息路由到对应的模块
func (p *Processor) Route(msg interface{}, userData interface{}) error {
// raw
if msgRaw, ok := msg.(MsgRaw); ok {
i, ok := p.msgInfo[msgRaw.msgID]
if !ok {
return fmt.Errorf("message %v not registered", msgRaw.msgID)
}
if i.msgRawHandler != nil {
i.msgRawHandler([]interface{}{msgRaw.msgID, msgRaw.msgRawData, userData})
}
return nil
}
// json
msgType := reflect.TypeOf(msg) //获取协议类型
if msgType == nil || msgType.Kind() != reflect.Ptr {
return errors.New("json message pointer required")
}
msgID := msgType.Elem().Name() //协议名
i, ok := p.msgInfo[msgID] //通过协议名获取msgInfo,这个就是前面func (p *Processor) Register(msg interface{}) string {}存进去的
if !ok {
return fmt.Errorf("message %v not registered", msgID)
}
if i.msgHandler != nil {
i.msgHandler([]interface{}{msg, userData})
}
if i.msgRouter != nil { //msgInfo中的路由,这个是前面func (p *Processor) SetRouter(msg interface{}, msgRouter *chanrpc.Server) {}存进去的
i.msgRouter.Go(msgType, msg, userData)
}
return nil
}
//将协议和函数封装存入chan
func (s *Server) Go(id interface{}, args ...interface{}) {
f := s.functions[id] //通过协议名获取函数,这个是func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {}存进去的
if f == nil {
return
}
defer func() {
recover()
}()
s.ChanCall <- &CallInfo{ //将函数和参数封装成结构体存入到chan中,等待其他协程调用
f: f,
args: args,
}
}
方法的调用
//src/github.com/name5566/leaf/module/skeleton.go:45
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
case <-closeSig:
s.commandServer.Close()
s.server.Close()
for !s.g.Idle() || !s.client.Idle() {
s.g.Close()
s.client.Close()
}
return
case ri := <-s.client.ChanAsynRet:
s.client.Cb(ri)
case ci := <-s.server.ChanCall: //封装的协议和函数在这里拿出来处理
s.server.Exec(ci) //调用CallInfo结构体中函数的
case ci := <-s.commandServer.ChanCall:
s.commandServer.Exec(ci)
case cb := <-s.g.ChanCb:
s.g.Cb(cb)
case t := <-s.dispatcher.ChanTimer:
t.Cb()
}
}
}
上面这个函数最后就调用了handleHello()这个我们自己写的业务逻辑方法了,然后将返回值存入到chanRet中,等待其他协程消费返回给客户端。整个请求过程就结束了。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)