如果对 grpc 还不太了解的,可以看看我的这栏文章https://blog.csdn.net/wanmei002/category_11067794.html
因为 服务发现和服务注册用到了 etcd , 但是最新的 grpc 跟 etcd 不兼容,
所以 protoc-gen-go 跟 grpc 的版本要降级
go get -u github.com/golang/protobuf/protoc-gen-go@v1.3.2
go get google.golang.org/grpc@v1.29.1
先贴以下我的 go.mod
require (
github.com/golang/protobuf v1.4.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/tal-tech/go-zero v1.1.4
google.golang.org/grpc v1.29.1
)
简单初始化 一个 grpc server
建一个文件,进入 初始化: go mod init *****以下代码来自 go-zero 文档, 地址: https://github.com/tal-tech/zero-doc/blob/main/doc/shorturl.md
goctl rpc template -o transform.proto
goctl rpc proto -src transform.proto -dir .
go get github.com/golang/protobuf@v1.4.2
go get google.golang.org/grpc@v1.29.1
go get
获得所有的依赖查看下 etc/transform.yaml 文件里的etcd 地址:端口 是否正确go run transform.go 运行起来程序
rpc 服务之间的调用
修改配置文件
比如我们又建了一个服务 AAA,要调用 transform 服务
修改 AAA 服务的配置文件 修改在 etc/ 目录下的 yaml 文件,在其中添加 transform 服务的 etcd 信息,添加内容如下:Transform: // 名字可以自己取
Etcd:
Hosts:
- 127.0.0.1:2379 // transform 服务注册的 etcd
Key: transform.rpc // transform 服务注册的 服务名
在 AAA 服务目录下的 internal/svc/servicecontext.go 文件中添加 transform 客户端连接
type ServiceContext struct {
Config config.Config
Transform transformer.Transformer // Transformer服务 客户端接口 位置一般是: transform/transformer/transformer.go
}
创建 transform 服务连接,也是在 servicecontext.go 文件中
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
// 创建 transform 客户端连接
Transform: transformer.NewTransformer(zrpc.MustNewClient(c.Transform)),// c.Transform 是在 yaml 中配置的 transform 服务的配置信息
}
}
逻辑一般都在项目下的 internal/logic 文件夹下, 在 rpc 方法中,在其中调用 其它 gRPC 项目的方法可以
用 l.svcCtx.Transform 对象里的方法
// 第二个参数的值 会赋值给 s 的register属性,然后 调用 s.Start() 方法, 注册服务
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
transform.RegisterTransformerServer(grpcServer, srv)
})
让我们看看新建的时候都做了什么
第一步
func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
server, err := NewServer(c, register)// 这个是第二步
if err != nil {
log.Fatal(err)
}
return server
}
第二步
func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error) {
var err error
// 如果要 Auth 认证,需要配置 redis ,这里检查是否配置了 redis
if err = c.Validate(); err != nil {
return nil, err
}
var server internal.Server
metrics := stat.NewMetrics(c.ListenOn)
if c.HasEtcd() {
// 检查 配置的 ip
listenOn := figureOutListenOn(c.ListenOn)
// 重要的一步 连接etcd 并注册服务,并保持续期 这是第三步
server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, listenOn, internal.WithMetrics(metrics))
if err != nil {
return nil, err
}
} else {
server = internal.NewRpcServer(c.ListenOn, internal.WithMetrics(metrics))
}
server.SetName(c.Name)
if err = setupInterceptors(server, c, metrics); err != nil {
return nil, err
}
rpcServer := &RpcServer{
server: server,
register: register,
}
if err = c.SetUp(); err != nil {
return nil, err
}
return rpcServer, nil
}
第三步
server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, listenOn, internal.WithMetrics(metrics))
上面的 NewRpcPubServer 方法里的代码
func NewRpcPubServer(etcdEndpoints []string, etcdKey, listenOn string, opts ...ServerOption) (Server, error) {
registerEtcd := func() error {
// 连接 etcd 并设置租约
pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, listenOn)
return pubClient.KeepAlive()
}
server := keepAliveServer{
registerEtcd: registerEtcd,
// 这是第四步
Server: NewRpcServer(listenOn, opts...),
}
return server, nil
}
第四步
func NewRpcServer(address string, opts ...ServerOption) Server {
var options rpcServerOptions
for _, opt := range opts {
opt(&options)
}
if options.metrics == nil {
options.metrics = stat.NewMetrics(address)
}
// 这个 server 是比较重要的 他有个方法
return &rpcServer{
baseRpcServer: newBaseRpcServer(address, options.metrics),
}
}
rpcServer 的 Start 方法, 这个方法就是注册grpc 服务 和 注册拦截器
func (s *rpcServer) Start(register RegisterFn) error {
lis, err := net.Listen("tcp", s.address)
if err != nil {
return err
}
unaryInterceptors := []grpc.UnaryServerInterceptor{
serverinterceptors.UnaryTracingInterceptor(s.name),
serverinterceptors.UnaryCrashInterceptor(),
serverinterceptors.UnaryStatInterceptor(s.metrics),
serverinterceptors.UnaryPrometheusInterceptor(),
}
unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
streamInterceptors := []grpc.StreamServerInterceptor{
serverinterceptors.StreamCrashInterceptor,
}
streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...),
WithStreamServerInterceptors(streamInterceptors...))
server := grpc.NewServer(options...)
register(server)
// we need to make sure all others are wrapped up
// so we do graceful stop at shutdown phase instead of wrap up phase
waitForCalled := proc.AddWrapUpListener(func() {
server.GracefulStop()
})
defer waitForCalled()
return server.Serve(lis)
}
第五步 启动起来
func (rs *RpcServer) Start() {
if err := rs.server.Start(rs.register); err != nil {
logx.Error(err)
panic(err)
}
}
就这样 grpc 服务就跑起来了
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)