RxJava

RxJava<第九篇>:创建操作符

2019-03-14  本文已影响21人  NoBugException

(1)create

    Observable.create();
    Flowable.create();
    Single.create();
    Completable.create();
    Maybe.create();

用来创建被观察者, 5种被观察者都可以使用create操作符创建。

(2)just

    Observable.just("1", "2", "3");
    Flowable.just("1", "2", "3");
    Single.just("1");
    Maybe.just("1");

只有四种被观察者可以使用just,一旦使用just就必须发射一条数据。

另外,需要注意的是:

just操作符最多可以设置10个参数。

(3)from

from只要是将其他类种的对象和数据类型转成Observable

Publisher将在后续介绍压背的时候重点提出。

看一下源码

图片.png

在这里被我圈出来的可变长度参数,也就是说参数的个数是可变的。

fromIterable的参数是Iterable类型, Collection是Iterable的子接口,所以只要是最终实现Collection接口的集合都可以作为参数,以下的java集合框架图可以作为参考:

图片.png

注意:其他被观察者也可以使用from操作符

图片.png 图片.png 图片.png 图片.png

(4)range

发射0~9之间的整数,左闭右开[0,9)

    Observable.range(0, 9).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(String.valueOf(integer));
        }
    });

其他方式

    Observable.rangeLong();
    Flowable.range();
    Flowable.rangeLong();

(5) defer

defer只有在订阅的时候才会创建被观察者,以保证每次发射的数据是最新的。

    Observable.defer(new Callable<ObservableSource<String>>() {
        @Override
        public ObservableSource<String> call() throws Exception {
            return Observable.just("A", "B");
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

其他方式

    Flowable.defer();
    Single.defer();
    Completable.defer();
    Maybe.defer();

(6)interval

创建一个按固定时间间隔发射整数序列的Observable

图片.png
    //延迟initialDelay秒后,按period秒定时时间间隔发射整数序列,调度器为computation
    Observable.interval(1000, 1000, TimeUnit.MILLISECONDS, Schedulers.computation());

    //延迟period秒后,按period秒定时时间间隔发射整数序列,调度器为computation
    Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println(String.valueOf(aLong));
        }
    });

有关Schedulers的讲解:

注意:在Rxjava 2.x版本中,废弃了1.x版本Schedulers.immediate(),在1.x中,Schedulers.immediate的作用是在当前线程中立即执行任务,功能等同于Rxjava中的2.x版本中的Schedulers.trampoline(),而在Schedulers.trampoline()在1.x版本的时候,作用是:当其他排队的任务执行完成之后,在当前线程排队开始执行接收到的任务,有点像2.x版本的Schedulers.single(),但是也不完全相同,因为Schedulers.single()不是在当前线程而是在一个线程单例中排队执行任务。

其他方式

    Flowable.interval();

(7)timer

实现延迟执行

    Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println(String.valueOf(aLong));
        }
    });

    Observable.timer(1000, TimeUnit.MILLISECONDS, Schedulers.computation());

其他方式

    Flowable.timer();
    Single.timer();
    Completable.timer();
    Maybe.timer();

(8)empty

使直接完成发射数据,也就是直接执行了onComplete。

    Observable.empty().subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("11111111111111");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("2222222222222");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("33333333333333");
        }

        @Override
        public void onComplete() {
            System.out.println("4444444444444444");
        }
    });

其他方式

    Flowable.empty();
    Maybe.empty();

(9)error

使直接发生异常结束发射数据,也就是直接执行了onError。

    Observable.error(new Throwable("nullpoint exception")).subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("11111111111111");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("2222222222222");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("4444444444444444");
        }
    });

其他方式

    Flowable.error();
    Single.error();
    Completable.error();
    Maybe.error();

(10)never

不发射数据,也不结束发射。

    Observable.never().subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("11111111111111");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("2222222222222");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("4444444444444444");
        }
    });

其他方式

    Flowable.never();
    Single.never();
    Completable.never();
    Maybe.never();
上一篇 下一篇

猜你喜欢

热点阅读