我正在尝试实现在我的应用程序的活动之间共享的计时器Observable.我正在实现一个Dagger单例类的实现,我将其注入到每个不同Activity的每个Presenter中.
我以这种方式创建了一次Observable:
Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILliSECONDS).map(t -> this::doSomethingCool())) .subscribeOn(Schedulers.newThread()) .observeOn(AndroIDSchedulers.mainThread()) .share();
我使用以下功能从Presenter进行订阅:
public Observable<Status> register(Callback callback) { PublishSubject<Status> subject = PublishSubject.create(); subject.subscribe(status -> {}, throwable -> L.LOGE(TAG, throwable.getMessage()), () -> callback.onStatusChanged(mBasketStatus)); mObservable.subscribe(subject); basketCounterCallback.onStatusChanged(status)); subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!")); return subject.asObservable();}
我将该主题存储为可观察到的每个演示者,并呼吁:@H_301_19@ obs.unsubscribeOn(AndroIDSchedulers.mainThread())
取消订阅(在onPause()方法中).我还尝试使用Scheduler Schedulers.immediate()退订
但是无论如何,该回调都会被调用X次(其中X是我已订阅的所有Presenter,因此它并没有取消订阅).还有日志“取消订阅主题!”没有接到电话.
如何正确退订每个主题?
提前致谢
编辑:
由于评论,添加了更多的实现细节:
这是我创建Observable并将其存储在Singleton类StatusManager的成员中的状态(状态也是singleton):
private Observable<BasketStatus> mObservable;private Status mStatus;public Observable<BasketStatus> start(long milliseconds, Status status, Callback callback) { if (mObservable == null) mObservable = createObservable(milliseconds, status); return register(callback);}private Observable<BasketStatus> createObservable(long milliseconds, Status status) { mStatus = status; return Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILliSECONDS).map(t -> status.upgradeStatus())) .subscribeOn(Schedulers.newThread()) .observeOn(AndroIDSchedulers.mainThread()) .share();}public Observable<BasketStatus> register(Callback callback) { PublishSubject<Status> subject = PublishSubject.create(); subject.subscribe(status -> {}, throwable -> L.LOGE(TAG, throwable.getMessage()), () -> callback.onStatusChanged(mStatus)); mObservable.subscribe(subject); callback.onStatusChanged(mStatus)); subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!")); return subject.asObservable();}
从启动计时器的Presenter调用方法start(…)之后,我从下一个Presenters调用register(…)方法:
class Presenter implements Callback { private Observable<BasketStatus> mRegister; @Inject public Presenter(Status status, StatusManager statusManager) { mRegister = statusManager.start(20000, status, this); } // Method called from onPause() public voID unregisterFromBasketStatus() { mRegister.unsubscribeOn(Schedulers.immediate()); }}
下一位主持人
@Injectpublic NextPresenter(StatusManager statusManager) { mBasketStatusManager.register(this);}
解决方法:
正如我在评论中提到的那样,您没有得到unsubscribeOn运算符的行为.它不会取消任何订阅,而是告诉每个订阅者在取消订阅发生时应该在哪里进行工作.这也没有意义,因为您不是从Observable取消订阅,而是从表示观察者与流本身之间的连接的订阅退订.
回到你的问题.现在设计代码的方式,返回的主题完全没有用.您正在使用注册方法中的可观察计时器,但是您将这些通知转发给主题,此主题此后再也没有任何人订阅.如果确实需要它们,例如在我们未看到的代码的某些部分中,您需要存储subscribe()方法的结果,该方法是一个Subscription对象,并在需要时在其上调用unsubscribe().我认为是unregisterFromBasketStatus).
但是代码中还有更多问题.例如 :
subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));return subject.asObservable();
在第一行中,您不会在任何地方存储该 *** 作的结果.由于Observables是不可变的结构,因此每个运算符都创建一个新流,该流接收来自上一个的通知.由此可见,很明显,当您在第二行中返回subject.asObservable()时,您不会从第一行中返回已修改的observable,而是返回未修改的旧状态.要更正此问题,只需将结果替换为主题变量:subject = subject.doOnUnsubscribe(()-> L.LOGD(TAG,“取消订阅主题!”)));
其次,使用流的原因之一是完全替换回调.当然,它将起作用,但是您正在减轻Rx带给代码库的许多好处.但是,它的可读性要差得多,任何需要处理您的代码的人都会把您诅咒到地狱,以至于他实际上可能会成功:-))我知道,Rx从一开始就很难学习,但是尝试尽可能避免这种情况.
总结以上是内存溢出为你收集整理的android-使用RxJava与取消订阅之间的活动之间可以共享观察全部内容,希望文章能够帮你解决android-使用RxJava与取消订阅之间的活动之间可以共享观察所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)