Map *** 作符号其实就是新生成一个 MapObservable来转换处理数据,然后将数据发射给 MapObserver ,待MapObserver数据处理好之后,才会最终调用自定义的 Observer对象,这个过程实际就是利用了Java里面多态的特性。
2:Map *** 作符源码 2.1 :MapObservable源码
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("one");
emitter.onNext("two");
emitter.onComplete();
}
}).map(new Function() {
@Override
public Integer apply(String s) throws Exception {
return Integer.valueOf(s);
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe:" + d.toString());
}
@Override
public void onNext(Integer s) {
System.out.println("onNext:" + s);
}
@Override
public void onError(Throwable e) {
System.out.println("Throwable:" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete:");
}
});
2.2 Map *** 作符源码
public final Observable map(Function super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
}
2.3 将 Function 对象 mapper 通过 ObservableMap 传给 ObservableMap,并完成相应的赋值
public final class ObservableMap extends AbstractObservableWithUpstream {
final Function super T, ? extends U> function;
public ObservableMap(ObservableSource source, Function super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer super U> t) {
source.subscribe(new MapObserver(t, function));
}
static final class MapObserver extends BasicFuseableObserver {
final Function super T, ? extends U> mapper;
MapObserver(Observer super U> actual, Function super T, ? extends U> mapper) {
// 将观察者对象 actual 赋值给 downstream
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
// 调用 mapper.apply ,其实是自定义 Function 中的 apply 方法
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 最终调用 Observer 的 onNext 方法
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
....
}
}
2.4 :接下来的流程和 Rxjava 基本流程基本相同,执行产生订阅关系的方法 - subscribe 时调用 ObservableMap#subscribeActual:
@Override
public void subscribeActual(Observer super U> t) {
// 此处的 source 为调用 map *** 作符的 Observable,即上一步通过 create 创建的 Observable
(ObservableCreate)
source.subscribe(new MapObserver(t, function));
}
public final void subscribe(Observer super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook retu...);
// 由于多态的存在,此处的 subscribeActual 会调用 MapObservable 的subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't t
npe.initCause(e);
throw npe;
}
}
3: 总结
其中最重要的方法 subscribeActual 调用的为 ObservableCreate 的 subscribeActual 方法,接下来和基本流程一样会调用 ObservableCreate 的 subscribe 从而开启事件的分发,与 Rxjava 基本流程不同的是 map *** 作符构建了 MapObserver,完成 MapObserver 的相关 *** 作后,才会最终调用自定义的 Observer 对象。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)