为您的gRPC服务创建一个 全局
context变量。因此,遍历各个部分:
- 每个gRPC服务请求都将使用此上下文(以及客户端上下文)来满足该请求
os.Interrupt
处理程序将取消全局上下文;从而取消任何当前正在运行的请求- 最终问题
server.GracefulStop()
-应该等待所有活动的gRPC调用完成(如果他们没有立即看到取消 *** 作)
因此,例如,在设置gRPC服务时:
pctx := context.Background()globalCtx, globalCancel := context.WithCancel(pctx)mysrv := MyService{ gctx: globalCtx}s := grpc.NewServer()pb.RegisterMyService(s, mysrv)
os.Interrupt处理程序启动并等待关闭:
globalCancel()server.GracefulStop()
gRPC方法:
func(s *MyService) SomeRpcMethod(ctx context.Context, req *pb.Request) error { // merge client and server contexts into one `mctx` // (client context will cancel if client disconnects) // (server context will cancel if service Ctrl-C'ed) mctx, mcancel := mergeContext(ctx, s.gctx) defer mcancel() // so we don't leak, if neither client or server context cancels // RPC WORK GOES HERE // RPC WORK GOES HERE // RPC WORK GOES HERE // pass mctx to any blocking calls: // - http REST calls // - SQL queries etc. // - or if running a long loop; status check the context occasionally like so: // Example long request (10s) for i:=0; i<10*1000; i++ { time.Sleep(1*time.Milliscond) // poll merged context select { case <-mctx.Done(): return fmt.Errorf("request canceled: %s", mctx.Err()) default: } }}
和:
func mergeContext(a, b context.Context) (context.Context, context.CancelFunc) { mctx, mcancel := context.WithCancel(a) // will cancel if `a` cancels go func() { select { case <-mctx.Done(): // don't leak go-routine on clean gRPC run case <-b.Done(): mcancel() // b canceled, so cancel mctx } }() return mctx, mcancel}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)