RxJava浅析二——map操作原理

2017-11-30  本文已影响7人  青象

我之前写过RxJava1.x中Map操作的原理,然而时隔半年,我已经全然忘记RxJava1.x是怎么实现的了,唯一的印象是挺复杂的。

当时可能也是浅尝辄止,对其理解不是那么深,过一段时间决定就忘记了。所以把这些现在理解的东西记录下来是很有必要的。

所以,蹬蹬瞪蹬……,RxJava2版本的Map原理闪亮登场。

本文是基于RxJava浅析——事件如何从上游传递到下游。建议先看下这篇。

先上代码,没错,本文就是要分析下这段代码的执行过程。跟上一篇分析区别就在与create()subscribe()之间多了一个map操作。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return String.valueOf(integer);
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError:", e);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
            }
        });

我们知道Observable.create()会返回一个ObservableCreate对象。ObservableCreateObservable的子类。记住这是我们第一个碰到的具体的Observable实现类。

那么核心代码就在这里了:

//Observable的map方法。ObservableCreate.map使用的是父类(Observable)的实现
//不是源码,去掉了一些空检查等
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        return new ObservableMap<T, R>(this, mapper);
}
//附上Observable.create()方法对比一下
//代码位置:io.reactivex.Observable.java
//不是源码,是我简化后的代码。源码还有空检查,Hook方法的回调等。这些不影响整个逻辑,所以可以先忽略。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
}

呀呀呀,这段代码是不是很熟悉,跟create()方法一样的套路啊。只是这个返回的是一个ObservableMap,这个ObservableMap也是Observable的子类,这是我们碰到的第二个具体的Observable实现类。

来看看构造方法中看了啥。

final Function<? super T, ? extends U> function;

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
    super(source);//其实就是在父类中保存source,不用在乎这个super。
    this.function = function;//我们外部实现的Function接口。
}

呀呀呀,也只是保存了一下sourcefunction。套路还是一样的。注意注意,这个source应该是create()方法返回的ObservableCreate对象哦~

奥,解释一下这个Function也是一个接口。

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}

这个接口中只有一个apply()方法,接收T类型参数,返回R类型参数。在我们的例子里就是接收Integer类型的参数,返回String类型的参数。妥妥的,就是一个map操作嘛。把Integer类型转换成了String类型。怎么转换?当然是我们自己实现Function接口。在例子里只是简单地把Integer转换成了String而已。

所以到此为止,啥也没发生。

关键当然还是把上下游连接起来的subscribe(Observer)方法。

map操作时候返回的是ObservableMap对象,所以subscribe(Observer)方法最后会调到ObservableMap.subscribeActual()方法。这个还不清楚的,参见“RxJava浅析——事件如何从上游传递到下游”

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function)); //Point 0
}

这货干了两件事情:

  1. 创建了MapObserver
  2. 调用了source.subscribe()方法,并把创建出来的MapObserver传入。

先来讲第一件事情,这是我们除了自己创建的Observer之外,第一次遇到RxJava2内部创建的Observer对象。MapObserverObservableMap的静态内部类。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);//Point 1 
            this.mapper = mapper;//Point 2
     }
}

可以看到这个MapObserver继承BasicFuseableObserver, 可以猜到其父类肯定是继承Observer.

public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R>

这个QueueDisposable可以就理解成Disposable。我们暂时不扯那么远。

回到Point 1,这句话就是把我们外部的Observer给保存下来。

Point 2,这句话就是把我们的function保存下来。

重点来了!

看Point 0处的代码,这个source是我们的ObservableCreate,这里的入参是刚刚创建的ObserverMap。所以RxJava内部又发生了我们在“RxJava浅析——事件如何从上游传递到下游”所分析的事情。

还记得么?这个subscribe()方法最终调到ObservableCreate.subscribeActual()方法。再来看一遍:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);//Point 3
    try {
        source.subscribe(parent); //Point 4
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

这里就很熟悉啦。只是这里的observer是RxJava内部帮我们创建的ObserverMap

Point 3 会调到ObserverMap.onSubscribe()

//代码在BasicFuseableObserver中,非源码。去掉其他的一些代码
@Override
public final void onSubscribe(Disposable s) {
    this.s = s;
    actual.onSubscribe(this);
 }

这个actual就是我们在创建ObserverMap时传入的,就是我们外部实现的Observer。可以把this传出去,是因为ObserverMap本身也是Disposable接口的实现类。当然入参s(实际上是CreateEmitter)也会保存下来。

Point 4中会调到我们外部实现的ObservableOnSubscribe接口的subscribe()。即执行事件发送。再提一下,这个source是在创建ObservableCreate时保存的。

所以呢,当我们使用CreateEmitter发送数据时: e.onNext(1)

实际上也是先调到CreateEmitteronNext(1),他调到他自己保存的ObserveronNext(1),在我们的例子里这个就是ObserverMap。所以我们来看ObserverMaponNext方法。

//代码位置:ObserverMap的onNext方法。
final Function<? super T, ? extends U> mapper;
U v;
try {
    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
         return;
    }
   actual.onNext(v);

关键点在于mapper.apply(t),这句话把T类型的对象转换成了U类型的对象,也就是我们例子中把Integer类型的对象转换成了String类型。requireNonNull只是对转换后的对象进行了空检查。因为这个转换是我们外部提供的实现,并不能保证非空。

最后再调用actual.onNext(v)就是调到了我们提供的ObserveronNext方法,此时类型已经发生了转换。

以上就是map操作的原理啦。对onErroronComplete事件其实没有起什么作用。

最后用一个图来解释:

map操作原理图.png

没有Map操作的时候

增加Map操作的时候事件流向

上一篇下一篇

猜你喜欢

热点阅读