手动取消订阅
Consumer类型
Observable 订阅(subscribe)后返回 Disposable,通过它取消订阅
public class SecondActivity extends AppCompatActivity { private static final String TAG = "SecondActivity"; private disposable disposable; @OverrIDe protected voID onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentVIEw(R.layout.activity_second); disposable = Observable.create(new ObservableOnSubscribe<String>() { @OverrIDe public voID subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printstacktrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroIDSchedulers.mainThread()) .subscribe(new Consumer<String>() { @OverrIDe public voID accept(String s) throws Exception { Log.d(TAG,"accept: "+s); } }); } @OverrIDe protected voID onDestroy() { super.onDestroy(); Log.d(TAG,"onDestroy: "); //取消订阅 if(disposable != null && !disposable.isdisposed()){ disposable.dispose(); Log.d(TAG,"onDestroy: dispose"); } }}
普通类型Observer
在 Observer 的 onSubscribe 回调中获取 Disposable,然后在需要时取消
public class ThirdActivity extends AppCompatActivity { private static final String TAG = "ThirdActivity"; disposable disposable; @OverrIDe protected voID onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentVIEw(R.layout.activity_third); Observable.create(new ObservableOnSubscribe<String>() { @OverrIDe public voID subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); emitter.onNext("testInfo"); } catch (InterruptedException e) { e.printstacktrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroIDSchedulers.mainThread()) .subscribe(new Observer<String>() { @OverrIDe public voID onSubscribe(disposable d) { disposable = d; } @OverrIDe public voID onNext(String s) { Log.d(TAG,"onNext: "+s); } @OverrIDe public voID onError(Throwable e) { Log.d(TAG,"onError: "); } @OverrIDe public voID onComplete() { Log.d(TAG,"onComplete: "); } }); } @OverrIDe protected voID onDestroy() { super.onDestroy(); Log.d(TAG,"onDestroy: "); //然后在需要取消订阅的地方调用即可 if (disposable != null && !disposable.isdisposed()) { Log.d(TAG,"dispose: "); disposable.dispose(); } }}
DisposableObserver类型
利用 DisposableObserver 和 subscribeWith 直接返回 Disposable,然后取消
public class FourthActivity extends AppCompatActivity { private static final String TAG = "FourthActivity"; private disposableObserver<String> observer; @OverrIDe protected voID onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentVIEw(R.layout.activity_fourth); observer = Observable.create(new ObservableOnSubscribe<String>() { @OverrIDe public voID subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); emitter.onNext("testInfo"); } catch (InterruptedException e) { e.printstacktrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroIDSchedulers.mainThread()) .subscribeWith(new disposableObserver<String>() { @OverrIDe public voID onNext(String o) { Log.d(TAG,"onNext: "+o); } @OverrIDe public voID onError(Throwable e) { Log.d(TAG,"onError: "); } @OverrIDe public voID onComplete() { Log.d(TAG,"onComplete: "); } }); } @OverrIDe protected voID onDestroy() { super.onDestroy(); if (observer != null && !observer.isdisposed()) { Log.d(TAG,"dispose: "); observer.dispose(); } }}
取消多个Observer
把多个 Observer 的 Disposable 添加到 CompositeDisposable,一次性全部取消
public class ComdisposableActivity extends AppCompatActivity { private disposable disposable1; private disposable disposable2; private static final String TAG = "ComdisposableActivity"; @OverrIDe protected voID onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentVIEw(R.layout.activity_com_disposable); Observable.create(new ObservableOnSubscribe<String>() { @OverrIDe public voID subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); emitter.onNext("testInfo"); } catch (InterruptedException e) { e.printstacktrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroIDSchedulers.mainThread()) .doOndispose(new Action() { @OverrIDe public voID run() throws Exception { Log.d(TAG,"run: Unsubscribing subscription from onCreate()"); } }) .subscribe(new Observer<String>() { @OverrIDe public voID onSubscribe(disposable d) { disposable1 = d; } @OverrIDe public voID onNext(String s) { Log.d(TAG,"onComplete: "); } }); Observable.create(new ObservableOnSubscribe<String>() { @OverrIDe public voID subscribe(ObservableEmitter<String> emitter) throws Exception { try { Thread.sleep(5000); emitter.onNext("testInfo"); } catch (InterruptedException e) { e.printstacktrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroIDSchedulers.mainThread()) .subscribe(new Observer<String>() { @OverrIDe public voID onSubscribe(disposable d) { disposable2 = d; } @OverrIDe public voID onNext(String s) { Log.d(TAG,"onComplete: "); } }); } @OverrIDe protected voID onDestroy() { super.onDestroy(); Compositedisposable compositedisposable = new Compositedisposable(); //批量添加 compositedisposable.add(disposable1); compositedisposable.add(disposable2); //最后一次性全部取消订阅 compositedisposable.dispose(); }}
RxLifecycle取消
onDestroy 时取消
// Emits once per second; compose(bindToLifecycle()) ties the stream to the host's
// lifecycle so it ends without an explicit dispose() call.
// NOTE(review): the enclosing class is presumably an RxLifecycle component such as
// RxAppCompatActivity — confirm against the surrounding file.
Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Unsubscribing bindTolifecycle from onDestroy()");
            }
        })
        .compose(this.<Long>bindToLifecycle())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long num) throws Exception {
                Log.d(TAG, "accept: " + num);
            }
        });
指定生命周期取消
// Emits once per second; compose(bindUntilEvent(ActivityEvent.PAUSE)) ends the stream
// when the host activity reaches the PAUSE lifecycle event.
// NOTE(review): the interval period and the doOnDispose wrapper were truncated in the
// original text and are reconstructed from the parallel bindToLifecycle example — confirm.
Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()");
            }
        })
        .compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "bindUntilEvent accept: " + aLong);
            }
        });
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。
总结以上是内存溢出为你收集整理的RxJava取消订阅的各种方式的实现全部内容,希望文章能够帮你解决RxJava取消订阅的各种方式的实现所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)