RxJava Backpressure(快速生产者缓慢的消费者)

RxJava Backpressure(快速生产者缓慢的消费者),第1张

概述我有执行方法,它会在io线程上进行一些耗时的网络调用例/***networkcall*@paramvalue*@return*/privateObservable<Integer>execute(finalintvalue){returnObservable.create(newObservable.OnSubscribe<Integer>(){@Overridepubli

我有执行方法,它会在io线程上进行一些耗时的网络调用

/** * network call * @param value * @return */private Observable<Integer> execute(final int value) {    return Observable.create(new Observable.OnSubscribe<Integer>() {        @OverrIDe        public voID call(Subscriber<? super Integer> subscriber) {            try {                Thread.sleep(500);            } catch (InterruptedException e) {                e.printstacktrace();            }            System.out.println("done " + value);            subscriber.onNext(value);            subscriber.onCompleted();        }    }).subscribeOn(Schedulers.io());}

那么我有必须按顺序执行的“命令”列表. (相继)

示例(Observable.range(x,y)表示命令列表)

public List<Integer> testObservableBackpressure(){   return Observable.range(0,5).flatMap(new Func1<Integer, Observable<Integer>>() {        @OverrIDe        public Observable<Integer> call(Integer integer) {            System.out.println("started " + integer);            return exeute(integer);        }    }).toList().toBlocking().single();}

这样输出是

started 0started 1started 2started 3started 4done 0done 1done 2done 4done 3

产品比消耗更快

我想要这样的结果

started 0done 0started 1done 1started 2done 2...

但..

public List<Integer> testObservableBackpressure(){    return Observable.create(new Observable.OnSubscribe<Integer>() {        @OverrIDe        public voID call(final Subscriber<? super Integer> subscriber) {            Observable.range(0,5).subscribe(new Subscriber<Integer>() {                @OverrIDe                public voID onStart() {                    request(1);                }                @OverrIDe                public voID onCompleted() {                    subscriber.onCompleted();                }                @OverrIDe                public voID one rror(Throwable e) {                    subscriber.onError(e);                }                @OverrIDe                public voID onNext(Integer integer) {                    System.out.println("started " + integer);                    execute(integer).subscribe(new Action1<Integer>() {                        @OverrIDe                        public voID call(Integer integer) {                            subscriber.onNext(integer);                            request(1);                        }                    });                }            });        }    }).toList().toBlocking().single();}

这样结果如预期

started 0done 0started 1done 1started 2done 2started 3done 3started 4

我的问题是是否还有另一种更优雅的方式来解决此问题?

解决方法:

我不确定这里是否需要任何特定的背压策略.只需使用concatMap.

如果使用concatMap而不是flatMap,则每个新输入值仅在concatMap发出的最后一个Observable完成时进行订阅.在底层,concatMap为此使用了SerialSubscription.那应该给您想要的订单.

总结

以上是内存溢出为你收集整理的RxJava Backpressure(快速生产者缓慢的消费者)全部内容,希望文章能够帮你解决RxJava Backpressure(快速生产者缓慢的消费者)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/web/1079165.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-27
下一篇 2022-05-27

发表评论

登录后才能评论

评论列表(0条)

保存