Rxjava 创造型操作符
2020-09-30 本文已影响0人
tingtingtina
关于操作符的介绍,官网说明的还是非常清楚的,还配有事件流向图。
上面就是事件上游(被观察者)
- 箭头表示 Observable 发射消息的时间线;
- 花花绿绿,形形状状的就是发射的数据;
- 最后有竖线,就是结束的意思,对应着时间的onComplete,× 对应着 onError
下面是事件下游(观察者)
- 时间线是观察者接收消息的时间线
- 花花绿绿,形形状状的就是接收的数据;
中间那部分就是功能强大操作符了
在这里面连接了上游和下游,使他们建立了订阅关系。
最基础的就是创造型操作符,顾名思义,它们的作用就是创建 Observable,并发送事件
基本创建
create
创建一个被观察者对象,使用者自己使用发射器的 onNext() , onError() , 和 onCompleted() 发送对应消息,下游可以接收。 |
有意思的是,这里就没有上游线,因为创造型操作符本身就是创建一个上游,并允许发送消息。
为了示例看起来更简洁,先写一个创建观察者的方法
private Observer createObserver() {
return new Observer<Integer>() {
// 下游 Observer 观察者 处理事件
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
log("下游处理事件 onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
log("下游处理事件 onComplete");
}
};
}
/** Create 基本創建
* ObservableEmitter是事件的发送器,可以发送多个onNext()方法;一旦发送 onComplete(),onError() 事件之后,后续的事件将不会再发送
*/
public void rx_create() {
// 上游 Observable 被观察者
Observable.create(new ObservableOnSubscribe<Integer>() {
// 发射器 发射事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
log("上游发射事件");
// 发射事件
emitter.onNext(1);
emitter.onNext(2);
log("上游发射完成");
}
}).subscribe(createObserver());
}
// 结果
上游发射事件
下游处理事件 onNext 1
下游处理事件 onNext 2
上游发射完成
快速创建
create 是使用自己的发射器发送的,RxJava 中也提供了更加快速的创建方式
Just
内部会自己发射数据,发射之后会再发一个 onComplete, 这里面支持发送多个数据 |
// just 会依次发射,最多发送 10 个
public void rx_just() {
// 上游 Observable 被观察者
// 内部会先发射 1, 再发射 2
Observable.just(1, 2).subscribe(createObserver());
}
// 结果
下游处理事件 onNext 1
下游处理事件 onNext 2
下游处理事件 onComplete
from: fromArray/fromIterable
内部自己发射的,数集对象/迭代器 |
public void rx_formArray() {
// 上游 Observable 被观察者
Integer[] array = {1, 2, 3, 4, 5};
Observable.fromArray(array).subscribe(createObserver());
}
// 结果
下游处理事件 onNext 1
下游处理事件 onNext 2
下游处理事件 onNext 3
下游处理事件 onNext 4
下游处理事件 onNext 5
下游处理事件 onComplete
range
发射一个范围内的有序数列,可以指定范围的起始和长度 |
public void rx_range() {
// 从 1 开始加 数量 5个 (1,2,3,4,5)
Observable.range(1,5).subscribe(createObserver());
}
// 结果
下游处理事件 onNext 1
下游处理事件 onNext 2
下游处理事件 onNext 3
下游处理事件 onNext 4
下游处理事件 onNext 5
下游处理事件 onComplete
empty / never / error
下游默认是 Object,无法发出有值的事件,创建后只会发射 onComplete
|
创建后,什么都不做 | 创建后会发射一个 Error 时间,通知异常 |
/**
* 上游没有发射任何事件 无法指定类型,默认 Object Rxjava 泛型默认类型 == Object
* 做一个耗时操作,不需要任何数据来刷新 UI 只会调用 onComplete
*/
public void rx_empty() {
// 不会发射有值的事件
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
// 没有事件可接收
log("empty -- > onNext");
}
@Override
public void onError(Throwable e) {
// 如果是 .error 会在这接收
}
@Override
public void onComplete() {
// 内部一定会调用 onComplete 事件
log("empty --- > onComplete");
}
});
}
// 结果
empty --- > onComplete
延迟创建
除了以上创建方法,我们可以为事件的发送添加延迟,支持延迟创建的有下几个操作符。
timer
快速创建后,延迟指定时间后,发送1个数值0(Long类型) |
interval
按照固定时间发射一个无限递增的整数序列。发送的事件序列 = 从0开始、无限递增1的的整数序列,也可以为它添加第一发射数据前的延时时间 |
public void rx_interval() {
log("------ Start ------");
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
log("下游处理事件 onNext " + aLong);
}
});
}
结果:每个一秒钟会发射一个数据,从 0 开始 加1 递增
TIPS
这里用到了个 新的类 Consumer 其实也是个观察者类,可以看做是 Observer 的简化版,里面只有一个方法 accpet
相当于 Observer 中的 onNext
。
intervalRange
每隔指定时间就发送事件,可指定发送数据的数量,与 interval 类似,但可以指定发送数据的数量。它们都支持设置第一次延时时间
public void rx_intervalRange() {
log("------ Start ------");
// start 开始累积, count 累积多少个数量, initialDelay 开始等待的时间, period 每隔多久执行, TimeUnit 时间单位
Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
log("accept: " + aLong);
}
});
}
结果,数据从 1 开始 发送 5 个就是 1,2, 3, 4, 5,最后单位是 秒,开始延迟1s, 每个 2s 发送一次数据。
defer
直到有观察者(Observer)订阅时,才动态创建被观察者对象及发送事件
int i = 1;
public void rx_defer() {
// 上游 被观察者
Observable observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(i);
}
});
// 再赋值
i = 10;
// 订阅
// 此时,才会调用 defer()创建被观察者对象
observable.subscribe(createObserver());
}
// 结果
下游处理事件 onNext 10
下游处理事件 onComplete
defer 操作符的入参是一个 Callable 的接口实现,里面的 call 方法返回的是一个 ObservableSource,看过之前手写框架就就已经知道,其实就是返回一个被观察者。通过之前的学习和经验应该能推断出,只有在订阅的时候,才会使用 call 方法,创建一个被观察者对象,然后再用这个对象订阅。