rxjava2.x 五种观察者模式
一. Observable
1. Hot Observable
Hot Observable 通俗一点就像时光一样,不管你有没有订阅它,它总会发生,总会消逝
可以通过publish,subject/processor生成
publish 方式:
Consumer<Long> longConsumer = new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, " longConsumer accept : " + aLong);
}
};
Consumer<Long> longConsumer1 = new Consumer<Long>() {
@Override
public void accept(Long o) throws Exception {
Log.d(TAG, " longConsumer1 accept : " + o);
}
};
ConnectableObservable<Long> publish = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
Observable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
.take(10).subscribe(emitter::onNext);
}
}).observeOn(Schedulers.newThread()).publish();// cold -> hot 转换
// 建立连接
publish.connect();
publish.subscribe(longConsumer);
publish.subscribe(longConsumer1);
subject/processor
Consumer<Long> consumer = new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, " consumer accept : " + aLong);
}
};
Consumer<Long> longConsumer = new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, " longConsumer accept : " + aLong);
}
};
Observable<Long> longObservable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
Observable
.interval(10, TimeUnit.SECONDS, Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(emitter::onNext);
}
}).observeOn(Schedulers.newThread());
// cold -> hot转换并建立连接
PublishSubject<Long> objectPublishSubject = PublishSubject.create();
longObservable.subscribe(objectPublishSubject);
objectPublishSubject.subscribe(consumer);
objectPublishSubject.subscribe(longConsumer);
Subject四种类型介绍:
AsyncSubject:
无论订阅发生在什么时候,只发送最后一条数据
BehaviorSubject:
发送订阅之前的一条数据,以及之后的所有数据
ReplaySubject:
不论订阅发送在什么时候,都发送所有数据
PublishSubject:
发送订阅之后的所有数据
processor 介绍
processor 和 Subject 功能一样,但是支持背压
2. Cold Observable
Cold Observable 就像一个播放器,有人订阅它,他就会播放,没人订阅该事件处于暂停状态
Observable 使用 just ,create, range 等都是 Cold Observable 的一种表现形式
或者是从Hot Observable 转变为 Cold Observable 模式,但是一般人不会这么干
可以通过refcount 或者 share 操作符进行转换
二. Flowable
Flowable 建立初衷是背压的抽取,rxjava2.x中 observable 不再支持背压,而有Flowable 支持非阻塞背压,操作符和Observable很相近
后边介绍一下Flowable 和 observable 管道容量的情况
Observable : 最大处理不超过 1000 条,否则会出现内存溢出
Flowable: 最大处理超过10KB的数据
三.Single
Single 的定义如同表面意思一样,用于发送单一数据,目前 只有onSuccess 和 onError 事件
四. Completable
Completable 被誉为 rxjava 中的Runnable, 只用来接受消息,不会进行发射数据
五. Maybe
Maybe 是rxjava2.x 新加的观察者模式, 可以看成是Single 和 Completable 结合 ,可以通过onSuccess 的方式进行发射数据,但是发送的数据不能超过一条,超过一条的部分即使发送也不会被接受到,因为onSuccess 发送数据后,紧接着链路就不掐断,导致后续事件不能发送
总结: 通过这五种观察者模式,基本上满足所有事件的处理,无论是背压,发送和接受,只接收,接收到结果后反馈结果...... 场景都可以使用不能的观察者模式去解决,是不是初步感觉到了rxjava的强大。