RxJava从使用到原理

2018-12-27  本文已影响0人  帝王鲨kingcp
创建操作符

操作符使用

  1. 基本创建
    create() 完整创建1个被观察者对象(Observable)
  2. 快速创建,发送事件
    just() 快速创建1个被观察者对象(Observable),发送事件的特点:直接发送 传入的事件
    fromArray() 快速创建1个被观察者对象(Observable),发送事件的特点:直接发送 传入的数组数据
    fromIterable() 快速创建1个被观察者对象(Observable),发送事件的特点:直接发送 传入的集合List数据
    测试使用 empty() error() never()
创建操作符.png
Android RxJava:最基础的操作符详解 - 创建操作符
变化操作符
  1. map() 对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件即,将被观察者发送的事件转换为任意的类型事件。
  2. flatmap() 将被观察者发送的事件序列进行拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关。
  3. ConcatMap() 类似FlatMap()操作符,拆分 & 重新合并生成的事件序列的顺序 = 被观察者旧序列生产的顺序
  4. Buffer() 定期从被观察者(Obervable)需要发送的事件中获取一定数量的事件 & 放到缓存区中,最终发送
    Android RxJava:图文详解 变换操作符
组合/合并操作符
  1. concat() / concatArray() 组合多个被观察者一起发送数据,合并后按发送顺序串行执行
  2. merge() / mergeArray() 组合多个被观察者一起发送数据,合并后按时间线并行执行
  3. concatDelayError() / mergeDelayError() onError事件推迟到其他被被观察者发送事件结束后触发
  4. zip() 合并多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送。事件组合方式 = 严格按照原先事件序列进行对位合并,最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量。
  5. combineLatest() 当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。
  6. combineLatestDelayError() 作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述。
  7. reduce() 把被观察者需要发送的事件聚合成1个事件 & 发送
  8. startWith() / startWithArray() 在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
  9. count() 统计被观察者发送事件的数量
    Android RxJava:组合 / 合并操作符 详细教程
功能性操作符
  1. subscribe() 订阅,即连接观察者&被观察者
  2. 线程调度
  3. 延迟操作
  4. do() 在事件的生命周期中操作


    do操作符.png
  5. 错误处理


    错误操作符.png
  6. 重复发送
    repeat() 无条件地、重复发送 被观察者事件
    repeatWhen() 有条件地、重复发送 被观察者事件
    image.png
    Android RxJava:功能性操作符 全面讲解

原理

Single:如下面的代码做最简单的操作,创建被观察者,观察者以及相互之间订阅。
//创建被观察者
Single<String> single = Single.just("1");
//创建观察者
SingleObserver<String> observer = new SingleObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onSuccess(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }
};
  //发生订阅关系
  single.subscribe(observer);

just方法,返回一个包裹了Single的SingleJust。SingleJust继承Single类。

public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}

被观察者订阅观察者,subscribe方法回去实现subscribeActual(subscriber),这个方法就在SingleJust中

 public final void subscribe(SingleObserver<? super T> subscriber) {
        ObjectHelper.requireNonNull(subscriber, "subscriber is null");

        subscriber = RxJavaPlugins.onSubscribe(this, subscriber);

        ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");

        try {
            subscribeActual(subscriber);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }

subscribleActual会去调用观察者SingleObserver的onSubscribe和onSuccess方法。其中onSubscribe还返回一个已经丢弃的丢弃对象Disposables,Disposable会在下面讲。

public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> s) {
        s.onSubscribe(Disposables.disposed());
        s.onSuccess(value);
    }

}

Create作为和just一样的创建操作符,其实流程是相似的,下面是流程图
create创建原理.png
Map操作符使用如下,能将just中的内容进行变换后往下传递。
        SingleJust.just(1)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return String.valueOf(integer);
                    }
                })
                .subscribe(new SingleObserver<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(String integer) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });

进入map方法,让后会返回一个新创建的SingleMap。

 public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}

在SingleMap类当中,subscribeActual中上游source订阅一个新的观察者MapSingleObserver,source.subscribe(new MapSingleObserver<T, R>(t, mapper)),这样就是让SingleJust去调用它自己的subscribeActual(),这样整个过程启动了。MapSingleObserver包裹一个我们创建的SingleObserver(t),看MapSingleObserver类,内部做的就是一个桥接和自己apply的工作。代码和整体流程图在下面。

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;

        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

带map操作符的流程图.png
Disposable:作用主要就是让上游停止工作。实现方式主要有两种方式:桥接,替换。这两种方式我会具体讲。用法一般就是全局定义一个Disposable,在onSubscribe(Disposable d)中获得disposable对象,在生命周期的onDestroy中去丢弃。
1. Single.just()没有延迟没有后续,直接传一个已经丢弃的丢弃对象,相当于传了一个没有用的对象。
public final class SingleJust<T> extends Single<T> {

    @Override
    protected void subscribeActual(SingleObserver<? super T> s) {
        s.onSubscribe(Disposables.disposed());
        s.onSuccess(value);
    }

}
2. delay(),有的延迟丢弃,看看是如何完成的。
    public final Single<T> delay(long time, TimeUnit unit, boolean delayError) {
        return delay(time, unit, Schedulers.computation(), delayError);
    }

创建返回一个SingleDelay

    public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler, boolean delayError) {
        ObjectHelper.requireNonNull(unit, "unit is null");
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new SingleDelay<T>(this, time, unit, scheduler, delayError));
    }

进入SingleDelay类,重点还是看subscribeActual,创建一个SequentialDisposable,通过 s.onSubscribe(sd)方法,让下游的SingleObserver能够拿到sd。上游source,用subscribe(new Delay(sd, s))进行启动事件。重点看一下Delay类,当上游调用onSubscribe(Disposable d)方法时,sd.replace(d)将sd替换成上游的d。当上游调用onSuccess(final T value)方法时, sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit))将sd替换成在执行延迟的disposable。同样onError也是一样。
总结来说,delay用替换的方式去传递丢弃事件disposable。

public final class SingleDelay<T> extends Single<T> {

    final SingleSource<? extends T> source;
    final long time;
    final TimeUnit unit;
    final Scheduler scheduler;
    final boolean delayError;

    public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
        this.source = source;
        this.time = time;
        this.unit = unit;
        this.scheduler = scheduler;
        this.delayError = delayError;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {

        final SequentialDisposable sd = new SequentialDisposable();
        s.onSubscribe(sd);
        source.subscribe(new Delay(sd, s));
    }

    final class Delay implements SingleObserver<T> {
        private final SequentialDisposable sd;
        final SingleObserver<? super T> s;

        Delay(SequentialDisposable sd, SingleObserver<? super T> s) {
            this.sd = sd;
            this.s = s;
        }

        @Override
        public void onSubscribe(Disposable d) {
            sd.replace(d);
        }

        @Override
        public void onSuccess(final T value) {
            sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
        }

        @Override
        public void onError(final Throwable e) {
            sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
        }

        final class OnSuccess implements Runnable {
            private final T value;

            OnSuccess(T value) {
                this.value = value;
            }

            @Override
            public void run() {
                s.onSuccess(value);
            }
        }

        final class OnError implements Runnable {
            private final Throwable e;

            OnError(Throwable e) {
                this.e = e;
            }

            @Override
            public void run() {
                s.onError(e);
            }
        }
    }
}
带延迟的disposable流程.png
3. subscribeOn,observeOn线程转换操作符,subscribeOn是切换上游的线程,observeOn是切换下游的线程。

不断跟进SubscribeOn(),最后会到SingleSubscribeOn类中,看一下subscribeActual方法,scheduler.scheduleDirect(parent)切线执行上游任务。如果没有线程的再次切换,后续任务将在这个线程中一直执行下去。

public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;

    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
        s.onSubscribe(parent);

        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;

        final SingleObserver<? super T> actual;

        final SequentialDisposable task;

        final SingleSource<? extends T> source;

        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.actual = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            actual.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            actual.onError(e);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
            task.dispose();
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public void run() {
            source.subscribe(this);
        }
    }

}

同样不断跟进observeOn(),最后进入SingleObserveOn类,查看subscribeActual()方法,source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler))具体查看ObserveOnSingleObserver,在onSuccess方法中,进行线程切换scheduler.scheduleDirect(this)。所以observeOn用来切换下游方法。

public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> actual;

        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.actual = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                actual.onError(ex);
            } else {
                actual.onSuccess(value);
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}

箭头颜色代表线程


线程切换.png

三件套缺一不可:

OkHttp从使用到原理
Retrofit从使用到原理
RxJava从使用到原理

真诚推荐

下面的三篇文章我觉的分析的很好,特别是下面这张图流程很到位。
基本流程及Rxjava中的设计模式
线程切换subscribeOn
线程切换observerOn

线程切换subscribeOn
上一篇下一篇

猜你喜欢

热点阅读