rxjava

rxjava,第1张

rxjava

是rxjava1.X的版本,现在有2.X了

        
        Observable observable = Observable.create(new Observable.OnSubscribe() {
            @Override
            //当 Observable被订阅的时候,Observable.OnSubscribe的call()方法会自动被调用,
            //事件序列就会依照设定依次触发。这样,由被观察者调用了观察者的回调方法,
            //就实现了由被观察者向观察者的事件传递
            public void call(Subscriber subscriber) {
                System.out.println("test call");
                //定义事件队列
                subscriber.onNext("test next");
                subscriber.onCompleted();
            }
        });

        
        Observer observer = new Observer() {
            //被观察者调用onCompleted时触发
            @Override
            public void onCompleted() {
                System.out.println("1");
            }
            //被观察者调用onError时触发
            @Override
            public void onError(Throwable e) {
                System.out.println("2");
            }
            //被观察者调用onNext时触发
            @Override
            public void onNext(String s) {
                System.out.println("3");
            }
        };

        
        observable.subscribe(observer);

        
        //只对事件序列中的onNext做出响应
        Action1 action1 = new Action1() {
            @Override
            public void call(String s) {
                System.out.println("action1--->onNext  "+s);
            }
        };
        //只对事件序列中的onError做出响应
        Action1 action11 = new Action1() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("action11--->onError  "+throwable.getMessage());
            }
        };
        //只对事件序列中的onCompleted做出响应
        Action0 action0 = new Action0() {
            @Override
            public void call() {
                System.out.println("action11--->onCompleted  ");
            }
        };
        //订阅事件,只处理onNext事件
        observable.subscribe(action1);
        //订阅事件,处理onNext、onError和onCompleted事件
        observable.subscribe(action1,action11,action0);

        
        //just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据
        observable =  Observable.just("hello");

        //from()方法将传入的数组或Iterable拆分成具体对象后,订阅之后自动调用onNext方法依次发送,
        // 再发送结束后发送onCompleted结束整个事件
        observable = Observable.from(new ArrayList<>());

        //创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。订阅之后即按照固定时长调用onNext()方法
        Observable observablelong = Observable.interval(1, TimeUnit.SECONDS);
        observablelong.subscribe(new Action1() {
            @Override
            public void call(Long aLong) {
                System.out.println("时长"+aLong);
            }
        });

        //timer()该方法可以在订阅之后延迟发送特定事件
        observablelong = Observable.timer(2000,TimeUnit.MILLISECONDS);

        
        // map将被观察者发送的事件转换为任意类型的事件
        Integer[] ints = {1, 2, 3};
        //map方法把Intger类型转换为String类型
        Observable.from(ints).map(new Func1() {
            @Override
            public String call(Integer i) {
                //对Integer数据进行处理,转换成String类型返回
                return i + "i";
            }
        }).subscribe(new Action1() {
            @Override
            public void call(String s) {
                System.out.println(s+"   mapamap");
            }
        });
        //FlatMap *** 作符将被观察者发送的多个事件进行拆分,分别转换为不同的事件,多线程下顺序不一定
        Observable.just("1,2,3").flatMap(new Func1>() {
            @Override
            public Observable call(String s) {
                String[] split = s.split(",");

                return  Observable.from(split);
            }
        }).subscribe(new Observer() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }
        });
        //ConcatMap *** 作符,和FlatMap *** 作符原理一样,
        区别在于 FlatMap把事件拆分后在发送时,顺序可能和被观察者发送的事件顺序不一样,
        而ConcatMap *** 作符处理后的事件顺序和被观察者发送事件的顺序是一样的
        
        //buffer *** 作符将源Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射
        Observable.just(1,2,3,4).buffer(3);
        //filter() *** 作符加入逻辑判断,返回false表示数据需要被过滤。最后过滤出的数据将加入到新的Observable对象中

        
        Observable.just(1)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation());
					
										


					

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5607544.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)