RxJava2.0----事件流操作符Observable Ut

2017-11-19  本文已影响300人  Calllanna

6.事件流操作符Observable Utility Operators

A toolbox of useful Operators for working with Observables
● Delay
● Do
● Materialize/Dematerialize
● Serialize
● TimeInterval
● Timeout
● Timestamp
● Using
● To
● Retry
● cache
● cast
● compese

● Delay
将一个事件流里的数据源全部都延时发送。

● Do
在观察者订阅前,接收数据前后,完成接收前后,事件流过程中发生错误后,事件流结束前后等回调被观察者通知
doAfterTerminate doOnComplete doOnDispose doOnEach doOnError doOnLifecycle doOnNext doOnSubscribe doOnTerminate onTerminateDetach

  private Observer observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            print("onSubscribe ");
        }

        @Override
        public void onNext(Integer integer) {
            print("onNext "+integer);
        }

        @Override
        public void onError(Throwable e) {
            print("onError "+e.getMessage());
        }

        @Override
        public void onComplete() {
            print("onComplete " );
        }
    };
    private Observable getObservable(final boolean isError){
        return  Observable.just(1,2,3,4,5)
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        print("doOnSubscribe ");
                    }
                })
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        print("doOnEach :"+integerNotification);
                    }
                }).doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        print("doAfterNext : "+integer  );
                        if(isError && integer == 3){
                            throw new Exception("There is a Error!!");
                        }
                    }
                }).doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doAfterTerminate : "  );
                    }
                }).doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doOnComplete : "  );
                    }
                }).doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doFinally : "  );
                    }
                }).doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doOnDispose : "  );
                    }
                }).doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doOnTerminate : "  );
                    }
                }).onTerminateDetach()
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        print("doOnError : "  );
                    }
                }).doOnLifecycle(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        print("doOnLifecycle : accept"  );
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doOnLifecycle Action : "  );
                    }
                });
    }
    private void doAct1() {
        //需要引入RxJava 1.0
        //-------------buffer operator------
        tx_console.setText("Do");
        getObservable(false).subscribe(observer);
Log.d(" ", "  =================");
        getObservable(true).subscribe(observer);
}
输出结果:
  doOnSubscribe
          doOnLifecycle : accept
          onSubscribe
          doOnEach :OnNextNotification[1]
          onNext 1
          doAfterNext : 1
          doOnEach :OnNextNotification[2]
          onNext 2
          doAfterNext : 2
          doOnEach :OnNextNotification[3]
          onNext 3
          doAfterNext : 3
          doOnEach :OnNextNotification[4]
          onNext 4
          doAfterNext : 4
          doOnEach :OnNextNotification[5]
          onNext 5
          doAfterNext : 5
          doOnEach :OnCompleteNotification
          doOnComplete :
          doOnTerminate :
          onComplete
          doFinally :
          doAfterTerminate :
================================

          doOnSubscribe
          doOnLifecycle : accept
          onSubscribe
          doOnEach :OnNextNotification[1]
          onNext 1
          doAfterNext : 1
          doOnEach :OnNextNotification[2]
          onNext 2
          doAfterNext : 2
          doOnEach :OnNextNotification[3]
          onNext 3
          doAfterNext : 3
          doOnTerminate :
          doOnError :
          onError There is a Error!!
          doFinally :
          doAfterTerminate :

● Materialize/Dematerialize
Materialize返回一个被观察者对象,该对象发射源数据的所有数据,以及通知,每一项item通过一个标记类Notification封装源数据以及通知。Dematerialize 则和materialize功能相反。



 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("aaaa");
                e.onNext("bbbb");
                e.onNext("cccc");
                e.onComplete();
            }
        }).materialize()
                .map(new Function<Notification<String>, Notification<String>>() {
            @Override
            public Notification<String> apply(Notification<String> stringNotification) throws Exception {
                print("materialize:"+stringNotification +"--->getValue:"+stringNotification.getValue()
                        +"--->isOnComplete:"+stringNotification.isOnComplete()
                        +"--->isOnError:"+stringNotification.isOnError() );
                return stringNotification;
            }
        }).dematerialize().subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                print("dematerialize:"+o.toString());//
            }
        });
输出结果
         materialize:OnNextNotification[aaaa]--->getValue:aaaa--->isOnComplete:false--->isOnError:false
           materialize:OnNextNotification[bbbb]--->getValue:bbbb--->isOnComplete:false--->isOnError:false
           materialize:OnNextNotification[cccc]--->getValue:cccc--->isOnComplete:false--->isOnError:false
           materialize:OnCompleteNotification--->getValue:null--->isOnComplete:true--->isOnError:false
           dematerialize:aaaa
           dematerialize:bbbb
           dematerialize:cccc

● Serialize
当ObservalbeSource数据源是从不同线程回调观察者(发射数据),那么极有可能出现其中一个线程调用观察者的onComplete()或则onError()发生在另一个线程调用onNext()之前,或则两个线程同时第调用观察者的onNext(),而Serialize 操作是给观察者的回调添加同步锁synchronized,来确保Observalbe对其观察者进行序列化的调用.

● TimeInterval
返回上级数据源每个数据从接收到发送的时间间隔的Observable。

● Timeout
当一个事件流中每一个数据在一定时间内没有发射出去,则抛出超时异常

● Timestamp
返回每个数据源发射的时候的时间戳的Observable。

 Observable.intervalRange(0,10,0,500,TimeUnit.MILLISECONDS)
                .timeInterval().subscribe(new Consumer<Timed<Long>>() {
            @Override
            public void accept(Timed<Long> longTimed) throws Exception {
                print("timeInterval---Timed--->"+longTimed.time());//0
            }
        });
        Observable.intervalRange(0,10,0,5500,TimeUnit.MILLISECONDS)
                .timeout(5000,TimeUnit.MILLISECONDS )
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        print("timeout---->"+aLong);//0
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        print("timeout----Throwable>"+throwable.getMessage());
                    }
                });
        Observable.intervalRange(0,10,0,5500,TimeUnit.MILLISECONDS)
                .timestamp()
                .subscribe(new Consumer<Timed<Long>>() {
                    @Override
                    public void accept(Timed<Long> longTimed) throws Exception {
                        print("timestamp---Timed--->"+longTimed.time());//1510388694052
                    }
                });
         11-11 16:24:54.034 : timeInterval---Timed--->0
         11-11 16:24:54.044   timeout---->0
         11-11 16:24:54.044   timestamp---Timed--->1510388694052
         11-11 16:24:54.544   timeInterval---Timed--->500
         11-11 16:24:55.034   timeInterval---Timed--->500
         11-11 16:24:55.544   timeInterval---Timed--->500
         11-11 16:24:56.034   timeInterval---Timed--->500
         11-11 16:24:56.544   timeInterval---Timed--->500
         11-11 16:24:57.034   timeInterval---Timed--->500
         11-11 16:24:57.544   timeInterval---Timed--->500
         11-11 16:24:58.034   timeInterval---Timed--->500
         11-11 16:24:58.534   timeInterval---Timed--->500
         11-11 16:24:59.044   timeout----Throwable>null
         11-11 16:24:59.544   timestamp---Timed--->1510388699553
         11-11 16:25:05.044   timestamp---Timed--->1510388705053
         11-11 16:25:10.544   timestamp---Timed--->1510388710553
         11-11 16:25:16.044   timestamp---Timed--->1510388716053
         11-11 16:25:21.544   timestamp---Timed--->1510388721553
         11-11 16:25:27.044   timestamp---Timed--->1510388727053
         11-11 16:25:32.544   timestamp---Timed--->1510388732553
         11-11 16:25:38.044   timestamp---Timed--->1510388738053
         11-11 16:25:43.544   timestamp---Timed--->1510388743553

● Using
通过对源资源对象的生命周期的控制(对源数据订阅),产生一个对源数据经过处理后的ObservableSource

 Observable.using(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "hello";//----源数据
            }
        }, new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) throws Exception {
                return Observable.just(s+"----》你好!");//--------目标数据
            }
        }, new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                print("using----->"+s);//hello----收到源数据
                throw new Exception("源数据-----Error :"+s);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                print("using Consumer accept----->" + s);//hello----》你好!
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                print("using Consumer throwable----->" + throwable.getMessage());
            }
        });
输出结果:
         using Consumer accept----->hello----》你好!
         using----->hello
         using Consumer throwable----->源数据-----Error :hello

● To
转换操作。
blockingIterable blockingLatest blockingMostRecent blockingNext sorted to toFuture toList toMap toMultimap toSortedList

String first = Observable.just("aaaa",2,3).blockingFirst().toString();
        print(""+first);//aaaa
        Iterable<String> stringIterable = Observable.just("1","2","3").blockingIterable();
        Iterator iterator = stringIterable.iterator();
        while (iterator.hasNext()){
            print(""+iterator.next());
        }
        //1,2,3


        Observable.just("1","2","3").toMap(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s+"+"+s;
            }
        }).subscribe(new Consumer<Map<String, String>>() {
            @Override
            public void accept(Map<String, String> stringStringMap) throws Exception {
                print("toMap   "+stringStringMap );//{2+2=2, 3+3=3, 1+1=1}
            }
        });
        Observable.just(5,3,6,3,9,4)
                .toSortedList().subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                print("toSortedList"+integers);//[3, 3, 4, 5, 6, 9]
            }
        });  

● Retry
当发生错误的时候,重新发射数据。

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("2222");
                e.onError(new Throwable("Sorry!! an error occured sending the data"));
            }
        }).retry(3)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        print("retry--->" + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        print("retry--->throwable:" + throwable.getMessage());
                    }
                });
       输出结果:
         retry--->2222
         retry--->2222
         retry--->2222
         retry--->2222
         retry--->throwable:Sorry!! an error occured sending the data

● cache
当第一次订阅时,缓存所有的项目和通知,以使后续订阅者也可以接收到数据

ObservableEmitter<String> emitter = null;
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                emitter = e;
                emitter.onNext("1-----onNext");

            }
        });
        Observable.intervalRange(0, 5, 100, 5, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                emitter.onNext("intervalRange  send " + aLong);
            }
        });

        observable.cache().subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String aLong) throws Exception {
                        print("no cache---->" + aLong);
                    }
                });
        observable.delay(2000,TimeUnit.MILLISECONDS).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String aLong) throws Exception {
                        print(" cache---->" + aLong);
                    }
                });
        observable
                .delay(4000,TimeUnit.MILLISECONDS)
                .onTerminateDetach()
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String aLong) throws Exception {
                        print("onTerminateDetach cache---->" + aLong);
                    }
                });
输出结果:
/**
 *================================================no  cache ===========
  no cache---->1-----onNext
  cache---->intervalRange  send 0
  cache---->1-----onNext
  cache---->intervalRange  send 1
  cache---->intervalRange  send 2
  onTerminateDetach cache---->1-----onNext
  onTerminateDetach cache---->intervalRange  send 3
  onTerminateDetach cache---->intervalRange  send 4

 ===================================================cache===========
 : no cache---->1-----onNext
   cache---->1-----onNext
   onTerminateDetach cache---->1-----onNext
   onTerminateDetach cache---->intervalRange  send 0
   onTerminateDetach cache---->intervalRange  send 1
   onTerminateDetach cache---->intervalRange  send 2
   onTerminateDetach cache---->intervalRange  send 3
   onTerminateDetach cache---->intervalRange  send 4
 */

● cast
在将其转换为指定类型后,从源观察源发出每个项目,实际上通过map(Functions.castFunction(clazz))来实现,本质上是一个map操作。

 Observable.just("1", "2", "3") 
                .cast(Integer.class)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer val) throws Exception {
                        print("cast---->" + val);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        print("" + throwable.getMessage());//java.lang.String cannot be cast to java.lang.Integer
                    }
                });

● compose
自定义操作符,参数为ObservableTransformer ,可以继承ObservableTransformer 实现方法apply,来制定自己的运算符。

 Observable.just("1", "2", "3")
                .compose(schedulersTransformer())
                .subscribe();
//自定义线程调度操作符
 public ObservableTransformer schedulersTransformer() {
        return new ObservableTransformer() {
            @Override
            public ObservableSource apply(Observable upstream) {
                return upstream.subscribeOn(Schedulers.computation())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

总结
终于将这些Rxjava2.0的操作符讲完了,哈哈!!妈妈再也不用担心我不会用RxJava操作符了!!


噢耶!

这可能是世上最全操作符详解,虽然每个演示的Demo简单,但是应该可以根据输出结果理解,如果还不太明白,或者有疑问,动手自己敲段代码跑一下。哈哈!小伙伴们,不要忘记点个赞哦!

本系列文章的demo演示代码下载地址:
https://github.com/Callanna/RxLoad.git
找到该项目下的demo的module就可以了哦。
同时也可以支持一下我正在写的RxLoad这个类库,一个使用Rxjava实现加载图片,加载文件,加载网页的lib。

上一篇下一篇

猜你喜欢

热点阅读