当.repeat()接收到.onCompleted()事件后触发重订阅。
当.retry()接收到.onError()事件后触发重订阅。
参考:
[Android开发] RxJava2之路七 - 错误处理 *** 作符例子Demo
RX *** 作符之错误处理(catch[onErrorReturn、onErrorResumeNext、onExceptionResumeNext]、retry、retryWhen)
retry *** 作符重试的意思,拦截到错误,然后让 被观察者重新发射数据。Throwable和Exception都额可以拦截
它有五种参数方法:
retry()
: 让被观察者重新发射数据,要是一直错误就一直发送了
retry(BiPredicate)
: interger是第几次重新发送,Throwable是错误的内容
retry(long time)
: 最多让被观察者重新发射数据多少次
retry(long time,Predicate predicate)
: 最多让被观察者重新发射数据多少次,在predicate里面进行判断拦截 返回是否继续
retry(Predicate predicate)
: 在predicate里面进行判断拦截 返回是否继续
//public final Observable retry()
public void retryTest() {
Observable.create(getObservableOnSubscribe())
.subscribeOn(Schedulers.newThread())
.retry() //无限重试
.subscribe(getObserver());
}
//收到消息================ 0
//收到消息================ 1
//收到消息================ 0
//收到消息================ 1
//收到消息================ 0
//收到消息================ 1
//.........
//public final Observable retry(long times)
public void retryTest1() {
Observable.create(getObservableOnSubscribe())
.subscribeOn(Schedulers.newThread())
.retry(3) //最多让被观察者重新发射数据3次
.subscribe(getObserver());
}
//收到消息================ 0
//收到消息================ 1
//收到消息================ 0
//收到消息================ 1
//收到消息================ 0
//收到消息================ 1
//收到消息================ 0
//收到消息================ 1
//结果错误============= java.lang.Exception: 出现错误了
// public final Observable retry(Predicate super Throwable> predicate)
public void retryTest2() {
Observable.create(getObservableOnSubscribe())
.subscribeOn(Schedulers.newThread())
.retry(new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
System.out.println("retry错误================= " + throwable.toString());
//返回假就是不让重新发射数据了,调用观察者的onError就终止了。
//返回真就是让被观察者重新发射请求
return true;
}
})
.subscribe(getObserver());
}
//收到消息================ 0
//收到消息================ 1
//retry错误================= java.lang.Exception: 出现错误了
//收到消息================ 0
//收到消息================ 1
//retry错误================= java.lang.Exception: 出现错误了
//收到消息================ 0
//收到消息================ 1
//retry错误================= java.lang.Exception: 出现错误了
//收到消息================ 0
//收到消息================ 1
//retry错误================= java.lang.Exception: 出现错误了
//...............
//public final Observable retry(long times, Predicate super Throwable> predicate)
public void retryTest3() {
Observable.create(getObservableOnSubscribe())
.subscribeOn(Schedulers.newThread())
.retry(3, new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
System.out.println("retry错误============== " + throwable.toString());
//最多让被观察者重新发射数据3次,但是这里返回值可以进行处理
//返回假就是不让重新发射数据了,调用观察者的onError就终止了。
//返回真就是让被观察者重新发射请求
return true;
}
})
.subscribe(getObserver());
}
//收到消息================ 0
//收到消息================ 1
//retry错误============== java.lang.Exception: 出现错误了
//收到消息================ 0
//收到消息================ 1
//retry错误============== java.lang.Exception: 出现错误了
//收到消息================ 0
//收到消息================ 1
//retry错误============== java.lang.Exception: 出现错误了
//收到消息================ 0
//收到消息================ 1
//结果错误============= java.lang.Exception: 出现错误了
// public final Observable retry(BiPredicate super Integer, ? super Throwable> predicate)
public void retryTest4() {
Observable.create(getObservableOnSubscribe())
.subscribeOn(Schedulers.newThread())
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) {
System.out.println("retry错误========" + integer + " "
+ throwable.toString());
//返回假就是不让重新发射数据了,调用观察者的onError就终止了。
//返回真就是让被观察者重新发射请求
return true;
}
})
.subscribe(getObserver());
}
//收到消息================ 0
//收到消息================ 1
//retry错误========1 java.lang.Exception: 出现错误了
//收到消息================ 0
//收到消息================ 1
//retry错误========2 java.lang.Exception: 出现错误了
//收到消息================ 0
//收到消息================ 1
//retry错误========3 java.lang.Exception: 出现错误了
//收到消息================ 0
//收到消息================ 1
//retry错误========4 java.lang.Exception: 出现错误了
//...................
private ObservableOnSubscribe<String> getObservableOnSubscribe() {
return new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i = 0; i <= 3; i++) {
if (i == 2) {
e.onError(new Exception("出现错误了"));
} else {
e.onNext(i + "");
}
try {
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
}
e.onComplete();
}
};
}
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("收到消息================ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("结果错误============= " + e.toString());
}
@Override
public void onComplete() {
}
};
}
retryUntil 测试:
public void retryUntil1() {
Observable.create(getObservableOnSubscribe())
.subscribeOn(Schedulers.newThread())
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return false;//false 继续订阅
}
})
.subscribe(getObserver());
}
//收到消息================ 0
//收到消息================ 1
//收到消息================ 0
//收到消息================ 1
//收到消息================ 0
//收到消息================ 1
//..............
public void retryUntil2() {
Observable.create(getObservableOnSubscribe())
.subscribeOn(Schedulers.newThread())
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return true;//true : 不在订阅
}
})
.subscribe(getObserver());
}
//收到消息================ 0
//收到消息================ 1
//结果错误============= java.lang.Exception: 出现错误了
retryWhen 测试:
//retryWhen和retry类似,
// 区别是:
// retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,
// retryWhen观察它的结果再决定是不是要重新订阅原始的Observable。
//如果这个Observable发射了一项数据,它就重新订阅,
// 如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。
public void retryWhen1() {
Observable.create(getObservableOnSubscribe())
.subscribeOn(Schedulers.newThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable){
//这里可以发送新的被观察者 Observable
return throwableObservable
.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable){
//如果发射的onError就终止
return Observable.error(new Throwable("retryWhen终止啦"));
}
});
}
})
.subscribe(getObserver());
}
//收到消息================ 0
//收到消息================ 1
//结果错误============= java.lang.Throwable: retryWhen终止啦
public void retryWhen2() {
Observable.create(getObservableOnSubscribe())
.subscribeOn(Schedulers.newThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable){
//这里可以发送新的被观察者 Observable
return throwableObservable
.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable){
//如果发射的onError就终止,否则重新订阅
return Observable.just("再试试?");
}
});
}
})
.subscribe(getObserver());
}
//收到消息================ 0
//收到消息================ 1
//收到消息================ 0
//收到消息================ 1
//收到消息================ 0
//收到消息================ 1
//...........
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)