RxJava系列|Subject(二)
- Subject既可以是一个Observer,也可以是一个Observable,这点从以下代码
public abstract class Subject<T> extends Observable<T> implements Observer<T>
就可以看出来。
- Subject是连接Observer和Observable的桥梁,因此Subject可以被理解为Subject = Observable + Observer。
- RxJava一共提供了8种Subject的实现, 分别是AsyncSubject、BehaviorSubject、PublishSubject、ReplaySubject、CompletableSubject、MaybeSubject、SingleSubject以及UnicastSubject。
AsyncSubject
当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据。如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,但是会向Observer传递一个异常通知。
AsyncSubject<String> aSubject = AsyncSubject.create();
aSubject.onNext("1");
aSubject.onNext("2");
aSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
System.out.println("1-AsyncSubject:" + s);
}
});
aSubject.onNext("3");
aSubject.onNext("4");
aSubject.onNext("5");
//必须调用,否则Subject不知道什么时候发射完,Observer接收不到数据
aSubject.onComplete();
aSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
System.out.println("2-AsyncSubject:" + s);
}
});
输出结果为:
I/System.out: 1-AsyncSubject:5
I/System.out: 2-AsyncSubject:5
BehaviorSubject
当Observer订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据。如果此时还没有收到任何数据,它会发射一个默认值,然后继续发射其他任何来自原始Observable的数据。如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,但是会向Observer传递一个异常通知。
BehaviorSubject<String> bSubject = BehaviorSubject.createDefault("Default-Value");
bSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
System.out.println("BehaviorSubject:" + s);
}
});
bSubject.onNext("one");
bSubject.onNext("two");
bSubject.onNext("three");
bSubject.onNext("four");
输出结果为:
I/System.out: BehaviorSubject:Default-Value
I/System.out: BehaviorSubject:one
I/System.out: BehaviorSubject:two
I/System.out: BehaviorSubject:three
I/System.out: BehaviorSubject:four
若将以上的代码改为:
BehaviorSubject<String> bSubject = BehaviorSubject.createDefault("Default-Value");
bSubject.onNext("one");
bSubject.onNext("two");
bSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
System.out.println("BehaviorSubject:" + s);
}
});
bSubject.onNext("three");
bSubject.onNext("four");
则输出的结果为:
I/System.out: BehaviorSubject:two
I/System.out: BehaviorSubject:three
I/System.out: BehaviorSubject:four
PublishSubject
PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里会有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始Observable 的所有数据都被分发,则可以当所有观察者都已经订阅时,才开始发射数据,或者改用ReplaySubject。
PublishSubject<String> pSubject = PublishSubject.create();
pSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
System.out.println("Observer-1:" + s);
}
});
pSubject.onNext("A");
pSubject.onNext("B");
pSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
System.out.println("Observer-2:" + s);
}
});
pSubject.onNext("C");
pSubject.onNext("D");
输出结果为:
I/System.out: Observer-1:A
I/System.out: Observer-1:B
I/System.out: Observer-1:C
I/System.out: Observer-2:C
I/System.out: Observer-1:D
I/System.out: Observer-2:D
ReplaySubject
不管Observer何时订阅ReplaySubject,ReplaySubject均会发射所有来自原始Observable的数据给Observer。有不同类型的ReplaySubject,它们用于限定Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。如果使用ReplaySubject作为Observer,注意不要在多个线程中调用onNext、onComplete和onError方法。这可能会导致顺序错乱,并且违反了Observer规则。
ReplaySubject<String> rSubject = ReplaySubject.create();
rSubject.onNext("one");
rSubject.onNext("two");
rSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
System.out.println("Observer-1:" + s);
}
});
rSubject.onNext("three");
rSubject.onNext("four");
rSubject.onComplete();
rSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
System.out.println("Observer-2:" + s);
}
});
输出结果为:
I/System.out: Observer-1:one
I/System.out: Observer-1:two
I/System.out: Observer-1:three
I/System.out: Observer-1:four
I/System.out: Observer-2:one
I/System.out: Observer-2:two
I/System.out: Observer-2:three
I/System.out: Observer-2:four
CompletableSubject
只发送Observer发射完毕数据,即:只发送onCompelted()
CompletableSubject cSubject = CompletableSubject.create();
cSubject.subscribe(new Action() {
@Override
public void run() {
System.out.println("CompletableSubject执行完毕");
}
});
cSubject.onComplete();
输出结果为:
I/System.out: CompletableSubject执行完毕
MaybeSubject
MaybeSubject所有方法都是线程安全的。主要用于发送一个结果数据,主要方法onSuccess(),多次调用无效。一般用于验证某个结果。
MaybeSubject<Boolean> mSubject = MaybeSubject.create() ;
mSubject.subscribe(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean b) {
System.out.println("登录状态:"+b);
}
}) ;
mSubject.onSuccess(true);
mSubject.onComplete();
输出结果为:
I/System.out: 登录状态:true
SingleSubject
SingleSubject和MaybeSubject区别不大,MaybeSubject提供了状态方法,比如:onComplete()和onErrorComplete(),而SingleSubject没有这些方法,更是印证了Single这个名字,这里就不多说,用法和MaybeSubject一样。
SingleSubject<String> sSubject = SingleSubject.create() ;
sSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
System.out.println("Observable-one:"+s);
}
}) ;
sSubject.onSuccess("success...");
输出结果为:
I/System.out: Observable-one:success...
UnicastSubject
只允许一个 Observer 进行监听,在该 Observer 注册之前会将发射的所有的事件放进一个队列中,并在 Observer 注册的时候一起通知给它。如果有多个观察者订阅(监听),程序会报错。
UnicastSubject<String> uSubject = UnicastSubject.create();
uSubject.onNext("1");
uSubject.onNext("2");
uSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("Observer-1:" + s);
}
});
uSubject.onNext("3");
uSubject.onNext("4");
uSubject.onComplete();
// 以下代码,不注释会崩溃
// 即使UnicastSubject发射完了数据,在订阅其他Observer也不行
uSubject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("Observer-2:" + s);
}
});
输出结果为:
I/System.out: Observer-1:1
I/System.out: Observer-1:2
I/System.out: Observer-1:3
I/System.out: Observer-1:4
Subject作为观察者使用
Subject<String> subject = new Subject<String>() {
@Override
public boolean hasObservers() {
return false;
}
@Override
public boolean hasThrowable() {
return false;
}
@Override
public boolean hasComplete() {
return false;
}
@Override
public Throwable getThrowable() {
return null;
}
@Override
protected void subscribeActual(Observer<? super String> observer) {
}
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};