RxJava合并请求序列

RxJava合并请求序列,第1张

RxJava合并请求序列

tl; dr 异步地或在调度程序上使用

concatMapEager
flatMap
执行子调用。


很长的故事

我不是android开发人员,所以我的问题将仅限于纯RxJava(版本1和版本2)。

如果我看对图片,所需的流程是:

some query param   --> Execute query on API_1 -> list of items          |-> Execute query for item 1 on API_2 -> extended info of item1          |-> Execute query for item 2 on API_2 -> extended info of item1          |-> Execute query for item 3 on API_2 -> extended info of item1          ...          -> Execute query for item n on API_2 -> extended info of item1  ----------------------------------------------------------------------/      |      --> stream (or list) of extended item info for the query param

假设Retrofit为

interface Api1 {    @GET("/api1") Observable<List<Item>> items(@Query("param") String param);}interface Api2 {    @GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);}

如果项目的顺序不重要,则

flatMap
只能使用:

api1.items(queryParam)    .flatMap(itemList -> Observable.fromIterable(itemList)))    .flatMap(item -> api2.extendedInfo(item.id()))    .subscribe(...)

仅当 改造生成器配置有

  • 要么使用异步适配器(调用将在okhttp内部执行程序中排队)。我个人认为这不是一个好主意,因为您无法控制此执行器。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
  • 或使用基于调度程序的适配器(调用将在RxJava调度程序上进行调度)。这是我的首选,因为您明确选择了使用哪个调度程序,因此很可能是IO调度程序,但是您可以自由尝试其他调度程序。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))

原因是

flatMap
将订阅由创建的每个可观察对象
api2.extendedInfo(...)
,并将它们合并到结果可观察对象中。因此结果将按接收顺序显示。

如果 改装客户端 没有 设置为异步或组对调度运行,也可以设置一个:

api1.items(queryParam)    .flatMap(itemList -> Observable.fromIterable(itemList)))    .flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))    .subscribe(...)

这种结构几乎与先前的执行相同,它 在本地 指示

api2.extendedInfo
应该在每个调度程序上运行。

可以调整的

maxConcurrency
参数
flatMap
以控制您想同时执行多少个请求。尽管我对此会保持谨慎,但您不想同时运行所有查询。通常,默认值
maxConcurrency
足够好(
128
)。

现在,如果原始查询的顺序很重要

concatMap
通常是运算符,它
flatMap
按顺序执行相同的 *** 作,但是顺序执行,如果代码需要等待所有子查询执行,则结果会很慢。尽管解决方案与相比又迈出了一步
concatMapEager
,这一步将按顺序订阅可观察的内容,并根据需要缓冲结果。

假设改造客户端是异步的或在特定的调度程序上运行:

api1.items(queryParam)    .flatMap(itemList -> Observable.fromIterable(itemList)))    .concatMapEager(item -> api2.extendedInfo(item.id()))    .subscribe(...)

或者,如果必须在本地设置调度程序:

api1.items(queryParam)    .flatMap(itemList -> Observable.fromIterable(itemList)))    .concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))    .subscribe(...)

也可以在此运算符中调整并发性。


另外,如果Api返回

Flowable
,则可以
.parallel
在RxJava
2.1.7中使用仍处于beta状态的Api。但是结果却是不规则的,我还不知道有没有排序的方法(还可以吗?)。

api.items(queryParam) // Flowable<Item>   .parallel(10)   .runOn(Schedulers.io())   .map(item -> api2.extendedInfo(item.id()))   .sequential();     // Flowable<ItemExtended>


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5561093.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存