当我们通过 grpc 进行客户端服务端通信时,健全的代码应该尽可能的对不同的异常情况进行捕获并做出相应的处理。对于 grpc的话,我们可以通过 try-catch
的形式来进行捕获.
例如像是下面这样子:
应用层可以就像下面这样子进行书写,当然这种感觉不是特别好,具体可以参考本文后面的内容。
try:
response = stub.SayHello(...)
except grpc.RpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.CANCELLED:
pass
elif rpc_error.code() == grpc.StatusCode.UNAVAILABLE:
pass
else:
print(f"Received unknown RPC error: code={rpc_error.code()} message={rpc_error.details()}")
这里我们看下它示例代码当中的 RpcError
,继承的父类为一个空的类,这里就不展示了。需要注意的是,每一个抛出的异常都有其对应的错误状态码,具体内容在 grpc/__init.py__ 当中的 StatusCode 接口。
将上面介绍内容的代码进行补充之后,就类似于下面这段:
stub = exercise_pb2_grpc.ServerStub(channel)
try:
guide_get_one_feature(
stub, exercise_pb2.Point(latitude=409146138,
longitude=-746188906))
except grpc.RpcError as rpc_error:
raise rpc_error
当我们获取到了 stub 之后,调用对应的方法来对服务器的函数进行调用。此时就需要对这个 *** 作进行异常的捕获,防止服务器将逻辑上的异常进行返回。
客户端的应用层进行异常的捕获这里我书写了一个 Mixin的类,用来将返回的异常对象类型进行检查,并进行序列化的返回。
class ProcessExcMixin(object):
@classmethod
def process_exc(cls, rpc_error: grpc.RpcError):
"""
对异常进行捕获并序列化返回
:param rpc_error: 捕获的异常
:return: 返回的是一个异常信息的字典
:rtype: dict
:raise: 对非 QuotaFailure 异常对象进行抛出
"""
status = rpc_status.from_call(rpc_error)
status_dict = protobuf_to_dict(status)
# 取出所有的 details, 并判断是否为对应的 QuotaFailure 对象
for detail in status.details:
# 下面这句话即:
# detail.TypeName() == QuotaFailure.DESCRIPTOR
if detail.Is(error_details_pb2.QuotaFailure.DESCRIPTOR):
detail_info = error_details_pb2.QuotaFailure()
# 取出抛出异常的具体信息
detail.Unpack(detail_info)
info = {k: status_dict[k]
for k in status_dict.keys()
if k in ['code', 'message']}
info.update(protobuf_to_dict(detail_info))
return info
else:
# 对其它特殊异常进行抛出
raise RuntimeError('Unexpected failure: %s'
% detail)
当返回的异常对象不是 grpc 模块当中定义的 异常 message,就会将异常进行抛出。
服务器将异常进行返回光写客户端代码了,这里重要的还是服务端将异常信息给抛回去:
返回一个异常对象的方法def create_greet_limit_exceed_error_status(name):
detail = any_pb2.Any()
detail.Pack(
error_details_pb2.QuotaFailure(violations=[
error_details_pb2.QuotaFailure.Violation(
subject="name: %s" % name,
description="Limit one greeting per person",
)
], ))
return status_pb2.Status(
code=code_pb2.RESOURCE_EXHAUSTED,
message='Request limit exceeded.',
details=[detail],
)
服务器接口的实现
class LimitedGreeter(helloworld_pb2_grpc.GreeterServicer):
def __init__(self):
self._lock = threading.RLock()
self._greeted = set()
def SayHello(self, request, context:_Context):
with self._lock:
if request.name in self._greeted:
rich_status = \
create_greet_limit_exceed_error_status(
request.name)
# 将指定的异常状态码添加到 context 当中,
#to_status 的作用是将google.rpc.status.Status类型的 msg
# 转化为对应的 grpc.Status 的 msg
context.abort_with_status(
rpc_status.to_status(rich_status))
else:
print('request_name:', request.name)
self._greeted.add(request.name)
return helloworld_pb2.HelloReply(message='Hello, %s!'
% request.name)
服务端启动代码
def create_server(server_address):
server = grpc.server(futures.ThreadPoolExecutor())
helloworld_pb2_grpc.add_GreeterServicer_to_server(
LimitedGreeter(),
server)
port = server.add_insecure_port(server_address)
return server, port
def serve(server):
server.start()
server.wait_for_termination()
def main():
server, unused_port = create_server('[::]:50051')
serve(server)
if __name__ == '__main__':
logging.basicConfig()
main()
下一篇:client订阅服务器
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)