Android开发规范技巧Android开发Android开发经验谈

【RxJava】- 创建操作符源码分析

2020-03-23  本文已影响0人  拔萝卜占坑

【RxJava】- 变换操作符源码分析
【RxJava】- 过滤操作符源码分析
【RxJava】- 结合操作符源码分析
【RxJava】- 连接操作符源码分析

简介

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。简单一点就是创建一个事件,注册一个观察者,当事件发生改变时,及时通知观察者。同时RxJava可以把一序列的异步事件按照一定规则组合成新的事件序列。

RxAndroid里面就几个类,是在RxJava基础上针对Android开发封装了一些使用方法而已。

参考

GitHub:RxJava
GitHub:RxAndroid
ReactiveX:reactivex
中文文档地址:ReactiveX/RxJava文档中文版

版本

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'

订阅

调度器

下面将大体介绍一下RaJava里面的调度器(Scheduler)。Schedulers类用于返回标准Scheduler实例的静态工厂方法。

Observable

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) { return apply(f, source);}
   return source;
 }

下面基本都是在变量“f”等于null情况下分析。

Creating Observables

创建操作

create

返回ObservableCreate实例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> {            
    try {
        if (!emitter.isDisposed()) {
              for (int i = 1; i < 5; i++) {emitter.onNext(i);}
              emitter.onComplete();
         }
    } catch (Exception e) {
                emitter.onError(e);
    }}.subscribe(new Observer<Integer>() {...}

看一下create方法

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
   Objects.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

看一下onAssembly方法

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

onObservableAssembly默认是null,所以create最终返回的是持有被观察者的source(ObservableCreate类型)。

然后调用ObservableCreate对象的subscribe方法,并传入观察者实例,在subscribe方法中继续调用ObservableCreate的subscribeActual方法并传入观察者实例observer。

看一下subscribeActual方法做了什么

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
    observer.onSubscribe(parent);
    try {
       source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

创建事件发射器CreateEmitter并传入观察者实例observer,调用onSubscribe方法,通知观察者,观察关系已经准备好,并传入事件发射器,以便观察者可以主动选择放弃对事件通知的接受。

在被观察者中,通过调用CreateEmitter中的onNext,onError,onComplete等方法,如果事件观察没有被取消,那么会调用观察者(observer)中对应的方法来通知观察者。

defer

返回ObservableDefer实例

延期创建被观察者对象,直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable。

调用defer传入的实例(Supplier类型)的get()方法,获取被观察者对象(ObservableSource类型)

Observable.defer((Supplier<ObservableSource<Integer>>) () -> observer -> observer.onNext(1)).subscribe(...)

具体的可以看一下ObservableDefer这个类,至于subscribe方法怎么传参数,大致看一下调用逻辑就明白了。

Empty/Never/Throw
From

数据转换,实现有:fromAction,fromArray,fromCallable,fromCompletable,fromFuture,fromFuture,fromIterable,fromMaybe,fromPublisher,fromRunnable,fromSingle,fromSupplier。对应Observable实现请自己查看源码,里面代码不多,阅读完基本知道怎么用。

Interval

创建一个按固定时间间隔发射整数序列的Observable,返回ObservableInterval实例,同时会默认传入一个ComputationScheduler实例作为调度器。从0开始自加,每发射一次加1。

interval(long initialDelay, long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler)
Just

创建一个发射指定值的Observable,可以传入多个参数,如果传入一个参数返回ObservableJust实例,多个返回ObservableFromArray实例,前者把传入的一个参数原封不动发射后就完成,后者需要把数组中的数据一个一个原封不动发射后完成,前者完成可以看成是后者的一种特殊情况。

Range

创建一个发射特定整数序列的Observable,参数如果是int返回ObservableRange实例,如果是long返回ObservableRangeLong实例。

这个和上面的interval差不多,不同的interval从0开始,如果不取消,就一直发射,而Range只发射自定参数范围的整数,发射完就停止,发射一次自加1。比如range(1,3)将收到1,2,3.

Repeat

创建一个发射特定数据重复多次的Observable,返回ObservableRepeat实例。比如对just操作做重复操作:

Observable.just(1,3).repeat().subscribe(...)

repeat可接收一个参数用作重复次数。

Start

返回一个Observable,它发射一个类似于函数声明的值。

Timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值,返回ObservableTimer实例。

Transforming Observables

转换操作

Buffer
Observable.create(emitter -> {}).buffer(1)

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

总结

首先需要了解,RxJava被观察者和观察者之间的调度流程,个一定要清楚,这样对分析RxJava的操作符源码很有帮助,否则你将会陷入代码无尽的调用中。

Observable的subscribeActual就是做中转作用,调用到下一个ObservableOnSubscribe的subscribe方法中。

上一篇 下一篇

猜你喜欢

热点阅读