码字不易,转载请附原链,搬砖繁忙回复不及时见谅,技术交流请加QQ群:909211071,或关注公众号:程序猿AirGo
本文源码已上传至 Github:GitHub - why444216978/grpc-cmux: Serve grpc and http on the same port by cmux
公众号原文:连接多路复用,同一个端口同时提供 HTTP 和 gRPC 服务
gRPC-Gateway关于 gRPC 和 grpc-gateway 的介绍使用方法不再过多赘述,我的另一篇博客和 gRPC-Gateway 官方文档已经介绍的很全面了:
初探gRPC
Adding gRPC-Gateway annotations to an existing proto file
有两个问题可能大家都会遇到,这里简单提一下,一个是因为缺少 grpc-gateway 提供的 proto 包,需要我们下载对应包并复制到项目根目录中:
google/api/annotations.proto: File not found.
helloworld/hello_world.proto:6:1: Import "google/api/annotations.proto" was not found or had errors.
go get github.com/grpc-ecosystem/grpc-gateway
cp -rf $GOPATH/pkg/mod/github.com/grpc-ecosystem/grpc-gateway@v1.16.0/third_party/googleapis/google ./
另一个问题是因为我们引入的 annotations.proto 文件又会引入 protobuf 包的 descriptor.proto 文件,所以我们需要复制我们上面下载的 protoc 源码中的 protobuf 目录到项目根目录的 googole 目录下:
google/protobuf/descriptor.proto: File not found.
google/api/annotations.proto:20:1: Import "google/protobuf/descriptor.proto" was not found or had errors.
google/api/annotations.proto:28:8: "google.protobuf.MethodOptions" is not defined.
cp -rf /user_dir/protoc/include/google/protobuf ./google
封装多路复用服务
package server
import (
"context"
"fmt"
"net"
"net/http"
"time"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
)
type Server struct {
endpoint string
HTTPListener net.Listener
GRPCListener net.Listener
httpServer *http.Server
router *http.ServeMux
GRPClientConn *grpc.ClientConn
registerHTTP registerFunc
registerGRPC registerFunc
ServerMux *runtime.ServeMux
tcpMux cmux.CMux
}
type registerFunc func(ctx context.Context, s *Server)
type Option func(*Server)
func WithEndpoint(endpoint string) Option {
return func(s *Server) { s.endpoint = endpoint }
}
func WithHTTPregisterFunc(registerHTTP registerFunc) Option {
return func(s *Server) { s.registerHTTP = registerHTTP }
}
func WithGRPCregisterFunc(registerGRPC registerFunc) Option {
return func(s *Server) { s.registerGRPC = registerGRPC }
}
// New returns a Server instance
func New(opts ...Option) *Server {
s := &Server{}
for _, o := range opts {
o(s)
}
listener, err := net.Listen("tcp", s.endpoint)
if err != nil {
panic(err)
}
s.tcpMux = cmux.New(listener)
return s
}
func (s *Server) Start() error {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
s.GRPCListener = s.tcpMux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldPrefixSendSettings("content-type", "application/grpc"))
s.HTTPListener = s.tcpMux.Match(cmux.HTTP1Fast())
go func() {
s.registerGRPC(ctx, s)
}()
go func() {
if err := s.initGateway(ctx); err != nil {
panic(err)
}
s.registerHTTP(ctx, s)
s.startGateway()
}()
return s.tcpMux.Serve()
}
func (s *Server) Stop() {
s.tcpMux.Close()
}
func (s *Server) initGateway(ctx context.Context) error {
var err error
s.router = http.NewServeMux()
s.GRPClientConn, err = grpc.Dial(s.endpoint, []grpc.DialOption{
grpc.WithTimeout(10 * time.Second),
grpc.WithBlock(),
grpc.WithInsecure(),
}...)
if err != nil {
return fmt.Errorf("Fail to dial: %v", err)
}
s.ServerMux = runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{
UseProtoNames: true,
UseEnumNumbers: true,
},
}),
runtime.WithErrorHandler(runtime.DefaultHTTPErrorHandler),
)
return nil
}
func (s *Server) startGateway() {
s.router.Handle("/", s.ServerMux)
s.httpServer = &http.Server{
Addr: s.endpoint,
Handler: s.router,
ReadTimeout: time.Second,
WriteTimeout: time.Second,
IdleTimeout: time.Second,
}
if err := s.httpServer.Serve(s.HTTPListener); err != nil {
panic(err)
}
}
这里说明几点,帮助大家理解服务运行逻辑:
定义 registerFunc 方法,通过 Options 将自己的 gRPC 服务注册方法在初始化的时候传入,来达到与具体服务解耦的目的。注册 gRPC 和 HTTP 服务的 Matcher注册具体的 gRPC 服务并启动服务初始化 gRPC-Gateway 并启动服务启动 cmux 服务下面是一个简单的使用例子:
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
pb "helloworld/helloworld"
"helloworld/server"
"google.golang.org/grpc"
)
const (
endpoint = ":8888"
)
type Server struct {
*server.Server
pb.UnimplementedGreeterServer
}
func (s *Server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: in.Name + " world"}, nil
}
func registerHTTP(ctx context.Context, s *server.Server) {
if err := pb.RegisterGreeterHandler(ctx, s.ServerMux, s.GRPClientConn); err != nil {
panic(err)
}
}
func registerGRPC(ctx context.Context, s *server.Server) {
grpcServer := grpc.NewServer()
pb.RegisterGreeterServer(grpcServer, new(Server))
if err := grpcServer.Serve(s.GRPCListener); err != nil {
panic(err)
}
}
func main() {
s := server.New(
server.WithEndpoint(endpoint),
server.WithGRPCregisterFunc(registerGRPC),
server.WithHTTPregisterFunc(registerHTTP))
var wg sync.WaitGroup
wg.Add(1)
go func() {
panic(s.Start())
wg.Done()
}()
cc, err := newClientConn(endpoint)
if err != nil {
log.Fatal(err)
}
ticker := time.NewTicker(time.Second)
for range ticker.C {
client(cc)
}
wg.Wait()
}
func client(cc *grpc.ClientConn) {
client := pb.NewGreeterClient(cc)
reply, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "why"})
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
func newClientConn(target string) (*grpc.ClientConn, error) {
cc, err := grpc.Dial(
target,
grpc.WithInsecure(),
)
if err != nil {
return nil, err
}
return cc, nil
}
[why@MacBook-Pro] ~/Desktop/go/grpc-cmux$go run main.go
message:"why world"
message:"why world"
message:"why world"
......
[why@MacBook-Pro] ~/Desktop/go/grpc-cmux$curl http://localhost:8888/v1/example/echo
{"message":" world"}
cmux 源码分析
整体思路:通过实现不同的 net.listener 方法,自定义各自的请求过滤规则,并用相应协议的服务监听对应 listener 来接收 TCP 字节流,达到在同一端口提供不同服务的效果。
cMux 结构体:
type matchersListener struct {
ss []MatchWriter //匹配规则列表
l muxListener //伪Listener
}
type cMux struct {
root net.Listener //真实Listener
bufLen int //连接队列大小,1024
errh ErrorHandler //错误处理方法
sls []matchersListener //自定义"Listener"列表
readTimeout time.Duration //读超时时间,默认不限制
donec chan struct{} //连接关闭chan
mu sync.Mutex //关闭cmux服务时需要加索,因为既可以主动触发也可以被动触发,避免重复close导致panic
}
MatcherWriter 匹配流量:
HTTP1Fast 和 TLS 使用前缀树(patricia Tree)匹配HTTP1 通过硬解析 request 对比 HTTP 版本Header 相关方法通过字符串匹配核心处理逻辑:
注册多个 Matcher,根据 Header、HTTP版本等匹配并接管流量(matchers)cmux.root.Accept 接收所有流量双层迭代,外层按照注册 listener 的先后顺序,内层按照 MatchWriter 参数的先后顺序,依次匹配注册的 Matcher匹配成功后,用对应 matchersListener 的 connc 接收连接,执行对应协议的后续处理逻辑匹配 matchersListener 核心源码:
func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
muc := newMuxConn(c)
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
}
//双层迭代,外层按照注册 listener 的先后顺序,内层按照 MatchWriter 参数的先后顺序,依次匹配注册的 Matcher
for _, sl := range m.sls {
for _, s := range sl.ss {
matched := s(muc.Conn, muc.startSniffing())
if matched {
muc.doneSniffing()
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Time{})
}
select {
case sl.l.connc <- muc:
case <-donec:
_ = c.Close()
}
return
}
}
}
_ = c.Close()
err := ErrNotMatched{c: c}
if !m.handleErr(err) {
_ = m.root.Close()
}
}
流量分配到对应协议源码:
func (l muxListener) Accept() (net.Conn, error) {
select {
case c, ok := <-l.connc:
if !ok {
return nil, ErrListenerClosed
}
return c, nil
case <-l.donec:
return nil, ErrServerClosed
}
}
可以加个断点验证一下流量转发的逻辑,在 cmux.go 第 264 行加个断电,然后启动调试,可以看到流量被转发到了 gRPC.(*Server).Serve 上 :
参考:Golang: Run multiple services on one port - Dgraph Blog
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)