Rxjava从入门到精通:Rxjava转换 *** 作符之 Map(), flatMap() ,,concatmap(),compose

Rxjava从入门到精通:Rxjava转换 *** 作符之 Map(), flatMap() ,,concatmap(),compose,第1张

1: Map() *** 作符

直接对发射处理的事件进行处理后产生新的事件后,再次发射

下面的代码可以看到:map直接对 just() *** 作符发射出来的事件进行拼接后,再次发射

private static void testMap() {
        Observable.just("aaa")
                .map(new Function() {
                    @Override
                    public Object apply(String s) throws Throwable {
                        return s+" + bbb";
                    }
                }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("use Subscribe connect Observable and Observer");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("Next event:" + o + " response");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
 }


打印结果:

use Subscribe connect Observable and Observer
Next event:aaa + bbb response 
   2: flatMap()  *** 作符      

适应场景:与比如多次网络请求

拿到上一个被观察者事件后,再创建一个新的被观察者,新的被观察者再次发射事件,被最终的Observer消费


    /**
     *  适应场景:与比如多次网络请求
     *  拿到上一个被观察者事件后,再创建一个新的被观察者,新的被观察者再次发射事件
     */
    private static void testFlatMap() {
        Observable.just("register")
                .flatMap(new Function>() {
                    @Override
                    public ObservableSource apply(String s) throws Throwable {
                        System.out.println(s + "成功");
                        return Observable.just("login");
                    }
                }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("use Subscribe connect Observable and Observer");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("Next event:" + o + " 成功");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

    }

打印结果:
  register成功
  use Subscribe connect Observable and Observer
  Next event:login 成功 
3: ConactMap()  *** 作符 

conactMap() *** 作符相比 flatMap() *** 作符,它对事件的处理是顺序的

private static void testConcatMap() {
        Observable.just("111","222","3333","4444")
                .concatMap(new Function>() {
                    @Override
                    public ObservableSource apply(String s) throws Throwable {
                        System.out.println("just被观察者事件" + s);
                        return Observable.just(s);
                    }
                }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("use Subscribe connect Observable and Observer");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("Next event:" + o + " response");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

    }


打印结果:
use Subscribe connect Observable and Observer
just被观察者事件111
Next event:111 response
just被观察者事件222
Next event:222 response
just被观察者事件3333
Next event:3333 response
just被观察者事件4444
Next event:4444 response 
4:compose() *** 作符 

简而言之可以叫做代码复用 *** 作符, 比如将 发射线程和消费线程通过transformer来复用

public class SchedulerTransformr implements ObservableTransformer{

        @Override
        public @NonNull ObservableSource apply(Observable upstream) {
            return upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
}


private void testCompose() {
        Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Throwable {
                System.out.println("subscribe 开始请求数据");
                Thread.sleep(5000);
                System.out.println("5s 后 subscribe 请求数据结束: "+ Thread.currentThread().getName());
                emitter.onNext("数据请求成功并发射:" );
                emitter.onComplete();
            }
        }).compose(new SchedulerTransformr<>())
                .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("use Subscribe connect Observable and Observer");
                    }

                    @Override
                    public void onNext( Object o) {
                        System.out.println("消费发射的数据: "+ Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }


打印结果:

I/System.out: use Subscribe connect Observable and Observer
I/System.out: subscribe 开始请求数据
I/System.out: 5s 后 subscribe 请求数据结束: RxCachedThreadScheduler-1
I/System.out: 消费发射的数据: main 

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/724480.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)