Rxjava从入门到精通:手写Rxjava框架(一):理论知识梳理,源码分析

Rxjava从入门到精通:手写Rxjava框架(一):理论知识梳理,源码分析,第1张

1:Rxjava需要达成的共识两种设计模式 1.1:观察者模式:实现响应式编程的基础

观察者模式

1.2:装饰器模式:各种 *** 作符的具体实现类都通过装饰器模式类拓展完成

装饰器模式框架图

2:Rxjava核心框架核心部分

ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法

Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。

Observer : 观察者接口,提供处理事件的回调方法。

ObservableOnSubscribe:被观察者与事件解耦的接口

Emitter : 事件发射的接口,提供发射事件的方法。

ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用

XXXEmitter : 事件发射器具体实现,持有观察者引用。

XXXObserver : 具体观察者的实现类。

AbstractObservableWithUpStream: 被观察者的抽象装饰类,持有了顶层接口的引用,都是通过继承该抽象类来实现各种 *** 作符的被观察者 。

 

2.1 源码分析create() *** 作符
private static void testCreate() {
        Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    System.out.println("emitter发射value数据:" + i);
                    emitter.onNext("value=" + i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer() {
            @Override
            public void accept(Object o) throws Throwable {
                System.out.println(o);
            }
        });
}
 
2.2  Observable & ObservableSource & ObservableOnSubscribe等 

ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法

Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。

ObservableOnSubscribe :被观察者与事件发送解耦的接口

Observer: 观察者接口,提供处理事件的回调方法。可以在此接口的onSubscribe()函数来控制被观察者的事件发送后,观察者能否被消费

ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用

Observable.java


Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    System.out.println("步骤一 :emitter发射value数据:" + i);
                    emitter.onNext("value=" + i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer() {
            @Override
            public void accept(Object o) throws Throwable {
                System.out.println("步骤二消费事件:" + o);
            }
});


//  ObservableOnSubscribe:被观察者与事件解耦的接口
public static  Observable create(@NonNull ObservableOnSubscribe source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}


ObservableCreate.java 

// 1: 构造函数保存 ObservableOnSubscribe
public ObservableCreate(ObservableOnSubscribe source) {
        this.source = source;
}

    
@Override
protected void subscribeActual(Observer observer) {
     CreateEmitter parent = new CreateEmitter<>(observer);

     // 2: 先回调Observer的onSubscribe()函数
     observer.onSubscribe(parent);

     try {

         // 3:  ObservableOnSubscribe再 发射事件
         source.subscribe(parent);

     } catch (Throwable ex) {
         Exceptions.throwIfFatal(ex);
         parent.onError(ex);
     }
}
 

Emitter : 事件发射的接口,提供发射事件的方法。

ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用

XXXEmitter : 事件发射器具体实现,持有观察者引用。

XXXObserver : 具体观察者的实现类。

ObservableCreate.java

static final class CreateEmitter
    extends AtomicReference
    implements ObservableEmitter, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer observer;

        // 1: CreateEmitter 持有下游 Observer的引用
        CreateEmitter(Observer observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {

                onError(ExceptionHelper.createNullPointerException("onNext called with a 
                      null value."));

                return;
            }

            // 2: 根据下游的 Observer 的onSubscribe()函数判断是否取消发射,决定Observer是 
              否调用 observer.onNext函数
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }



 Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    System.out.println("步骤一 :emitter发射value数据:" + i);

                    // 0 : 上游被观察者传递过来的 emitter
                    emitter.onNext("value=" + i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer() {
            @Override
            public void accept(Object o) throws Throwable {
                System.out.println("步骤二消费事件:" + o);
            }
        }); 
2.2 : map() *** 作符源码分析 
 /**
     *  直接对发射出来的事件进行处理并且产生新的事件,然后再次发射
     */
    private static void testMap() {
        Observable.just("aaa")
                .map(new Function() {
                    @Override
                    public Object apply(String s) throws Throwable {
                        System.out.println("步骤二: "+"事件转换之后再次发射");
                        return s+" + bbb";
                    }
                }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("步骤一:"+ "use Subscribe connect Observable and Observer");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("步骤三: "+"Next event:" + o + " response");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    } 
2.2.1:  Observable.just()  生产:ObservableJust类,并在发射事件时调用subscribeAcutal函数 
ObservableJust.java

public final class ObservableJust extends Observable implements ScalarSupplier {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    // Observer 是下游的最后一个 observer

    // subscribeActual 是 抽象类 Observable的具体实现类ObservableJust,它会在
   // Observable的 subscribe()中被回调

    @Override
    protected void subscribeActual(Observer observer) {
        ScalarDisposable sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T get() {
        return value;
    }
}



Observable.java

public final void subscribe(@NonNull Observer observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            
            // 利用java多态的特性,直接调用 ObservableJust.java中的 subscribeActual
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
2.2.2 :被观察者通过 ObservableScalarXMap.ScalarDisposable.run()发射事件
ObservableScalarXMap.ScalarDisposable.java

@Override
public void run() {
     if (get() == START && compareAndSet(START, ON_NEXT)) {

           // 持有下游的 Observer的引用,直接消费事件
           observer.onNext(value);
           if (get() == ON_NEXT) {
                 lazySet(ON_COMPLETE);
                 observer.onComplete();
           }
      }
}

      

2.3 :flatMap() *** 作符源码分析 2.4:compose() *** 作符源码分析 2.5:subscribeOn()  , observerOn() 源码分析

3:观察者模式 3.1 :观察者模式:

是指多个对象间存在一对多的依赖关系

3.2 : Rxjava中使用的观察者模式 4:装饰器模式: 4.1 : 装饰器模式

是指在不改变现有对象结构的情况下,动态地给该对象增加一些职责(即增加其额外的功能)的模式,它属于对象结构型模式

4.2 :Rxjava中使用的装饰器模式 5: Rxjava核心部分框架

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/734013.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-27
下一篇 2022-04-27

发表评论

登录后才能评论

评论列表(0条)