1 python使用Protobuf和golang使用protobuf+grpc

1 python使用Protobuf和golang使用protobuf+grpc,第1张

1 python使用Protobuf和golang使用protobuf
$ python -m pip install grpcio #安装grpc
$ python -m pip install grpcio-tools
syntax = "proto3";
message HelloRequest {
  string name = 1; // name表示名称,编号是1
}

生成protobuf的python文件

$ python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I . test.proto
# grpc_tools.protoc 未grpcio-tools的命令
# --python_out=. 生成的对应python文件路径
# --grpc_python_out=. 生成对应的grpc文件目录
# -I . 指定生成文件目录为当前目录
# test.proto protobuf文件

新建protobuf_test

from protobuf_test import hello_pb2

request = hello_pb2.HelloRequest()
request.name = "aaaaa"
res_str = request.SerializeToString()
print(res_str)

# 通过字符串反向生成对象
request2 = hello_pb2.HelloRequest()
request2.ParseFromString(res_str)
print(request2.name)
2 python使用grpc

新建grpc_proto包

syntax = "proto3";
service Greeter { // 服务名
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}
server.py
import sys

sys.path.append("C:\Users\Administrator\PycharmProjects\pythonProject\grpc_test\grpc_proto")
# 指定模块路径

from concurrent import futures
import grpc
from grpc_proto import helloworld_pb2, helloworld_pb2_grpc


class Greeter(helloworld_pb2_grpc.GreeterServicer):  # 继承服务名的类
    def SayHello(self, request, context):  # 重写方法 SayHello 是 HelloRequest作为参数,也就是request, context 上下文
        return helloworld_pb2.HelloReply(message=f"你好,{request.name}")  # 函数的逻辑由业务决定


if __name__ == "__main__":
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # 设置服务器的最大线程数量
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 注册服务
    server.add_insecure_port('[::]:50052')
    server.start()
    server.wait_for_termination()
client.py
import grpc
import sys

sys.path.append("C:\Users\Administrator\PycharmProjects\pythonProject\grpc_test\grpc_proto") # 指定模块路径

from grpc_test.grpc_proto import helloworld_pb2, helloworld_pb2_grpc

if __name__ == "__main__":
    with grpc.insecure_channel("localhost:50052") as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        rsp = stub.SayHello(helloworld_pb2.HelloRequest(name="bobby"))
        # 对于HelloRequest函数的远程调用

        print(rsp.message)

需要把helloworld_pb2_grpc.py的导入源码修改一下

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from . import helloworld_pb2 as helloworld__pb2
3 在golang里面使用protobuf和grpc
$ protoc -I . helloworld.proto --go_out=plugins=grpc:.
syntax = "proto3";
option go_package = "grpc_test/proto;proto";
// 设置包的名称 package
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

server code

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"mksz_469/16_grpc_protobuf/grpc_test/proto/grpc_test/proto"
	"net"
)

type Sever struct {}

func (s *Sever) SayHello(ctx context.Context,request *proto.HelloRequest) (*proto.HelloReply,error){

	return &proto.HelloReply{Message: "hello "+request.Name},nil
}
func main()  {
	g := grpc.NewServer()
	proto.RegisterGreeterServer(g,&Sever{})
	lis,err :=net.Listen("tcp",":8081")
	if err != nil {
		fmt.Println(err)
	}
	err = g.Serve(lis)
	if err != nil {
		return
	}
}

client code

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"mksz_469/16_grpc_protobuf/grpc_test/proto/grpc_test/proto"
)

func main() {
	conn,err := grpc.Dial("127.0.0.1:8081",grpc.WithInsecure())
	if err != nil {
		fmt.Println(err)
	}
	defer func(conn *grpc.ClientConn) {
		err := conn.Close()
		if err != nil {

		}
	}(conn)
	c := proto.NewGreeterClient(conn)
	r,err := c.SayHello(context.Background(),&proto.HelloRequest{Name: "aaaa"})
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println(r.Message)
}

这样只需要在go中把端口改一下即可

如果出现

missing go.sum entry for module providing package <package_name>

go build -mod=mod 用该命令刷新一下mod就行

只要proto文件一致,两边随便调用都没问题

4 grpc的流

可以源源不断推送数据,比如ROS里面的订阅和发布,不停地由其推送数据,很适合传输一些大数据,或者服务器和客户端长时间数据交互

普通流,发请求给响应服务器端数据流客户端数据流双向数据流

流在protobuf里是个关键字

syntax = "proto3";
option go_package="./;proto";
service Greeter {
// 注册三种rpc的流方法
  rpc GetStream(StreamReqInfo) returns (stream StreamReqInfo); // 获取流
  rpc PutStream(stream StreamResInfo) returns (StreamReqInfo); // 输出流
  rpc SignalStream(stream StreamReqInfo) returns(stream StreamResInfo); // 双向流
}

message StreamReqInfo {
  string Info =1;
}

message StreamResInfo{
  string ResInfo = 1;
}
server端
package main

import (
	"fmt"
	"google.golang.org/grpc"
	proto "mksz_469/16_grpc_protobuf/strem_grpc/protoc"
	"net"
	"sync"
	"time"
)

type Server struct {}

func (s *Server)GetStream(req *proto.StreamReqInfo,res proto.Greeter_GetStreamServer) error {
	i := 0
	for true {
		i++
		// 往客户端发送数据流
		err := res.Send(&proto.StreamReqInfo{Info: fmt.Sprintf("%v", time.Now().String())})
		if err != nil {
			return err
		}
		if i>5 {
			break
		}
		time.Sleep(time.Second*1)
	}
	return nil
}
func (s *Server)PutStream(cliStr proto.Greeter_PutStreamServer) error {
	for {
		// 从客户端接收数据流
		if Info,err := cliStr.Recv(); err != nil {
			fmt.Println("client recv",err)
			break
		}else {
			fmt.Println(Info)
		}
	}
	return nil
}
func (s *Server)SignalStream(allStr proto.Greeter_SignalStreamServer) error {
	wg := sync.WaitGroup{}
	wg.Add(2) // 开启两个协程的等待退出
	go func() {
		defer wg.Done()
		for  {
			Info ,err := allStr.Recv()
			if err != nil {
				fmt.Println("客户端退出")
			}
			fmt.Println("收到客户端消息--->",Info.Info)

		}
	}()
	go func() {
		defer wg.Done()
		for {
			err := allStr.Send(&proto.StreamResInfo{ResInfo: "server ->" + fmt.Sprintf("%s", time.Now().String())})
			if err != nil {
				fmt.Println("客户端退出")
				break
			}
			time.Sleep(time.Second*1)
		}
	}()
	wg.Wait()
	return nil
}

func main() {
	g := grpc.NewServer()
	proto.RegisterGreeterServer(g,&Server{})
	lis,err :=net.Listen("tcp",":50051")
	if err != nil {
		fmt.Println(err)
	}
	err = g.Serve(lis)
	if err != nil {
		return
	}
}

client 端
package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	proto "mksz_469/16_grpc_protobuf/strem_grpc/protoc"
	"sync"
	"time"
)

func main() {
	conn,err := grpc.Dial("127.0.0.1:50051",grpc.WithInsecure())
	if err != nil {
		fmt.Println(err)
	}
	defer func(conn *grpc.ClientConn) {
		err := conn.Close()
		if err != nil {

		}
	}(conn)
	c := proto.NewGreeterClient(conn)
	res ,_:= c.GetStream(context.Background(),&proto.StreamReqInfo{Info: "打我呀"})
	for true {
		a ,err := res.Recv()
		if err != nil {
			break
		}
		fmt.Println("recv = ",a.Info)

	}

	// 客户端流模式
	puts ,_ := c.PutStream(context.Background())
	i :=0
	for  {
		i ++
		puts.Send(&proto.StreamResInfo{ResInfo: fmt.Sprintf("client %s",time.Now().String())})
		if i > 5 {
			break
		}
		time.Sleep(time.Second*1)
	}

	// 双向流模式
	allStr ,_:= c.SignalStream(context.Background()) // 拿到双向流的通道 Greeter_SignalStreamClient
	wg := sync.WaitGroup{}
	wg.Add(2) // 开启两个协程的等待
	go func() {
		defer wg.Done()
		for  {
			Info ,err := allStr.Recv()
			if err != nil {
				fmt.Println("服务器端断开连接")
				break
			}
			fmt.Println("收到服务器端消息--->",Info.ResInfo)

		}
	}()
	go func() {
		defer wg.Done()
		for {
			err := allStr.Send(&proto.StreamReqInfo{Info: "client ->" + fmt.Sprintf("%s", time.Now().String())})
			if err != nil {
				fmt.Println("服务器端断开连接")
				break
			}
			time.Sleep(time.Second*1)
		}
	}()
	wg.Wait()
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994248.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存