虽然项目中有用到Rxjava但是从来没有花功夫研究过所以对这里一直是懵逼状态,面试的时候也是很恐惧这方面的提问,但是Rxjava线程切换原理一直是面试必问的问题,与其心存侥幸不如直面恐惧,到了2022年,终于对这个问题有了一个完整的理解,如有理解偏差,还请指出。
// 本次源码分析基于rxjava 3.0 // 使用最新的rxjava版本看这里 https://github.com/ReactiveX/RxAndroid dependencies { implementation 'io.reactivex.rxjava3:rxandroid:3.0.0' implementation 'io.reactivex.rxjava3:rxjava:3.0.0' }
先看看我们平时是怎么使用的?
public class MainActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Observable.just("one", "two", "three", "four", "five") .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull String s) { Toast.makeText(MainActivity.this, s, Toast.LENGTH_LONG).show(); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } }); } }
上来先把结论图放一下,然后慢慢研究
- 1 Observable.just() → ObservableFromArray.java
- 2 subscribeOn() → ObservableSubscribeOn.java
- 3 ObservableSubscribeOn.java
- 4 observeOn() → ObservableObserveOn.java
- 5 subscribe() → ObservableObserveOn.java
- 6 ObservableObserveOn.ObserveOnObserver
- 7 ObservableSubscribeOn.subscribeActual
- 8 ObservableFromArray.java
- 9 ObservableSubscribeOn.SubscribeOnObserver
public static2 subscribeOn() → ObservableSubscribeOn.javaObservable just(@NonNull T item1, @NonNull T item2, @NonNull T item3, @NonNull T item4, @NonNull T item5) { //... 省略无关代码 return fromArray(item1, item2, item3, item4, item5); } public static Observable fromArray(@NonNull T... items) { // ...省略无关代码 // 返回的就是 ObservableFromArray return RxJavaPlugins.onAssembly(new ObservableFromArray<>(items)); } public static Observable onAssembly(@NonNull Observable source) { // ...省略无关代码 return source; }
// ObservableFromArray类并没有实现 subscribeOn 方法 而是交由其父类 Observable实现 public final Observable3 ObservableSubscribeOn.javasubscribeOn(@NonNull Scheduler scheduler) { // scheduler → Schedulers.newThread() → NEW_THREAD → new NewThreadTask() → NewThreadHolder.DEFAULT → new NewThreadScheduler() → new RxThreadFactory(THREAD_NAME_PREFIX, priority) 这是一个线程工厂 // this 是 ObservableFromArray对象 // 根据1 这里返回的其实是 ObservableSubscribeOn return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler)); }
public final class ObservableSubscribeOn4 observeOn() → ObservableObserveOn.javaextends AbstractObservableWithUpstream { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) { // source 就是 ObservableFromArray 对象 super(source); // scheduler 就是 RxThreadFactory this.scheduler = scheduler; } // .... }
public final Observable5 subscribe() → ObservableObserveOn.javaobserveOn(@NonNull Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) { // scheduler → AndroidSchedulers.mainThread() → MAIN_THREAD → MainHolder.DEFAULT → new HandlerScheduler(new Handler(Looper.getMainLooper()), true) 这里的Handler是声明在主线程的,所以后面发消息可以发到主线程里面 // this 指的是 ObservableSubscribeOn对象 return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize)); }
public final class ObservableObserveOn6 ObservableObserveOn.ObserveonObserverextends AbstractObservableWithUpstream { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) { // source 是 ObservableSubscribeOn super(source); // scheduler 是 HandlerScheduler this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } // subscribe() 的真正实现是 subscribeActual @Override protected void subscribeActual(Observer super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); // w 这里是创建了一个 HandlerWorker(handler, async); // handler 是声明在主线程的,所以后面发消息可以发到主线程里面 // observer 是我们自己传入的用来d吐司的Observer // ObserveonObserver 是 ObservableObserveOn的成员内部类 source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize)); } } }
static final class ObserveOnObserverextends BasicIntQueueDisposable implements Observer , Runnable { // ... SimpleQueue queue; // 很重要, 是一个消息队列 ObserveOnObserver(Observer super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { // downstream 是 是我们自己传入的用来d吐司的Observer this.downstream = actual; // worker 是 HandlerWorker(handler, async)进行跨线程通信的 this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } // 每次上游传入一个元素给到下游就要走onNext @Override public void onNext(T t) { if (done) { return; } // 元素入队 if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } // worker.schedule 就是发一个Runnable给主线程 // getAndIncrement 执行之后就不为0了所以只会发一个发一个Runnable给主线程 void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } @Override public void run() { // 下面代码都会在主线程执行 if (outputFused) { drainFused(); } else { drainNormal(); } } void drainNormal() { int missed = 1; final SimpleQueue q = queue; // a 是我们自己传入的用来d吐司的Observer final Observer super T> a = downstream; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); disposed = true; upstream.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } // 每次拿到一个数据就回调我们业务的onNext方法 a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } } }
我们知道了是如何拿到数据发送到主线程消费的,那么是在哪里产生?怎么传输的呢?
根据5 我们知道了
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
沿着这条线去看看,其实source.subscribe回调的就是 ObservableSubscribeOn.subscribeActual
7 ObservableSubscribeOn.subscribeActualpublic final class ObservableSubscribeOn8 ObservableFromArray.javaextends AbstractObservableWithUpstream { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) { // source 就是 ObservableFromArray 对象 super(source); // scheduler 就是 RxThreadFactory this.scheduler = scheduler; } @Override public void subscribeActual(final Observer super T> observer) { // observer 是 ObserveOnObserver final SubscribeOnObserver parent = new SubscribeOnObserver<>(observer); observer.onSubscribe(parent); // scheduler.scheduleDirect(new SubscribeTask(parent)) 这行代码是关键 // scheduler 就是 RxThreadFactory, 这句代码相当于新开一个线程执行了 SubscribeTask parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } final class SubscribeTask implements Runnable { private final SubscribeOnObserver parent; SubscribeTask(SubscribeOnObserver parent) { // parent 就是 SubscribeOnObserver this.parent = parent; } @Override public void run() { // SubscribeTask 其实执行的是 source的 subscribeActual,也就是 ObservableFromArray.subscribeActual // parent 就是 SubscribeOnObserver source.subscribe(parent); } }
public final class ObservableFromArray9 ObservableSubscribeOn.SubscribeonObserverextends Observable { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer super T> observer) { // observer 就是 SubscribeOnObserver FromArrayDisposable d = new FromArrayDisposable<>(observer, array); observer.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } static final class FromArrayDisposable extends BasicQueueDisposable { final Observer super T> downstream; final T[] array; // ... FromArrayDisposable(Observer super T> actual, T[] array) { // downstream 就是 SubscribeonObserver this.downstream = actual; this.array = array; } void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { downstream.onError(new NullPointerException("The element at index " + i + " is null")); return; } // 子线程每产生一个数据就回调 SubscribeOnObserver的 onNext一次 // 这个 SubscribeonObserver 到底是什么呢? 我们再回到 ObservableSubscribeOn downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); } } }
static final class SubscribeOnObserverextends AtomicReference implements Observer , Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer super T> downstream; final AtomicReference upstream; SubscribeOnObserver(Observer super T> downstream) { // 根据 7 downstream 就是这里的 ObserveOnObserver this.downstream = downstream; this.upstream = new AtomicReference<>(); } @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this.upstream, d); } @Override public void onNext(T t) { // 每次 ObservableFromArray中 onNext 代码回调都会执行 ObserveonObserver 中的onNext代码回调 downstream.onNext(t); } @Override public void onError(Throwable t) { downstream.onError(t); } @Override public void onComplete() { downstream.onComplete(); } @Override public void dispose() { DisposableHelper.dispose(upstream); DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)