RxJava<第六篇>:4种Subject
2019-03-13 本文已影响29人
NoBugException
4种Subject 分别为:AsyncSubject
Subject将观察者和被观察者结合为一体,是观察者和被观察者的代理。
(1)AsyncSubject
AsyncSubject asyncSubject = AsyncSubject.create();
asyncSubject.onNext("A");
asyncSubject.onNext("B");
asyncSubject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("接收发射过来的数据:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("执行完成!");
}
});
asyncSubject.onNext("C");
asyncSubject.onComplete();
日志如下:
图片.png也就是说,AsyncSubject 无论发射多少条数据,无论在订阅前发射还是在订阅后发射,都只会收到最后一条发射的数据。
(2)BehaviorSubject
BehaviorSubject behaviorSubject = BehaviorSubject.create();
behaviorSubject.onNext("A");
behaviorSubject.onNext("B");
behaviorSubject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("接收发射过来的数据:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("执行完成!");
}
});
behaviorSubject.onNext("C");
behaviorSubject.onComplete();
日志如下:
图片.png也就是说,BehaviorSubjec只会接收到订阅前最后一条发射的数据以及订阅之后所有的数据。
(3)ReplaySubject
ReplaySubject replaySubject = ReplaySubject.create();
replaySubject.onNext("A");
replaySubject.onNext("B");
replaySubject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("接收发射过来的数据:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("执行完成!");
}
});
replaySubject.onNext("C");
replaySubject.onComplete();
日志如下:
图片.png也就是说,ReplaySubject会接收到全部数据。
这里需要补充一下:
ReplaySubject可以限制缓存的数量,也可以限制缓存的时间
ReplaySubject replaySubject = ReplaySubject.createWithSize(1);
ReplaySubject replaySubject = ReplaySubject.createWithTime(1000, TimeUnit.MILLISECONDS, Schedulers.newThread());
ReplaySubject replaySubject = ReplaySubject.createWithTimeAndSize(1000, TimeUnit.MILLISECONDS, Schedulers.newThread(), 1);
(4)PublishSubject
PublishSubject publishSubject = PublishSubject.create();
publishSubject.onNext("A");
publishSubject.onNext("B");
publishSubject.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("接收发射过来的数据:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("执行完成!");
}
});
publishSubject.onNext("C");
publishSubject.onComplete();
日志如下:
图片.png也就是说,PublishSubject只会接收到订阅之后的所有数据。