android– 具有Backpressure的RxJava主题 – 只允许最后一个值在下游完成消耗后发出

android– 具有Backpressure的RxJava主题 – 只允许最后一个值在下游完成消耗后发出,第1张

概述我有一个PublishSubject在一些UI事件上调用onNext().订户通常需要2秒钟才能完成其工作.我需要忽略除了最后一个用户繁忙时对onNext()的所有调用.我尝试了以下,但是我无法控制流量.请求似乎排队等待,每个请求都得到处理(因此背压似乎不起作用).如何让它忽略所有请求,但最后一个?(我

我有一个PublishSubject在一些UI事件上调用onNext().订户通常需要2秒钟才能完成其工作.我需要忽略除了最后一个用户繁忙时对onNext()的所有调用.我尝试了以下,但是我无法控制流量.请求似乎排队等待,每个请求都得到处理(因此背压似乎不起作用).如何让它忽略所有请求,但最后一个? (我不想使用debounce,因为代码需要立即做出反应,任何合理的小超时都不起作用).

此外,我意识到使用subscribeOn与主题没有任何影响,因此我使用observeOn在其中一个运算符中进行异步工作.这是正确的方法吗?

Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();loadingQueue  .toFlowable(BackpressureStrategy.LATEST)  .observeOn(AndroIDSchedulers.mainThread())  .map(discarded -> {    // PRE-LOADING    Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getname());    return discarded;   })   .observeOn(Schedulers.computation())   .map(b -> {     Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getname());     Thread.sleep(2000);     return b;   })   .observeOn(AndroIDSchedulers.mainThread())   .subscribe(b -> {      Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getname() + "\n\n");   });loadingQueue.onNext(true);loadingQueue.onNext(true);loadingQueue.onNext(true);....

我看到的输出是:

PRE-LOADING: mainPRE-LOADING: mainLOADING: RxcomputationThreadPool-1PRE-LOADING: mainPRE-LOADING: mainPRE-LOADING: mainPRE-LOADING: mainPRE-LOADING: mainPRE-LOADING: mainLOADING: RxcomputationThreadPool-1FINISHED: mainLOADING: RxcomputationThreadPool-1FINISHED: mainLOADING: RxcomputationThreadPool-1FINISHED: mainLOADING: RxcomputationThreadPool-1FINISHED: mainLOADING: RxcomputationThreadPool-1FINISHED: mainLOADING: RxcomputationThreadPool-1FINISHED: mainLOADING: RxcomputationThreadPool-1FINISHED: mainFINISHED: main

相反,我希望代码执行以下 *** 作(即加载一次,当它加载时,背压以阻止所有请求并发出最后一个请求,一旦第一个观察者完成 – 所以总的来说理想情况下应该加载两次最多):

PRE-LOADING: mainLOADING: RxcomputationThreadPool-1FINISHED: mainPRE-LOADING: mainLOADING: RxcomputationThreadPool-1FINISHED: main

解决方法:

您不能使用observeOn执行此 *** 作,因为它将缓冲至少1个元素,因此如果已经发生一个“LOADING”,则始终执行“PRE-LOADING”阶段.

但是,您可以延迟执行此 *** 作,因为它不会 *** 纵链上的请求数量,并在调度程序上单独调度每个onNext,而无需自行排队:

public static voID main(String[] args) throws Exception {    Subject<Boolean> loadingQueue =          PublishSubject.<Boolean>create().toSerialized();    loadingQueue      .toFlowable(BackpressureStrategy.LATEST)      .delay(0, TimeUnit.MILliSECONDS, Schedulers.single())        // <-------      .map(discarded -> {        // PRE-LOADING        System.out.println("PRE-LOADING: "              + Thread.currentThread().getname());        return discarded;       })       .delay(0, TimeUnit.MILliSECONDS, Schedulers.computation())  // <-------       .map(b -> {           System.out.println("LOADING: "              + Thread.currentThread().getname());         Thread.sleep(2000);         return b;       })       .delay(0, TimeUnit.MILliSECONDS, Schedulers.single())       // <-------       .rebatchRequests(1)             // <----------------------------------- one-by-one       .subscribe(b -> {           System.out.println("FINISHED: "                + Thread.currentThread().getname() + "\n\n");       });    loadingQueue.onNext(true);    loadingQueue.onNext(true);    loadingQueue.onNext(true);    Thread.sleep(10000);}
总结

以上是内存溢出为你收集整理的android – 具有Backpressure的RxJava主题 – 只允许最后一个值在下游完成消耗后发出全部内容,希望文章能够帮你解决android – 具有Backpressure的RxJava主题 – 只允许最后一个值在下游完成消耗后发出所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/web/1108598.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-29
下一篇 2022-05-29

发表评论

登录后才能评论

评论列表(0条)

保存