Rxjava线程切换原理终于在2022年有了答案。

Rxjava线程切换原理终于在2022年有了答案。,第1张

Rxjava线程切换原理终于在2022年有了答案。

虽然项目中有用到Rxjava但是从来没有花功夫研究过所以对这里一直是懵逼状态,面试的时候也是很恐惧这方面的提问,但是Rxjava线程切换原理一直是面试必问的问题,与其心存侥幸不如直面恐惧,到了2022年,终于对这个问题有了一个完整的理解,如有理解偏差,还请指出。

//  本次源码分析基于rxjava 3.0 
//  使用最新的rxjava版本看这里 https://github.com/ReactiveX/RxAndroid
dependencies {
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}

先看看我们平时是怎么使用的?

public class MainActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable.just("one", "two", "three", "four", "five")
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        Toast.makeText(MainActivity.this, s, Toast.LENGTH_LONG).show();
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

    }
}

上来先把结论图放一下,然后慢慢研究


文章目录
  • 1 Observable.just() → ObservableFromArray.java
  • 2 subscribeOn() → ObservableSubscribeOn.java
  • 3 ObservableSubscribeOn.java
  • 4 observeOn() → ObservableObserveOn.java
  • 5 subscribe() → ObservableObserveOn.java
  • 6 ObservableObserveOn.ObserveOnObserver
  • 7 ObservableSubscribeOn.subscribeActual
  • 8 ObservableFromArray.java
  • 9 ObservableSubscribeOn.SubscribeOnObserver

1 Observable.just() → ObservableFromArray.java
public static  Observable just(@NonNull T item1, @NonNull T item2, @NonNull T item3, @NonNull T item4, @NonNull T item5) {
    //... 省略无关代码
    return fromArray(item1, item2, item3, item4, item5);
}

 public static  Observable fromArray(@NonNull T... items) {
        // ...省略无关代码
        // 返回的就是 ObservableFromArray
        return RxJavaPlugins.onAssembly(new ObservableFromArray<>(items));
 }
  public static  Observable onAssembly(@NonNull Observable source) {
        // ...省略无关代码
        return source;
    }
2 subscribeOn() → ObservableSubscribeOn.java
//  ObservableFromArray类并没有实现 subscribeOn 方法  而是交由其父类 Observable实现
  
 public final Observable subscribeOn(@NonNull Scheduler scheduler) {
         //  scheduler → Schedulers.newThread() →  NEW_THREAD →  new NewThreadTask() → NewThreadHolder.DEFAULT → new NewThreadScheduler() → new RxThreadFactory(THREAD_NAME_PREFIX, priority) 这是一个线程工厂
        //   this 是  ObservableFromArray对象
        // 根据1   这里返回的其实是 ObservableSubscribeOn
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
 }
3 ObservableSubscribeOn.java
public final class ObservableSubscribeOn extends AbstractObservableWithUpstream {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
        // source 就是 ObservableFromArray 对象
        super(source);
        // scheduler 就是 RxThreadFactory
        this.scheduler = scheduler;
    }
    // ....
}
4 observeOn() → ObservableObserveOn.java
public final Observable observeOn(@NonNull Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

  public final Observable observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
       //  scheduler  →  AndroidSchedulers.mainThread()  → MAIN_THREAD → MainHolder.DEFAULT → new HandlerScheduler(new Handler(Looper.getMainLooper()), true)   这里的Handler是声明在主线程的,所以后面发消息可以发到主线程里面
       // this 指的是 ObservableSubscribeOn对象
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }
5 subscribe() → ObservableObserveOn.java
public final class ObservableObserveOn extends AbstractObservableWithUpstream {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) {
        // source 是 ObservableSubscribeOn
        super(source);
        // scheduler 是 HandlerScheduler
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }


    // subscribe()  的真正实现是 subscribeActual
    @Override
    protected void subscribeActual(Observer observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // w  这里是创建了一个 HandlerWorker(handler, async);
            // handler 是声明在主线程的,所以后面发消息可以发到主线程里面
            // observer 是我们自己传入的用来d吐司的Observer
            // ObserveonObserver 是 ObservableObserveOn的成员内部类
            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }
}
6 ObservableObserveOn.ObserveonObserver
 static final class ObserveOnObserver extends BasicIntQueueDisposable
    implements Observer, Runnable {
     // ...
     SimpleQueue queue; //  很重要, 是一个消息队列
     ObserveOnObserver(Observer actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            // downstream 是 是我们自己传入的用来d吐司的Observer
            this.downstream = actual;
            // worker 是 HandlerWorker(handler, async)进行跨线程通信的
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
     
      // 每次上游传入一个元素给到下游就要走onNext
      @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            // 元素入队
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        // worker.schedule 就是发一个Runnable给主线程
        // getAndIncrement 执行之后就不为0了所以只会发一个发一个Runnable给主线程
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

         @Override
        public void run() {
             // 下面代码都会在主线程执行  
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }


         void drainNormal() {
            int missed = 1;

            final SimpleQueue q = queue;
             // a 是我们自己传入的用来d吐司的Observer
            final Observer a = downstream;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }
                    // 每次拿到一个数据就回调我们业务的onNext方法
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

我们知道了是如何拿到数据发送到主线程消费的,那么是在哪里产生?怎么传输的呢?
根据5 我们知道了

source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));

沿着这条线去看看,其实source.subscribe回调的就是 ObservableSubscribeOn.subscribeActual

7 ObservableSubscribeOn.subscribeActual
public final class ObservableSubscribeOn extends AbstractObservableWithUpstream {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
        // source 就是 ObservableFromArray 对象
        super(source);
        // scheduler 就是 RxThreadFactory
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer observer) {
        // observer 是 ObserveOnObserver
        final SubscribeOnObserver parent = new SubscribeOnObserver<>(observer);
   
        observer.onSubscribe(parent);
        // scheduler.scheduleDirect(new SubscribeTask(parent))  这行代码是关键
        // scheduler 就是 RxThreadFactory, 这句代码相当于新开一个线程执行了 SubscribeTask
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver parent;

        SubscribeTask(SubscribeOnObserver parent) {
            // parent 就是 SubscribeOnObserver
            this.parent = parent;
        }

        @Override
        public void run() {
             // SubscribeTask 其实执行的是 source的 subscribeActual,也就是 ObservableFromArray.subscribeActual
             // parent 就是 SubscribeOnObserver
            source.subscribe(parent);
        }
    }
8 ObservableFromArray.java
public final class ObservableFromArray extends Observable {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }
     
    @Override
    public void subscribeActual(Observer observer) {
         // observer 就是 SubscribeOnObserver
        FromArrayDisposable d = new FromArrayDisposable<>(observer, array);

        observer.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }

   static final class FromArrayDisposable extends BasicQueueDisposable {

        final Observer downstream;

        final T[] array;
        // ...
        FromArrayDisposable(Observer actual, T[] array) {
             // downstream 就是 SubscribeonObserver 
            this.downstream = actual;
            this.array = array;
        }
 
        void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                // 子线程每产生一个数据就回调 SubscribeOnObserver的 onNext一次
                // 这个 SubscribeonObserver 到底是什么呢?  我们再回到 ObservableSubscribeOn
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }
}
9 ObservableSubscribeOn.SubscribeonObserver
   static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer downstream;

        final AtomicReference upstream;

        SubscribeOnObserver(Observer downstream) {
            // 根据 7  downstream 就是这里的  ObserveOnObserver
            this.downstream = downstream;
            this.upstream = new AtomicReference<>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

        @Override
        public void onNext(T t) {
            // 每次 ObservableFromArray中 onNext 代码回调都会执行  ObserveonObserver 中的onNext代码回调
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5696375.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存