RxJava

Rxjava 创造型操作符

2020-09-30  本文已影响0人  tingtingtina

关于操作符的介绍,官网说明的还是非常清楚的,还配有事件流向图。



上面就是事件上游(被观察者)

下面是事件下游(观察者)

中间那部分就是功能强大操作符了
在这里面连接了上游和下游,使他们建立了订阅关系。

最基础的就是创造型操作符,顾名思义,它们的作用就是创建 Observable,并发送事件

基本创建

create

创建一个被观察者对象,使用者自己使用发射器的 onNext(), onError(), 和 onCompleted() 发送对应消息,下游可以接收。

有意思的是,这里就没有上游线,因为创造型操作符本身就是创建一个上游,并允许发送消息。

为了示例看起来更简洁,先写一个创建观察者的方法

private Observer createObserver() {
    return new Observer<Integer>() {
        // 下游 Observer 观察者 处理事件
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onNext(Integer integer) {
            log("下游处理事件 onNext " + integer);
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
            log("下游处理事件 onComplete");
        }
    };
}
 /** Create 基本創建
  *  ObservableEmitter是事件的发送器,可以发送多个onNext()方法;一旦发送 onComplete(),onError() 事件之后,后续的事件将不会再发送
 */
public void rx_create() {
    // 上游 Observable 被观察者
    Observable.create(new ObservableOnSubscribe<Integer>() {
        // 发射器 发射事件
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            log("上游发射事件");
            // 发射事件
            emitter.onNext(1);
            emitter.onNext(2);
            log("上游发射完成");
        }
    }).subscribe(createObserver());
}

// 结果
上游发射事件
下游处理事件 onNext 1
下游处理事件 onNext 2
上游发射完成

快速创建

create 是使用自己的发射器发送的,RxJava 中也提供了更加快速的创建方式

Just

内部会自己发射数据,发射之后会再发一个 onComplete, 这里面支持发送多个数据
// just 会依次发射,最多发送 10 个
public void rx_just() {
    // 上游 Observable 被观察者
    // 内部会先发射 1, 再发射 2
    Observable.just(1, 2).subscribe(createObserver());
}
// 结果
下游处理事件 onNext 1
下游处理事件 onNext 2
下游处理事件 onComplete

from: fromArray/fromIterable

内部自己发射的,数集对象/迭代器
public void rx_formArray() {
    // 上游 Observable 被观察者
    Integer[] array = {1, 2, 3, 4, 5};
    Observable.fromArray(array).subscribe(createObserver());
}

// 结果
下游处理事件 onNext 1
下游处理事件 onNext 2
下游处理事件 onNext 3
下游处理事件 onNext 4
下游处理事件 onNext 5
下游处理事件 onComplete

range

发射一个范围内的有序数列,可以指定范围的起始和长度
public void rx_range() {
    // 从 1 开始加 数量 5个 (1,2,3,4,5)
    Observable.range(1,5).subscribe(createObserver());
}

// 结果
下游处理事件 onNext 1
下游处理事件 onNext 2
下游处理事件 onNext 3
下游处理事件 onNext 4
下游处理事件 onNext 5
下游处理事件 onComplete

empty / never / error

下游默认是 Object,无法发出有值的事件,创建后只会发射 onComplete 创建后,什么都不做 创建后会发射一个 Error 时间,通知异常
/**
 * 上游没有发射任何事件 无法指定类型,默认 Object Rxjava 泛型默认类型 == Object
 * 做一个耗时操作,不需要任何数据来刷新 UI 只会调用 onComplete
 */
public void rx_empty() {
    // 不会发射有值的事件
    Observable.empty().subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onNext(Object o) {
            // 没有事件可接收
            log("empty -- > onNext");
        }
        @Override
        public void onError(Throwable e) {
            // 如果是 .error 会在这接收
        }
        @Override
        public void onComplete() {
            // 内部一定会调用 onComplete 事件
            log("empty --- > onComplete");
        }
    });
}
// 结果
empty --- > onComplete

延迟创建

除了以上创建方法,我们可以为事件的发送添加延迟,支持延迟创建的有下几个操作符。

timer

快速创建后,延迟指定时间后,发送1个数值0(Long类型)

interval

按照固定时间发射一个无限递增的整数序列。发送的事件序列 = 从0开始、无限递增1的的整数序列,也可以为它添加第一发射数据前的延时时间
public void rx_interval() {
    log("------ Start ------");
    Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    log("下游处理事件 onNext " + aLong);
                }
            });
}
结果:每个一秒钟会发射一个数据,从 0 开始 加1 递增

TIPS
这里用到了个 新的类 Consumer 其实也是个观察者类,可以看做是 Observer 的简化版,里面只有一个方法 accpet 相当于 Observer 中的 onNext

intervalRange

每隔指定时间就发送事件,可指定发送数据的数量,与 interval 类似,但可以指定发送数据的数量。它们都支持设置第一次延时时间

public void rx_intervalRange() {
    log("------ Start ------");
    // start 开始累积, count 累积多少个数量, initialDelay 开始等待的时间, period 每隔多久执行, TimeUnit 时间单位
    Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    log("accept: " + aLong);
                }
            });
}
结果,数据从 1 开始 发送 5 个就是 1,2, 3, 4, 5,最后单位是 秒,开始延迟1s, 每个 2s 发送一次数据。

defer

直到有观察者(Observer)订阅时,才动态创建被观察者对象及发送事件

int i = 1;
public void rx_defer() {
    // 上游 被观察者
    Observable observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> call() throws Exception {
            return Observable.just(i);
        }
    });
    // 再赋值
    i = 10;
    // 订阅
    // 此时,才会调用 defer()创建被观察者对象
    observable.subscribe(createObserver());
}

// 结果
下游处理事件 onNext 10
下游处理事件 onComplete

defer 操作符的入参是一个 Callable 的接口实现,里面的 call 方法返回的是一个 ObservableSource,看过之前手写框架就就已经知道,其实就是返回一个被观察者。通过之前的学习和经验应该能推断出,只有在订阅的时候,才会使用 call 方法,创建一个被观察者对象,然后再用这个对象订阅。

上一篇下一篇

猜你喜欢

热点阅读