观察者模式
1.2:装饰器模式:各种 *** 作符的具体实现类都通过装饰器模式类拓展完成装饰器模式框架图
2:Rxjava核心框架核心部分ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法
Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。
Observer : 观察者接口,提供处理事件的回调方法。
ObservableOnSubscribe:被观察者与事件解耦的接口
Emitter : 事件发射的接口,提供发射事件的方法。
ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用
XXXEmitter : 事件发射器具体实现,持有观察者引用。
XXXObserver : 具体观察者的实现类。
AbstractObservableWithUpStream: 被观察者的抽象装饰类,持有了顶层接口的引用,都是通过继承该抽象类来实现各种 *** 作符的被观察者 。
2.1 源码分析create() *** 作符
private static void testCreate() {
Observable.create(new ObservableOnSubscribe
2.2 Observable & ObservableSource & ObservableOnSubscribe等
ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法
Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。
ObservableOnSubscribe :被观察者与事件发送解耦的接口
Observer: 观察者接口,提供处理事件的回调方法。可以在此接口的onSubscribe()函数来控制被观察者的事件发送后,观察者能否被消费
ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用
Observable.java
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Throwable {
for (int i = 0; i < 10; i++) {
System.out.println("步骤一 :emitter发射value数据:" + i);
emitter.onNext("value=" + i);
}
emitter.onComplete();
}
}).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Throwable {
System.out.println("步骤二消费事件:" + o);
}
});
// ObservableOnSubscribe:被观察者与事件解耦的接口
public static Observable create(@NonNull ObservableOnSubscribe source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
ObservableCreate.java
// 1: 构造函数保存 ObservableOnSubscribe
public ObservableCreate(ObservableOnSubscribe source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer super T> observer) {
CreateEmitter parent = new CreateEmitter<>(observer);
// 2: 先回调Observer的onSubscribe()函数
observer.onSubscribe(parent);
try {
// 3: ObservableOnSubscribe再 发射事件
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
Emitter : 事件发射的接口,提供发射事件的方法。
ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用
XXXEmitter : 事件发射器具体实现,持有观察者引用。
XXXObserver : 具体观察者的实现类。
ObservableCreate.java
static final class CreateEmitter
extends AtomicReference
implements ObservableEmitter, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer super T> observer;
// 1: CreateEmitter 持有下游 Observer的引用
CreateEmitter(Observer super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a
null value."));
return;
}
// 2: 根据下游的 Observer 的onSubscribe()函数判断是否取消发射,决定Observer是
否调用 observer.onNext函数
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Throwable {
for (int i = 0; i < 10; i++) {
System.out.println("步骤一 :emitter发射value数据:" + i);
// 0 : 上游被观察者传递过来的 emitter
emitter.onNext("value=" + i);
}
emitter.onComplete();
}
}).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Throwable {
System.out.println("步骤二消费事件:" + o);
}
});
2.2 : map() *** 作符源码分析
/**
* 直接对发射出来的事件进行处理并且产生新的事件,然后再次发射
*/
private static void testMap() {
Observable.just("aaa")
.map(new Function() {
@Override
public Object apply(String s) throws Throwable {
System.out.println("步骤二: "+"事件转换之后再次发射");
return s+" + bbb";
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("步骤一:"+ "use Subscribe connect Observable and Observer");
}
@Override
public void onNext(Object o) {
System.out.println("步骤三: "+"Next event:" + o + " response");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
2.2.1: Observable.just() 生产:ObservableJust类,并在发射事件时调用subscribeAcutal函数
ObservableJust.java
public final class ObservableJust extends Observable implements ScalarSupplier {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
// Observer 是下游的最后一个 observer
// subscribeActual 是 抽象类 Observable的具体实现类ObservableJust,它会在
// Observable的 subscribe()中被回调
@Override
protected void subscribeActual(Observer super T> observer) {
ScalarDisposable sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T get() {
return value;
}
}
Observable.java
public final void subscribe(@NonNull Observer super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
// 利用java多态的特性,直接调用 ObservableJust.java中的 subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
2.2.2 :被观察者通过 ObservableScalarXMap.ScalarDisposable.run()发射事件
ObservableScalarXMap.ScalarDisposable.java
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
// 持有下游的 Observer的引用,直接消费事件
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
2.3 :flatMap() *** 作符源码分析 2.4:compose() *** 作符源码分析 2.5:subscribeOn() , observerOn() 源码分析 3:观察者模式 3.1 :观察者模式:
是指多个对象间存在一对多的依赖关系
3.2 : Rxjava中使用的观察者模式 4:装饰器模式: 4.1 : 装饰器模式是指在不改变现有对象结构的情况下,动态地给该对象增加一些职责(即增加其额外的功能)的模式,它属于对象结构型模式
4.2 :Rxjava中使用的装饰器模式 5: Rxjava核心部分框架欢迎分享,转载请注明来源:内存溢出
评论列表(0条)