c# – 如何使用Observables实现轮询?

c# – 如何使用Observables实现轮询?,第1张

概述我有一个参数化的休息调用,应该每五秒执行一次不同的参数: Observable<TResult> restCall = api.method1(param1); 我需要创建一个可观察的< TResult>这将使用param1的不同值每5秒轮询restCall.如果api调用失败,我需要得到一个错误,并在5秒内进行下一个调用.只有当restCall完成(成功/错误)时,才应该测量呼叫间隔. 我正在使 我有一个参数化的休息调用,应该每五秒执行一次不同的参数:
Observable<TResult> restCall = API.method1(param1);

我需要创建一个可观察的< TResult>这将使用param1的不同值每5秒轮询restCall.如果API调用失败,我需要得到一个错误,并在5秒内进行下一个调用.只有当restCall完成(成功/错误)时,才应该测量呼叫间隔.

我正在使用RxJava,但.NET示例也是很好的.

解决方法 介绍

首先,我是.NET的人,而且我知道这种方法使用一些在Java中没有直接等价的成语.但是我接受你的话,并且基于这样一个伟大的问题,.NET的家伙会喜欢,希望它会引导你在rx-java中正确的路径,我从来没有看过.这是一个很长的答案,但它主要是解释 – 解决方案代码本身很短!

使用两者

我们首先需要一些工具来帮助解决这个问题.第一个是使用“Tleft”,“TRight”类型.这很重要,因为每个调用都有两个可能的结果,一个很好的结果或一个错误.但是我们需要将它们包装在一个类型中 – 我们不能使用OnError发送错误,因为这将终止结果流.看起来有点像元组,并且使它更容易处理这种情况. Rxx library有一个非常完整和良好的实现,但这里是一个简单的通用示例,其次是一个简单的实现,为我们的目的:

var goodResult = Either.Right<Exception,int>(1);var exception = Either.left<Exception,int>(new Exception());/* base class for leftValue and RightValue types */public abstract class Either<Tleft,TRight>{    public abstract bool Isleft { get; }    public bool IsRight { get { return !Isleft; } }    public abstract Tleft left { get; }    public abstract TRight Right { get;  }    }public static class Either{    public sealed class leftValue<Tleft,TRight> : Either<Tleft,TRight>    {        Tleft _leftValue;        public leftValue(Tleft leftValue)        {            _leftValue = leftValue;        }        public overrIDe Tleft left { get { return _leftValue; } }        public overrIDe TRight Right { get { return default(TRight); } }        public overrIDe bool Isleft { get { return true; } }    }    public sealed class RightValue<Tleft,TRight>    {        TRight _rightValue;        public RightValue(TRight rightValue)        {            _rightValue = rightValue;        }        public overrIDe Tleft left { get { return default(Tleft); } }        public overrIDe TRight Right { get { return _rightValue; } }        public overrIDe bool Isleft { get { return false; } }    }    // Factory functions to create left or right-valued Either instances    public static Either<Tleft,TRight> left<Tleft,TRight>(Tleft leftValue)    {        return new leftValue<Tleft,TRight>(leftValue);    }    public static Either<Tleft,TRight> Right<Tleft,TRight>(TRight rightValue)    {        return new RightValue<Tleft,TRight>(rightValue);    }}

请注意,按惯例,当使用或者建模成功或失败时,右侧用于成功的值,因为它当然是“对”:)

一些助手功能

我将用一些帮助函数来模拟你的问题的两个方面.首先,这里是一个生成参数的工厂 – 每次调用它将返回从1开始的整数序列中的下一个整数:

// An infinite supply of parametersprivate static int count = 0;public int ParameterFactory(){    return ++count; }

接下来,这是一个将您的休息调用模拟为IObservable的功能.此函数接受一个整数:

>如果整数为偶数,它返回一个Observable,它立即发送一个OnError.
>如果整数为奇数,则返回一个将整数与“-ret”连接的字符串,但只有在一秒钟之后才会传递.我们将使用它来检查轮询间隔是否按照您所请求的行为 – 作为完成调用之间的暂停,但是它们需要长时间,而不是常规间隔.

这里是:

// A asynchronous function representing the REST callpublic IObservable<string> SomeRestCall(int x){    return x % 2 == 0        ? Observable.Throw<string>(new Exception())        : Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));   }

现在好点

下面是一个相当通用的可重用功能,我称之为Poll.它接受将被轮询的异步函数,该函数的参数工厂,所需的休息(无双关意图!)间隔,最后使用IScheduler.

我可以想出的最简单的方法是使用Observable.Create,它使用调度程序来驱动结果流. ScheduleAsync是一种使用.NET async / await窗体的计划方式.这是一个.NET习语,允许您以强制性的方式编写异步代码. async关键字引入了一个异步函数,可以等待其身体中的一个或多个异步调用,并且只有在调用完成时才会继续. I wrote a long explanation of this style of scheduling in this question,which includes the older recursive the style that might be easier to implement in an rx-java approach.代码如下所示:

public IObservable<Either<Exception,TResult>> Poll<TResult,TArg>(    Func<TArg,IObservable<TResult>> asyncFunction,Func<TArg> parameterFactory,TimeSpan interval,IScheduler scheduler){    return Observable.Create<Either<Exception,TResult>>(observer =>    {        return scheduler.ScheduleAsync(async (ctrl,ct) => {            while(!ct.IsCancellationRequested)            {                try                {                    var result = await asyncFunction(parameterFactory());                    observer.OnNext(Either.Right<Exception,TResult>(result));                }                catch(Exception ex)                {                    observer.OnNext(Either.left<Exception,TResult>(ex));                }                await ctrl.Sleep(interval,ct);            }        });            });    }

打破这一点,可观察.创建一般是一个创建IObservables的工厂,可以很好地控制结果如何发布到观察者.经常被忽略,有利于不必要的复杂的原始构图.

在这种情况下,我们正在使用它来创建一个“TRESult”,“异常”以便我们可以返回成功和失败的投票结果.

Create函数接受一个观察者,表示我们通过OnNext / OnError / OnCompleted传递结果的订阅者.我们需要在Create调用中返回一个Idisposable – 在.NET中,这是用户可以通过该句柄取消其订阅的句柄.在这里特别重要,因为Polling将会永远持续下去,或至少不会有OnComplete.

ScheduleAsync(或简单计划)的结果是这样一个句柄.处理时,它将取消我们计划的任何待处理事件 – 从而结束轮询循环.在我们的情况下,我们用于管理间隔的Sleep是可取消 *** 作,尽管Poll函数可以轻松地被修改为接受一个可取消的asyncFunction,它接受CancellationToken.

ScheduleAsync方法接受一个将被调用以调度事件的函数.它传递两个参数,第一个ctrl是调度程序本身.第二个ct是一个CancellationToken,我们可以使用它来查看取消是否被请求(由订阅者处理其订阅句柄).

轮询本身通过无限循环执行,只有当CancellationToken指示已取消请求时才终止.

在循环中,我们可以使用异步/等待的魔法异步调用轮询函数,但仍将其包装在异常处理程序中.这太棒了!假设没有错误,我们通过OnNext将结果作为正确的值发送给观察者.如果有一个异常,我们会把它作为一个观察者的左值发送.最后,我们使用调度器上的Sleep函数在休息间隔之后调度一个唤醒调用 – 不要与Thread.Sleep调用混淆,这通常不阻止任何线程.请注意,Sleep接受CancellationToken,使其也被中止!

我想你会同意这是非常酷的使用异步/等待简化什么是一个非常棘手的问题!

使用示例

最后,这里有一些调用Poll的测试代码,以及示例输出 – 对于LINQPad的所有代码,所有的代码将一起运行在liNQPad中,并使用Rx 2.1程序集:

voID Main(){    var subscription = Poll(SomeRestCall,ParameterFactory,TimeSpan.FromSeconds(5),ThreadPoolScheduler.Instance)        .TimeInterval()                                    .Subscribe(x => {            Console.Write("Interval: " + x.Interval);            var result = x.Value;            if(result.IsRight)                Console.Writeline(" Success: " + result.Right);            else                Console.Writeline(" Error: " + result.left.Message);        });    Console.Readline();        subscription.dispose();}Interval: 00:00:01.0027668 Success: 1-retInterval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.Interval: 00:00:06.0009684 Success: 3-retInterval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.Interval: 00:00:06.0113053 Success: 5-retInterval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.

注意如果一个成功的结果发生错误立即返回结果之间的间隔是5秒(轮询间隔),或6秒(轮询间隔加上模拟的REST通话持续时间).

编辑 – 这是一个替代实现,不使用ScheduleAsync,而是使用旧样式的递归调度,没有async /等待语法.正如你所看到的,这是一个更麻烦的 – 但它也支持取消asyncFunction observable.

public IObservable<Either<Exception,TArg>(        Func<TArg,IScheduler scheduler)    {        return Observable.Create<Either<Exception,TResult>>(            observer =>                {                    var disposable = new Compositedisposable();                    var funcdisposable = new Serialdisposable();                    bool cancelRequested = false;                    disposable.Add(disposable.Create(() => { cancelRequested = true; }));                    disposable.Add(funcdisposable);                    disposable.Add(scheduler.Schedule(interval,self =>                        {                            funcdisposable.disposable = asyncFunction(parameterFactory())                                .Finally(() =>                                    {                                        if (!cancelRequested) self(interval);                                    })                                .Subscribe(                                    res => observer.OnNext(Either.Right<Exception,TResult>(res)),ex => observer.OnNext(Either.left<Exception,TResult>(ex)));                        }));                    return disposable;                });    }

请参阅我的其他答案,以避免.NET 4.5异步/等待功能,并且不使用计划调用.

我希望对rx-java的家伙有一些帮助!

总结

以上是内存溢出为你收集整理的c# – 如何使用Observables实现轮询?全部内容,希望文章能够帮你解决c# – 如何使用Observables实现轮询?所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1260691.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-08
下一篇 2022-06-08

发表评论

登录后才能评论

评论列表(0条)

保存