$ python -m pip install grpcio #安装grpc
$ python -m pip install grpcio-tools
syntax = "proto3";
// HelloRequest is a message with a single string field.
message HelloRequest {
string name = 1; // name holds the name; its field number is 1
}
生成protobuf的python文件
$ python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I . test.proto
# grpc_tools.protoc 为grpcio-tools提供的命令
# --python_out=. 生成的对应python文件路径
# --grpc_python_out=. 生成对应的grpc文件目录
# -I . 指定查找(导入).proto文件的目录为当前目录
# test.proto protobuf文件
新建protobuf_test包
from protobuf_test import hello_pb2
# Build a HelloRequest and fill in its name field.
outgoing = hello_pb2.HelloRequest()
outgoing.name = "aaaaa"

# Serialize the message and print the serialized data.
wire_data = outgoing.SerializeToString()
print(wire_data)

# Rebuild a message object from the serialized data and print its name.
decoded = hello_pb2.HelloRequest()
decoded.ParseFromString(wire_data)
print(decoded.name)
2 python使用grpc
新建grpc_proto包
syntax = "proto3";
// Greeter declares one RPC, SayHello, taking a HelloRequest and returning a HelloReply.
service Greeter { // service name
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// HelloRequest is the argument of SayHello; it has one string field, name.
message HelloRequest {
string name = 1;
}
// HelloReply is the result of SayHello; it has one string field, message.
message HelloReply {
string message = 1;
}
server.py
import sys
# Add the directory holding the generated modules to the module search path.
# The path is a raw string: in a normal literal "\U" starts a unicode escape,
# which makes this Windows path a SyntaxError.
sys.path.append(r"C:\Users\Administrator\PycharmProjects\pythonProject\grpc_test\grpc_proto")
from concurrent import futures
import grpc
from grpc_proto import helloworld_pb2, helloworld_pb2_grpc
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Greeter service implementation; subclasses the generated servicer class."""

    def SayHello(self, request, context):
        """Handle the SayHello RPC.

        Args:
            request: the incoming HelloRequest; only its ``name`` field is read.
            context: the call context passed in by gRPC (not used here).

        Returns:
            A HelloReply whose ``message`` greets ``request.name``.
        """
        # The body is ordinary business logic; here it only builds a greeting.
        greeting = f"你好,{request.name}"
        return helloworld_pb2.HelloReply(message=greeting)
if __name__ == "__main__":
    # Thread pool handed to the gRPC server; it caps the worker threads at 10.
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(worker_pool)

    # Register the Greeter implementation with the server.
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)

    # Listen on port 50052 without TLS, start serving, and block until the
    # server terminates.
    server.add_insecure_port('[::]:50052')
    server.start()
    server.wait_for_termination()
client.py
import grpc
import sys
# Add the directory holding the generated modules to the module search path.
# The path is a raw string: in a normal literal "\U" starts a unicode escape,
# which makes this Windows path a SyntaxError.
sys.path.append(r"C:\Users\Administrator\PycharmProjects\pythonProject\grpc_test\grpc_proto")
from grpc_test.grpc_proto import helloworld_pb2, helloworld_pb2_grpc
if __name__ == "__main__":
    # Open a channel without TLS to the server; the with-block closes it on exit.
    with grpc.insecure_channel("localhost:50052") as channel:
        greeter = helloworld_pb2_grpc.GreeterStub(channel)
        # Remote call of SayHello, passing a HelloRequest as its argument.
        hello = helloworld_pb2.HelloRequest(name="bobby")
        rsp = greeter.SayHello(hello)
        print(rsp.message)
需要把生成的helloworld_pb2_grpc.py中的导入语句改成相对导入(如下所示),否则把grpc_proto当作包导入时会找不到helloworld_pb2
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import helloworld_pb2 as helloworld__pb2
3 在golang里面使用protobuf和grpc
$ protoc -I . helloworld.proto --go_out=plugins=grpc:.
syntax = "proto3";
option go_package = "grpc_test/proto;proto";
// go_package sets the Go import path of the generated code and, after the ';', its Go package name.
// Greeter declares one RPC, SayHello, taking a HelloRequest and returning a HelloReply.
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}
// HelloRequest is the argument of SayHello; it has one string field, name.
message HelloRequest {
string name = 1;
}
// HelloReply is the result of SayHello; it has one string field, message.
message HelloReply {
string message = 1;
}
server code
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"mksz_469/16_grpc_protobuf/grpc_test/proto/grpc_test/proto"
"net"
)
// Sever is the stateless type that is registered as the Greeter service
// implementation in main.
type Sever struct {}
// SayHello answers a HelloRequest with a HelloReply whose Message is "hello "
// followed by the request's Name. It always returns a nil error.
func (s *Sever) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply, error) {
	reply := &proto.HelloReply{Message: "hello " + request.Name}
	return reply, nil
}
// main listens on TCP port 8081 and serves the Greeter service on it.
//
// If the listener cannot be created the error is printed and main returns
// instead of handing a nil listener to Serve. An error returned by Serve is
// printed rather than dropped.
func main() {
	lis, err := net.Listen("tcp", ":8081")
	if err != nil {
		fmt.Println(err)
		return
	}

	g := grpc.NewServer()
	proto.RegisterGreeterServer(g, &Sever{})

	// Serve blocks until the server stops.
	if err := g.Serve(lis); err != nil {
		fmt.Println(err)
	}
}
client code
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"mksz_469/16_grpc_protobuf/grpc_test/proto/grpc_test/proto"
)
// main dials the Greeter server on 127.0.0.1:8081, calls SayHello once and
// prints the message of the reply.
//
// Each failure is printed and ends the program, so a nil connection or a nil
// reply is never used afterwards.
func main() {
	conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithInsecure())
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		// Report a failed close instead of silently discarding it.
		if err := conn.Close(); err != nil {
			fmt.Println(err)
		}
	}()

	c := proto.NewGreeterClient(conn)
	r, err := c.SayHello(context.Background(), &proto.HelloRequest{Name: "aaaa"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(r.Message)
}
这样只需要在Go客户端中把连接的端口改成Python服务端监听的端口(50052),就可以用Go调用Python的服务
如果出现
missing go.sum entry for module providing package <package_name>
go build -mod=mod 用该命令刷新一下mod就行
只要proto文件一致,两边随便调用都没问题
4 grpc的流可以源源不断推送数据,比如ROS里面的订阅和发布,不停地由其推送数据,很适合传输一些大数据,或者服务器和客户端长时间数据交互
gRPC有四种模式:普通模式(发一个请求,给一个响应)、服务器端数据流、客户端数据流、双向数据流。流(stream)在protobuf里是个关键字
syntax = "proto3";
option go_package="./;proto";
service Greeter {
// Three streaming RPC methods.
rpc GetStream(StreamReqInfo) returns (stream StreamReqInfo); // server streaming: one request, a stream of responses
rpc PutStream(stream StreamResInfo) returns (StreamReqInfo); // client streaming: a stream of requests, one response
rpc SignalStream(stream StreamReqInfo) returns(stream StreamResInfo); // bidirectional streaming
}
// StreamReqInfo has one string field, Info.
message StreamReqInfo {
string Info =1;
}
// StreamResInfo has one string field, ResInfo.
message StreamResInfo{
string ResInfo = 1;
}
server端
package main
import (
	"fmt"
	"io"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"
	proto "mksz_469/16_grpc_protobuf/strem_grpc/protoc"
)
// Server is the stateless type whose methods implement the streaming Greeter
// RPCs; main registers it as the service implementation.
type Server struct {}
// GetStream implements the server-streaming RPC: it sends six messages to the
// client, each carrying the current time as text, pausing one second between
// consecutive messages. The request is not inspected.
//
// It returns the first Send error, or nil once all messages were sent.
func (s *Server) GetStream(req *proto.StreamReqInfo, res proto.Greeter_GetStreamServer) error {
	const total = 6
	for sent := 1; ; sent++ {
		// Push the next message of the stream to the client.
		if err := res.Send(&proto.StreamReqInfo{Info: time.Now().String()}); err != nil {
			return err
		}
		if sent == total {
			return nil
		}
		time.Sleep(time.Second)
	}
}
// PutStream implements the client-streaming RPC: it prints every message the
// client sends and, once the client has finished sending (Recv reports
// io.EOF), answers with the single response of the call via SendAndClose.
//
// Any other receive error is printed and returned, so the RPC ends with a
// failure status instead of being reported as successful.
func (s *Server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
	received := 0
	for {
		// Receive the next message of the client's stream.
		info, err := cliStr.Recv()
		if err == io.EOF {
			// End of the client's stream: send the one response and finish.
			return cliStr.SendAndClose(&proto.StreamReqInfo{
				Info: fmt.Sprintf("received %d messages", received),
			})
		}
		if err != nil {
			fmt.Println("client recv", err)
			return err
		}
		received++
		fmt.Println(info)
	}
}
// SignalStream implements the bidirectional-streaming RPC. Two goroutines run
// concurrently on the same stream: one prints every message received from the
// client, the other sends the current time to the client once per second.
//
// Each goroutine stops at its first stream error; the method returns nil after
// both have stopped.
func (s *Server) SignalStream(allStr proto.Greeter_SignalStreamServer) error {
	var wg sync.WaitGroup
	wg.Add(2) // one for the receiving goroutine, one for the sending goroutine

	go func() {
		defer wg.Done()
		for {
			info, err := allStr.Recv()
			if err != nil {
				fmt.Println("客户端退出")
				// info is nil on error; leave the loop instead of dereferencing it.
				return
			}
			fmt.Println("收到客户端消息--->", info.Info)
		}
	}()

	go func() {
		defer wg.Done()
		for {
			err := allStr.Send(&proto.StreamResInfo{ResInfo: "server ->" + time.Now().String()})
			if err != nil {
				fmt.Println("客户端退出")
				return
			}
			time.Sleep(time.Second)
		}
	}()

	wg.Wait()
	return nil
}
// main listens on TCP port 50051 and serves the streaming Greeter service.
//
// If the listener cannot be created the error is printed and main returns
// instead of handing a nil listener to Serve. An error returned by Serve is
// printed rather than dropped.
func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		fmt.Println(err)
		return
	}

	g := grpc.NewServer()
	proto.RegisterGreeterServer(g, &Server{})

	// Serve blocks until the server stops.
	if err := g.Serve(lis); err != nil {
		fmt.Println(err)
	}
}
client 端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
proto "mksz_469/16_grpc_protobuf/strem_grpc/protoc"
"sync"
"time"
)
func main() {
conn,err := grpc.Dial("127.0.0.1:50051",grpc.WithInsecure())
if err != nil {
fmt.Println(err)
}
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
}
}(conn)
c := proto.NewGreeterClient(conn)
res ,_:= c.GetStream(context.Background(),&proto.StreamReqInfo{Info: "打我呀"})
for true {
a ,err := res.Recv()
if err != nil {
break
}
fmt.Println("recv = ",a.Info)
}
// 客户端流模式
puts ,_ := c.PutStream(context.Background())
i :=0
for {
i ++
puts.Send(&proto.StreamResInfo{ResInfo: fmt.Sprintf("client %s",time.Now().String())})
if i > 5 {
break
}
time.Sleep(time.Second*1)
}
// 双向流模式
allStr ,_:= c.SignalStream(context.Background()) // 拿到双向流的通道 Greeter_SignalStreamClient
wg := sync.WaitGroup{}
wg.Add(2) // 开启两个协程的等待
go func() {
defer wg.Done()
for {
Info ,err := allStr.Recv()
if err != nil {
fmt.Println("服务器端断开连接")
break
}
fmt.Println("收到服务器端消息--->",Info.ResInfo)
}
}()
go func() {
defer wg.Done()
for {
err := allStr.Send(&proto.StreamReqInfo{Info: "client ->" + fmt.Sprintf("%s", time.Now().String())})
if err != nil {
fmt.Println("服务器端断开连接")
break
}
time.Sleep(time.Second*1)
}
}()
wg.Wait()
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)