RxJava

RxJava<第四篇>:Cold Observabl

2019-03-13  本文已影响2人  NoBugException
(1)Cold Observable

定义两个观察者:

    Consumer consumer1 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer1:"+"threadname:"+countBean.getThreadName()+"-----hashcode:"+countBean.hashCode());
        }
    };
    Consumer consumer2 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer2:"+"threadname:"+countBean.getThreadName()+"-----hashcode:"+countBean.hashCode());
        }
    };

定义被观察者:

    Observable observable =  Observable.create(new ObservableOnSubscribe<CountBean>() {
        @Override
        public void subscribe(ObservableEmitter<CountBean> emitter) {
            CountBean countBean = new CountBean();
            countBean.setThreadName(Thread.currentThread().getName());
            emitter.onNext(countBean);
        }
    }).observeOn(Schedulers.newThread());

    observable.subscribe(consumer1);
    observable.subscribe(consumer2);

以上代码就是典型的Cold Observable

图片.png

由执行效果得知两次订阅CountBean的哈希值不同,即操作的CountBean对象不同。

(2)Hot Observable

定义两个观察者:

    Consumer consumer1 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer1:"+"threadname:"+countBean.getThreadName()+"-----hashcode:"+countBean.hashCode());
        }
    };
    Consumer consumer2 = new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("consumer2:"+"threadname:"+countBean.getThreadName()+"-----hashcode:"+countBean.hashCode());
        }
    };

定义被观察者:(与Cold Observable不同)

    Observable observable =  Observable.create(new ObservableOnSubscribe<CountBean>() {
        @Override
        public void subscribe(ObservableEmitter<CountBean> emitter) {
            CountBean countBean = new CountBean();
            countBean.setThreadName(Thread.currentThread().getName());
            emitter.onNext(countBean);

        }
    }).observeOn(Schedulers.newThread()).publish().refCount();


    observable.subscribe(consumer1);
    observable.subscribe(consumer2);

比第一种方式简化了很多。
refcount本质上在后台维护着一个引用计数器,当一个subscription需要取消订阅或者销毁的时候,发出一个正确的动作。

看一下share的源码得知,其实share就是publish().refCount()。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> share() {
    return publish().refCount();
}

PublishSubject的父类是Subject, 查看Subject的源码可知,Subject继承了Observable,同时又实现了Observer,也就是说,Subject同时兼备了观察者和被观察者的特性,了解这个特性之后再去理解以上代码就容易多了。

以上说明了4种Hot Observable的写法,他们的执行效果都是

图片.png

即,不管订阅多少个观察者,操作的对象是一样的,并且它是线程安全的。

上一篇 下一篇

猜你喜欢

热点阅读