RxJava系列之Publish,Share,Refcount操
2019-08-04 本文已影响0人
代码改变人生
1. Observable的分类——Cold 和 Hot
-
Hot Observable
无论有没有Subscriber
订阅,事件始终都会发射。当Hot Observable
有多个订阅者时,Hot Observable
与订阅者们是一对多的关系,即可以与多个订阅者共享信息。 -
Cold Observable
只有Subscriber
订阅时,才开始发射数据流,Cold Observable
与订阅者只能是一对一的关系,即当有多个不同的订阅者时,消息是重新完整发送的。也就是说对Cold Observable
而言,有多个Subscriber
的时候,他们各自的事件是独立的。
2. Cold Observable 如何转换成 Hot Observable
(1) publish操作符
-
publish将普通的Observable转换为可连接的Observable
publish.png
其实可连接的Observable类似于普通的Observable,区别在于它在订阅时才发射数据,只有当使用Connect操作符才开始。 通过这种方式,可以选择Observable的发射时间。
使用 publish 操作符,可以让 Cold Observable 转换成 Hot Observable。它将原先的 Observable 转换成 ConnectableObservable。
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("subscriber1: "+aLong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber2: "+aLong);
}
};
ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
Observable.interval(5, TimeUnit.MILLISECONDS,Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread()).publish();
//注意生成的 ConnectableObservable 需要调用connect()才能真正执行。
observable.connect();
observable.subscribe(subscriber1);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
observable.subscribe(subscriber2);
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
执行结果如下:
subscriber1: 0
subscriber1: 1
subscriber1: 2
subscriber2: 2
subscriber1: 3
subscriber2: 3
subscriber1: 4
subscriber2: 4
subscriber1: 5
subscriber2: 5
subscriber1: 6
subscriber2: 6
3. Hot Observable 如何转换成 Cold Observable
(1) refCount操作符
该操作符可以使Connectable Observable的行为类似于普通的Observable
refcount.png
RefCount操作符把从一个可连接的 Observable 连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。
如果所有的订阅者都取消订阅了,则数据流停止。如果重新订阅则重新开始数据流。
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("subscriber1: "+aLong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber2: "+aLong);
}
};
ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread()).publish();
connectableObservable.connect();
Observable<Long> observable = connectableObservable.refCount();
Disposable disposable1 = observable.subscribe(subscriber1);
Disposable disposable2 = observable.subscribe(subscriber2);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable1.dispose();
disposable2.dispose();
System.out.println("重新开始数据流");
disposable1 = observable.subscribe(subscriber1);
disposable2 = observable.subscribe(subscriber2);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
执行结果:
subscriber1: 0
subscriber2: 0
subscriber1: 1
subscriber2: 1
重新开始数据流
subscriber1: 0
subscriber2: 0
subscriber1: 1
subscriber2: 1
注:如果不是所有的订阅者都取消了订阅,只取消了部分。部分的订阅者重新开始订阅,则不会从头开始数据流。
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("subscriber1: "+aLong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber2: "+aLong);
}
};
Consumer<Long> subscriber3 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber3: "+aLong);
}
};
ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread()).publish();
connectableObservable.connect();
Observable<Long> observable = connectableObservable.refCount();
Disposable disposable1 = observable.subscribe(subscriber1);
Disposable disposable2 = observable.subscribe(subscriber2);
observable.subscribe(subscriber3);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable1.dispose();
disposable2.dispose();
System.out.println("subscriber1、subscriber2 重新订阅");
disposable1 = observable.subscribe(subscriber1);
disposable2 = observable.subscribe(subscriber2);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
执行结果:
subscriber1: 0
subscriber2: 0
subscriber3: 0
subscriber1: 1
subscriber2: 1
subscriber3: 1
subscriber1、subscriber2 重新订阅
subscriber3: 2
subscriber1: 2
subscriber2: 2
subscriber3: 3
subscriber1: 3
subscriber2: 3
subscriber3: 4
subscriber1: 4
subscriber2: 4
share操作符
share操作符封装了publish().refCount()调用,可以看其源码。
share.png