How to read gRPC metadata on the client side in Java


I am using Java with the protoc 3.0 compiler, and my proto file is shown below.
https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang

    syntax = "proto3";

    package Telemetry;

    // Interface exported by Agent
    service OpenConfigTelemetry {
        // Request an inline subscription for data at the specified path.
        // The device should send telemetry data back on the same
        // connection as the subscription request.
        rpc telemetrySubscribe(SubscriptionRequest)                     returns (stream OpenConfigData) {}

        // Terminates and removes an existing telemetry subscription
        rpc cancelTelemetrySubscription(CancelSubscriptionRequest)      returns (CancelSubscriptionReply) {}

        // Get the list of current telemetry subscriptions from the
        // target. This command returns a list of existing subscriptions
        // not including those that are established via configuration.
        rpc getTelemetrySubscriptions(GetSubscriptionsRequest)          returns (GetSubscriptionsReply) {}

        // Get Telemetry Agent Operational States
        rpc getTelemetryOperationalState(GetOperationalStateRequest)    returns (GetOperationalStateReply) {}

        // Return the set of data encodings supported by the device for
        // telemetry data
        rpc getDataEncodings(DataEncodingRequest)                       returns (DataEncodingReply) {}
    }

    // Message sent for a telemetry subscription request
    message SubscriptionRequest {
        // Data associated with a telemetry subscription
        SubscriptionInput input                                 = 1;

        // List of data models paths and filters
        // which are used in a telemetry operation.
        repeated Path path_list                                 = 2;

        // The below configuration is not defined in Openconfig RPC.
        // It is a proposed extension to configure additional
        // subscription request features.
        SubscriptionAdditionalConfig additional_config          = 3;
    }

    // Data associated with a telemetry subscription
    message SubscriptionInput {
        // List of optional collector endpoints to send data for
        // this subscription.
        // If no collector destinations are specified, the collector
        // destination is assumed to be the requester on the rpc channel.
        repeated Collector  collector_list                      = 1;
    }

    // Collector endpoints to send data specified as an ip+port combination.
    message Collector {
        // IP address of collector endpoint
        string address                                          = 1;

        // Transport protocol port number for the collector destination.
        uint32 port                                             = 2;
    }

    // Data model path
    message Path {
        // Data model path of interest
        // Path specification for elements of OpenConfig data models
        string path                                             = 1;

        // Regular expression to be used in filtering state leaves
        string filter                                           = 2;

        // If this is set to true, the target device will only send
        // updates to the collector upon a change in data value
        bool suppress_unchanged                                 = 3;

        // Maximum time in ms the target device may go without sending
        // a message to the collector. If this time expires with
        // suppress-unchanged set, the target device must send an update
        // message regardless if the data values have changed.
        uint32 max_silent_interval                              = 4;

        // Time in ms between collection and transmission of the
        // specified data to the collector platform. The target device
        // will sample the corresponding data (e.g., a counter) and
        // immediately send to the collector destination.
        //
        // If sample-frequency is set to 0, then the network device
        // must emit an update upon every datum change.
        uint32 sample_frequency                                 = 5;
    }

    // Configure subscription request additional features.
    message SubscriptionAdditionalConfig {
        // limit the number of records sent in the stream
        int32 limit_records                                     = 1;

        // limit the time the stream remains open
        int32 limit_time_seconds                                = 2;
    }

    // Reply to inline subscription for data at the specified path is done in
    // two-folds.
    // 1. Reply data message sent out using out-of-band channel.
    // 2. Telemetry data sent back on the same connection as the
    //    subscription request.

    // 1. Reply data message sent out using out-of-band channel.
    message SubscriptionReply {
        // Response message to a telemetry subscription creation or
        // get request.
        SubscriptionResponse response                           = 1;

        // List of data models paths and filters
        // which are used in a telemetry operation.
        repeated Path path_list                                 = 2;
    }

    // Response message to a telemetry subscription creation or get request.
    message SubscriptionResponse {
        // Unique ID for the subscription on the device. This is
        // generated by the device and returned in a subscription
        // request or when listing existing subscriptions
        uint32 subscription_id = 1;
    }

    // 2. Telemetry data sent back on the same connection as the
    //    subscription request.
    message OpenConfigData {
        // router name:export IP address
        string system_id                                        = 1;

        // line card / RE (slot number)
        uint32 component_id                                     = 2;

        // PFE (if applicable)
        uint32 sub_component_id                                 = 3;

        // Path specification for elements of OpenConfig data models
        string path                                             = 4;

        // Sequence number, monotonically increasing for each
        // system_id, component_id, sub_component_id + path.
        uint64 sequence_number                                  = 5;

        // timestamp (milliseconds since epoch)
        uint64 timestamp                                        = 6;

        // List of key-value pairs
        repeated KeyValue kv                                    = 7;
    }

    // Simple Key-value, where value could be one of scalar types
    message KeyValue {
        // Key
        string key                                              =  1;

        // One of possible values
        oneof value {
            double double_value                                 =  5;
            int64  int_value                                    =  6;
            uint64 uint_value                                   =  7;
            sint64 sint_value                                   =  8;
            bool   bool_value                                   =  9;
            string str_value                                    = 10;
            bytes  bytes_value                                  = 11;
        }
    }

    // Message sent for a telemetry subscription cancellation request
    message CancelSubscriptionRequest {
        // Subscription identifier as returned by the device when
        // subscription was requested
        uint32 subscription_id                                  = 1;
    }

    // Reply to telemetry subscription cancellation request
    message CancelSubscriptionReply {
        // Return code
        ReturnCode code                                         = 1;

        // Return code string
        string     code_str                                     = 2;
    }

    // Result of the operation
    enum ReturnCode {
        SUCCESS                                                 = 0;
        NO_SUBSCRIPTION_ENTRY                                   = 1;
        UNKNOWN_ERROR                                           = 2;
    }

    // Message sent for a telemetry get request
    message GetSubscriptionsRequest {
        // Subscription identifier as returned by the device when
        // subscription was requested
        // --- or ---
        // 0xFFFFFFFF for all subscription identifiers
        uint32 subscription_id                                  = 1;
    }

    // Reply to telemetry subscription get request
    message GetSubscriptionsReply {
        // List of current telemetry subscriptions
        repeated SubscriptionReply subscription_list            = 1;
    }

    // Message sent for telemetry agent operational states request
    message GetOperationalStateRequest {
        // Per-subscription_id level operational state can be requested.
        //
        // Subscription identifier as returned by the device when
        // subscription was requested
        // --- or ---
        // 0xFFFFFFFF for all subscription identifiers including agent-level
        // operational stats
        // --- or ---
        // If subscription_id is not present then sent only agent-level
        // operational stats
        uint32 subscription_id                                  = 1;

        // Control verbosity of the output
        VerbosityLevel verbosity                                = 2;
    }

    // Verbosity Level
    enum VerbosityLevel {
        DETAIL                                                  = 0;
        TERSE                                                   = 1;
        BRIEF                                                   = 2;
    }

    // Reply to telemetry agent operational states request
    message GetOperationalStateReply {
        // List of key-value pairs where
        //     key      = operational state definition
        //     value    = operational state value
        repeated KeyValue kv                                    = 1;
    }

    // Message sent for a data encoding request
    message DataEncodingRequest {
    }

    // Reply to data encodings supported request
    message DataEncodingReply {
        repeated EncodingType  encoding_list                    = 1;
    }

    // Encoding Type Supported
    enum EncodingType {
        UNDEFINED                                               = 0;
        XML                                                     = 1;
        JSON_IETF                                               = 2;
        PROTO3                                                  = 3;
    }

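To make the service call (rpc telemetrySubscribe), I first need to read the header that carries the subscription ID, and only then start reading messages. From Java I can connect to the service, and I have added an interceptor, but when I print or retrieve the header it is null. The code that installs my interceptor is as follows: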

    ClientInterceptor interceptor = new HeaderClientInterceptor();
    // Plaintext (no TLS) channel to the telemetry agent
    originChannel = OkHttpChannelBuilder.forAddress(host, port)
        .usePlaintext(true)
        .build();
    // Wrap the channel so every call passes through the interceptor
    Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
    telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

This is the interceptor code that reads the metadata.

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // Wrap the listener so the response headers can be inspected
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onHeaders(Metadata headers) {
                        Metadata.Key<String> CUSTOM_HEADER_KEY =
                                Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);
                        System.out.println("Contains Key?? " + headers.containsKey(CUSTOM_HEADER_KEY));
                        // Pass the headers on to the wrapped listener
                        super.onHeaders(headers);
                    }
                }, headers);
            }
        };
    }

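Is there any other way to read the metadata, or the first message that carries the subscription ID? All I need is to read that first message with the subscription ID and send the same subscription ID back to the server so that the stream can start. For reference, I have equivalent Python code built from the same proto file, and it communicates with the server using the code below: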

    sub_req = SubscribeRequestMsg("host", port)
    data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
    metadata = data_itr.initial_metadata()

    if metadata[0][0] == "responseKey":
        metainfo = metadata[0][1]
        print(metainfo)

        # Parse the text-format value into a SubscriptionReply message
        subreply = agent_pb2.SubscriptionReply()
        subreply.SetInParent()
        google.protobuf.text_format.Merge(metainfo, subreply)

        if subreply.response.subscription_id:
            SUB_ID = subreply.response.subscription_id

With the Python code above I can easily retrieve the metadata object, but I don't know how to retrieve it in Java.

After reading the Metadata in Java, all I get is: Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})

But I know the metadata contains one more entry, namely:

    response { subscription_id: 2 }

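How do I extract that last response entry, the one containing the subscription ID, from the headers? I have tried many options and I am lost here.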
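
Once the header value is available as a string, the subscription ID still has to be pulled out of it. The following is a minimal JDK-only sketch of that step, assuming the value has the text-format shape shown above. The class name `SubscriptionIdParser` and the regular expression are illustrative assumptions, not part of the original code. With protobuf-java on the classpath, parsing the string into a `SubscriptionReply` builder with `TextFormat.merge` would likely be the closer equivalent of the Python `text_format.Merge` call; the regex here is only a stand-in for that.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// JDK-only sketch: pull subscription_id out of a protobuf text-format string.
// Hypothetical helper, not part of grpc-java or protobuf-java.
final class SubscriptionIdParser {
    // Matches "subscription_id: <digits>" anywhere in the text. This is an
    // assumption about the value's shape, based on the sample in the question.
    private static final Pattern SUBSCRIPTION_ID =
            Pattern.compile("subscription_id\\s*:\\s*(\\d+)");

    // Returns the subscription ID, or empty if the text does not contain one.
    static OptionalLong parse(String textFormat) {
        if (textFormat == null) {
            return OptionalLong.empty();
        }
        Matcher m = SUBSCRIPTION_ID.matcher(textFormat);
        return m.find()
                ? OptionalLong.of(Long.parseLong(m.group(1)))
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // prints OptionalLong[2]
        System.out.println(parse("response {  subscription_id: 2}"));
    }
}
```

Returning `OptionalLong` rather than a sentinel value keeps the "header present but no ID" case explicit for the caller.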

Solution

The method you are using receives the request metadata, not the response metadata:

    public void start(Listener<RespT> responseListener, Metadata headers) {

For response metadata, you need a ClientCall.Listener and must wait for its onHeaders callback:

    public void onHeaders(Metadata headers)

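That said, the way you describe using metadata seems unusual to me. Metadata is generally used for additional error details, or for cross-cutting features that are not specific to one RPC method (such as authentication or tracing).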
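
To make the request/response distinction concrete without pulling in grpc-java, here is a JDK-only toy model. `ToyCall`, `ToyListener`, and `HeaderFlowDemo` are hypothetical stand-ins, not grpc-java classes, and the header names and values are made up for illustration. Only the shape mirrors the answer: the headers handed to `start` are what the client sends, while the server's headers arrive later through the listener's `onHeaders` callback.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these types are hypothetical and are NOT grpc-java classes.
final class HeaderFlowDemo {
    // Records which header keys are visible in start() and in onHeaders().
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Map<String, String> request = new LinkedHashMap<>();
        request.put("user-agent", "toy-client");
        // At start() time only the outgoing (request) headers exist.
        seen.add("start:" + request.keySet());
        // The response headers show up only inside the callback.
        new ToyCall().start(h -> seen.add("onHeaders:" + h.keySet()), request);
        return seen;
    }

    public static void main(String[] args) {
        // prints:
        // start:[user-agent]
        // onHeaders:[content-type, responsekey]
        run().forEach(System.out::println);
    }
}

// Stand-in for a listener that receives the server's response headers.
interface ToyListener {
    void onHeaders(Map<String, String> responseHeaders);
}

// Stand-in for a call: takes request headers, then delivers response headers.
final class ToyCall {
    final Map<String, String> sentHeaders = new LinkedHashMap<>();

    void start(ToyListener listener, Map<String, String> requestHeaders) {
        sentHeaders.putAll(requestHeaders);
        // The simulated server answers with its own, separate header set.
        Map<String, String> response = new LinkedHashMap<>();
        response.put("content-type", "application/grpc");
        response.put("responsekey", "response { subscription_id: 2 }");
        listener.onHeaders(response);
    }
}
```

In the real client the same idea applies: code that needs the subscription ID belongs in the listener's `onHeaders` override, since the `headers` argument of `start` never contains anything the server sent.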

Original source: http://outofmemory.cn/langs/1213810.html
