android-flatMap如何深入工作?

android-flatMap如何深入工作?,第1张

概述我对flatMap如何控制其“子”线程感兴趣,例如以下代码可以正常工作:privateFlowable<PlcDataPackage>createIntervalPlcFlowable(){returnFlowable.interval(1,TimeUnit.SECONDS,Schedulers.computation()).onBackpressureLatest().paralle

我对flatMap如何控制其“子”线程感兴趣,例如以下代码可以正常工作:

 private Flowable<PlcdataPackage> createIntervalPlcFlowable() {    return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())            .onBackpressureLatest()            .parallel()            .runOn(Schedulers.computation())            .flatMap((Function<Long, Publisher<PlcdataPackage>>) aLong -> mDataPackageFlowable)            .sequential();}

并且此代码在被调用128次后停止(这对于flowable是默认的maxConcurent):

  private ConnectableFlowable<PlcdataPackage> createConnectablePlcFlowable() {    return mPlcIntervalFlowable.onBackpressureLatest()            .subscribeOn(Schedulers.single())            .publish();}

订阅:

adddisposable(mGetPlcUpdatesChanelUseCase.execute()                              .observeOn(AndroIDSchedulers.mainThread())                              .subscribe(plcDto -> Timber.d("plcReceiver"),                                         Timber::e));

用例:

public class GetPlcUpdatesChanelUseCase extends UseCase<PlcDto, Object> {    private final PlcRepository mPlcRepository;    public GetPlcUpdatesChanelUseCase(PlcRepository plcRepository) {        mPlcRepository = plcRepository;    }    @OverrIDe    public Flowable<PlcDto> buildFlowable(Optional<Object> optional) {        return mPlcRepository.getUpdatesChannel();    }    @OverrIDe    public boolean isParamsrequired() {        return false;    }}

回购法

@OverrIDe    public Flowable<PlcDto> getUpdatesChannel() {        return mPlcCore.getPlcConnectableFlowable()                .map(mPlcInfotopPlcDtotransformer::transform);    }

PlcCore方法

public ConnectableFlowable<PlcdataPackage> getPlcConnectableFlowable() {    return mConnectableFlowable;}

而mConnectableFlowable是:

mConnectableFlowable = createConnectablePlcFlowable();        mConnectableFlowable.connect();

因此,据我了解,mDataPackageFlowable创建一次,然后执行,并且每次为其子级创建新的“线程”,并且在执行128次之后,它只会阻止以下所有执行.

因此,存在3个主要问题:

1)flatMap控制子线程吗?

2)为什么要在新线程上执行每个新的“请求”?(也许不,然后告诉我)

3)在哪种情况下,我们可能会失去对子线程的控制.

免责声明:英语是我的第二语言,如果有不清楚的地方请问我,我会尝试补充说明.

 private Flowable<PlcdataPackage> createIntervalPlcFlowable() {    return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())            .onBackpressureLatest()            .parallel()            .runOn(Schedulers.computation())            .sequental()

这种组合不起作用,它实际上删除了128倍的flatMap调用限制,但不会清除导致内存泄漏和OOM异常的较旧的内部预订.请改用某种地图.

解决方法:

需要订阅者才能使观察者链正常工作.当您使用interval()生成数据时,您将提供一个“热”的可观察对象,它自己发出值. “冷”可观察对象仅在发生订阅时才会发出值.

128是flatMap()在停顿之前缓冲的条目数.如果有订阅,则flatMap()会向下游发射内部可观察到的值,并且不会停顿.

根据javadoc,flatMap()本身无法在特定的调度程序上运行.这意味着它不会在特定线程上 *** 纵其订阅.如果要控制由flatMap()调用的可观察对象所完成的工作,则可以使用显式调度:

observable  .flatMap( value -> fun(value).subscribeOn( myScheduler ) )  .subscribe();

例如,myScheduler可能是Schedulers.io(),它会在需要时创建线程.或者,它可以是您提供了固定数量的线程的执行程序.我经常使用仅分配了一个或两个或48个线程的执行器来控制flatMap()的扇出.

您还可以向flatMap()提供一个并行度参数,该参数告诉它它将维护的最大预订数.当flatMap()达到最大值时,它将缓冲请求,直到它订阅的观察者链完成为止.

parallel()运算符执行类似的 *** 作,但它拆分传入的事件,并在单独的线程上发出它们.同样,javadoc具有出色的描述以及精美的图片.

总是有可能失去对线程的控制.使用RxJava运算符时,请阅读其文档.您需要了解两个领域.第一个领域是运算符要使用的调度程序.如果它说它不能在特定的调度程序上运行,那么它不会直接影响线程的选择或线程的使用方式.如果它声明使用特定的调度程序,那么您需要了解该调度程序的工作方式.总会有另一个版本的运算符,可让您提供自己选择的调度程序.

您必须了解的第二个方面是背压.您需要了解背压的含义以及如何应用.每当您跨越线程边界时,例如通过使用observeOn()或subscribeOn(),这一点尤其重要.

总结

以上是内存溢出为你收集整理的android-flatMap如何深入工作?全部内容,希望文章能够帮你解决android-flatMap如何深入工作?所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/1082014.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-27
下一篇 2022-05-27

发表评论

登录后才能评论

评论列表(0条)

保存