RxJavaAndroid

RxJava<第六篇>:4种Subject

2019-03-13  本文已影响29人  NoBugException

4种Subject 分别为:AsyncSubject

Subject将观察者和被观察者结合为一体,是观察者和被观察者的代理。

(1)AsyncSubject

    AsyncSubject asyncSubject = AsyncSubject.create();
    asyncSubject.onNext("A");
    asyncSubject.onNext("B");
    asyncSubject.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("接收发射过来的数据:"+s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {

        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("执行完成!");
        }
    });

    asyncSubject.onNext("C");
    asyncSubject.onComplete();

日志如下:

图片.png

也就是说,AsyncSubject 无论发射多少条数据,无论在订阅前发射还是在订阅后发射,都只会收到最后一条发射的数据。

(2)BehaviorSubject

    BehaviorSubject behaviorSubject = BehaviorSubject.create();
    behaviorSubject.onNext("A");
    behaviorSubject.onNext("B");
    behaviorSubject.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("接收发射过来的数据:"+s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {

        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("执行完成!");
        }
    });

    behaviorSubject.onNext("C");
    behaviorSubject.onComplete();

日志如下:

图片.png

也就是说,BehaviorSubjec只会接收到订阅前最后一条发射的数据以及订阅之后所有的数据。

(3)ReplaySubject

    ReplaySubject replaySubject = ReplaySubject.create();
    replaySubject.onNext("A");
    replaySubject.onNext("B");
    replaySubject.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("接收发射过来的数据:"+s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {

        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("执行完成!");
        }
    });

    replaySubject.onNext("C");
    replaySubject.onComplete();

日志如下:

图片.png

也就是说,ReplaySubject会接收到全部数据。

这里需要补充一下:
ReplaySubject可以限制缓存的数量,也可以限制缓存的时间

    ReplaySubject replaySubject = ReplaySubject.createWithSize(1);
    ReplaySubject replaySubject = ReplaySubject.createWithTime(1000, TimeUnit.MILLISECONDS, Schedulers.newThread());
    ReplaySubject replaySubject = ReplaySubject.createWithTimeAndSize(1000, TimeUnit.MILLISECONDS, Schedulers.newThread(), 1);

(4)PublishSubject

    PublishSubject publishSubject = PublishSubject.create();

    publishSubject.onNext("A");
    publishSubject.onNext("B");
    publishSubject.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("接收发射过来的数据:"+s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {

        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("执行完成!");
        }
    });

    publishSubject.onNext("C");
    publishSubject.onComplete();

日志如下:

图片.png

也就是说,PublishSubject只会接收到订阅之后的所有数据。

上一篇 下一篇

猜你喜欢

热点阅读