多语言通信基础 06 go实现grpc的四种数据流模式实现

多语言通信基础 06 go实现grpc的四种数据流模式实现,第1张

grpc的数据传输模式有多种:

简单模式:客户端发起一次请求,服务端响应一次数据,和普通的rpc没有区别。服务端数据流模式:客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端发给服务端一个股票代码,服务端将该股票的数据实时不断的返回给客户端。还有我们常用的订阅场景也属于服务端流模式。客户端数据流模式。与服务端数据流模式相反,由客户端源源不断的像服务端发送数据流,发送结束后,由服务端返回一个响应。典型的例子是物联网终端像服务器报送数据。双向数据流模式:客户端和服务端可以向双方实时发送数据流进行交互,典型应用是聊天机器人,即时通信工具等。 propto文件

各种模式的proto文件内容如下:

syntax = "proto3";

option go_package = ".;proto";  // 指明当前目录

service Greeter {
  rpc GetStream(StreamReqData)returns(stream StreamResData);  //服务端流模式
  rpc PutStream(stream StreamReqData)returns(StreamResData);  //客户端流模式
  rpc Al1Stream(stream StreamReqData)returns(stream StreamResData); //双向流模式
}

message StreamReqData {
  string data = 1;
}
message StreamResData {
  string data = 1;
}

serve服务端程序

服务端必须实现proto中的三个函数。

package main

import (
	"OldPackageTest/stream_grpc_test/proto"
	"fmt"
	"google.golang.org/grpc"
	"net"
	"sync"
	"time"
)

const PORT = ":50052"

type server struct {
}

// 服务端流模式
func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
	i := 0
	for {
		i++
		_ = res.Send(&proto.StreamResData{
			Data: fmt.Sprintf("%v", time.Now().Unix()),
		})
		time.Sleep(time.Second)
		if i > 10 {
			break
		}
	}

	return nil
}



// 客户端流模式
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
	for {
		if a, err := cliStr.Recv(); err != nil {
			fmt.Println(err)
			break
		} else {
			fmt.Println(a.Data)
		}
	}

	return nil
}


// 双向流模式
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			data, _ := allStr.Recv()
			fmt.Println("收到客户端消息:" + data.Data)
		}
	}()

	go func() {
		defer wg.Done()
		for {
			_ = allStr.Send(&proto.StreamResData{Data: "我是服务器"})
			time.Sleep(time.Second)
		}
	}()

	wg.Wait()
	return nil
}

func main() {
	lis, err := net.Listen("tcp", PORT)
	if err != nil {
		panic(err)
	}
	s := grpc.NewServer()
	proto.RegisterGreeterServer(s, &server{})
	err = s.Serve(lis)
	if err != nil {
		panic(err)
	}
}
client客户端程序
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"

	"OldPackageTest/stream_grpc_test/proto"
)

func main() {
	conn, err := grpc.Dial("localhost:50052", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	//服务端流模式
	c := proto.NewGreeterClient(conn)
	res, _ := c.GetStream(context.Background(), &proto.StreamReqData{Data: "慕课网"})
	for {
		a, err := res.Recv() //如果大家懂socket编程的话就明白 send recv
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Println(a.Data)
	}

	//客户端流模式
	putS, _ := c.PutStream(context.Background())
	i := 0
	for {
		i++
		_ = putS.Send(&proto.StreamReqData{
			Data: fmt.Sprintf("慕课网%d", i),
		})
		time.Sleep(time.Second)
		if i > 10 {
			break
		}
	}

	//双向流模式
	allStr, _ := c.AllStream(context.Background())
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			data, _ := allStr.Recv()
			fmt.Println("收到客户端消息:" + data.Data)
		}
	}()

	//1. 集中学习protobuf, grpc

	go func() {
		defer wg.Done()
		for {
			_ = allStr.Send(&proto.StreamReqData{Data: "慕课网"})
			time.Sleep(time.Second)
		}
	}()

	wg.Wait()
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994201.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存