AndroidAndroid开发Android开发

RxJava 2.x 源码分析(二) 之 FlatMap

2018-05-11  本文已影响21人  zYoung_Tang

FlatMap

image

RxJava使用了观察者模式,封装了很多ObservableObserver,针对不同的操作符的调用会用对应的ObservaEbleObserver实现。

根据源码Observable发射的事件都是有序的,使用FlatMap时由事件转换的被观察者也是有序地发射自己的事件,我们可以猜测:


为了证实我们的猜测,我们先具体简单的例子:

public static void main(String args[]) {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                    e.onComplete();
                }
            })
            .flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    return Observable.just(
                            "item " + integer + " sub-item " + 1 + " Observable Thread: " + Thread.currentThread().getName()
                            , "item " + integer + " sub-item " + 2 + " Observable Thread: " + Thread.currentThread().getName()
                            , "item " + integer + " sub-item " + 3 + " Observable Thread: " + Thread.currentThread().getName());
                }
            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    System.out.println(s + " Observer Thread: " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
}

运行结果:

item 1 sub-item 1 Observable Thread: main Observer Thread: main
item 1 sub-item 2 Observable Thread: main Observer Thread: main
item 1 sub-item 3 Observable Thread: main Observer Thread: main
item 2 sub-item 1 Observable Thread: main Observer Thread: main
item 2 sub-item 2 Observable Thread: main Observer Thread: main
item 2 sub-item 3 Observable Thread: main Observer Thread: main
item 3 sub-item 1 Observable Thread: main Observer Thread: main
item 3 sub-item 2 Observable Thread: main Observer Thread: main
item 3 sub-item 3 Observable Thread: main Observer Thread: main

阅读源码之前我们需要知道,RxJava 内部包装了很多ObservableObserver,用FlatMap实现该例子的实现方法不止一种,使用的操作符也可以不一样,所以运行时调用到的ObservableObserver不一定相同,所以下面阅读的源码路径只是基于这个例子。

//Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

上一篇源码分析一样,内部创建的是ObservableCreate对象,ObservableOnSubscribe是一个提供subscribe()作为约定函数的接口。

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return flatMap(mapper, false);
}

mapper 代表的是我们的刚才创建的Function对象,这里有函数式编程的味道(如果用过Kotlin的同学就更加熟悉了),相当于把我们写的函数作为一个参数。

false 表示异常是否需要延迟到所有内部被观察者都结束后才抛出。

继续进下一层

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
        return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

maxConcurrency: 最大并发数

继续进下一层

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
        return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

bufferSize :缓存所有子被观察者的事件加起来总数大小

继续进下一层

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
    // 检查参数是否合法
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) { //false
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    // ObservableCreate 没有实现 ScalarCallable,所以走这里
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

这里返回一个 ObservableFlatMap 对象,实际上ObservableFlatMap包装了ObservableCreate并把ObservableCreate对象作为常量source,那么flatMap()到这里结束了。

//Observable.java
public final void subscribe(Observer<? super T> observer) {
    // 检查观察者是否为空
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // 调用 hook 方法
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // 检查调用 hook 的观察者是否为空
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // 实际订阅操作在这个方法里
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
     ...
    }
}

下面看看 subscribeActual:

// Observable.java
protected abstract void subscribeActual(Observer<? super T> observer);

居然是一个抽象方法,在哪里被实现了?机智的我们反应过来了,具体实现在flatMap()返回的ObservableFlatMap中:

// ObservableFlatMap.java
public ObservableFlatMap(ObservableSource<T> source,
        Function<? super T, ? extends ObservableSource<? extends U>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency;
    this.bufferSize = bufferSize;
}
    
@Override
public void subscribeActual(Observer<? super U> t) {
    // source : ObservableCreate对象
    // t : 观察者
    // mapper : 之前我们写的 mapper 函数
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) { // false
        return;
    }
    // 本次运行走这里,调用 ObservableCreate 的 subscribe() 并用    
    // MergeObserver 包装了我们写的 observer
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

可以说ObservableFlatMapFlatMap来说是比较重要的类,里面包含了许多重要逻辑。

调用 ObservableCreatesubscribe() 之前我们先看看 MergeObserver 的构造方法:

// ObservableFlatMap.java
MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
    this.actual = actual; // 我们写的 observer
    this.mapper = mapper; 
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency; // Integer.MAX_VALUE
    this.bufferSize = bufferSize;
    if (maxConcurrency != Integer.MAX_VALUE) {
        sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
    }
    // 创建一个原子性的内部观察者对象数组
    this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}

由于ObservableCreate没有覆写 subscribe(),所以实际上调用的是父类Observablesubscribe()且源码上面已经贴过,可以直接跳过进入ObservableCreatesubscribeActual()中:

// ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 创建发射器,这里先忽略发射器内部实现,只需要知道发射器主要用来回调观察者的
    // onNext onComplete onError 方法
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 回调 MergeObserver 的 onSubscribe() 
    // 再由其回调我们写的 observer 的 onSubscribe()
    observer.onSubscribe(parent);

    try {
        // source : 我们创建的 ObservableOnSubscribe 对象
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

下面走到了我们写的subcribe()逻辑里,我们调用了发射器的 onNext(),看看发射器的源码:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    // 检测是否切断
    // 如果切断了被观察者就接收不到后续的事件了
    if (!isDisposed()) {
        // observer : MergeObserver 对象
        // 回调观察者的 onNext()
        observer.onNext(t);
    }
}

还记得我们写的 observer 对象被包装成了MergeObserver,那么进入MergeObserveronNext():

// ObservableFlatMap.java
@Override
public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        // 把我们的 mapper 方法里面返回的子被观察者提取出来,
        // 由于我们当初用的是 .just() 创建子被观察者,
        // 所以子被观察者是 ObservableFromArray 对象,
        // 这里先忽略 just() 内部实现
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) { // false
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }
    // 进入下面的函数
    subscribeInner(p);
}
    
    
void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        // p : ObservableFromArray 对象
        if (p instanceof Callable) { //false
            tryEmitScalar(((Callable<? extends U>)p));

            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        break;
                    }
                }
            } else {
                break;
            }
        } else {
            // ObservableFromArray 没有实现 Callable 接口,所以走这里
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner); // 飙车入口
            }
            break;
        }
    }
}
    
// 这里主要做内部观察者对象数组的增加
// 通过创建size为原数组长度+1的新数组并作为新的内部观察者对象数组来实现
boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        // 获取之前 MergeObserver 创建的内部观察者对象数组
        InnerObserver<?, ?>[] a = observers.get();
        if (a == CANCELLED) {
            inner.dispose();
            return false;
        }
        int n = a.length; // 0
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}
        

看到这里我们应该到了 p.subscribe(inner) ,又调用Observablesubscribe()?到现在我们好像已经调用了好几次了,证明这两个东西都被包装好几层了,前面我提到过RxJava包装了许多ObservableObserver,配合观察者模式一层一层地地传递事件下去,这是 RxJava 的其中一个奥妙之处。

下面整个过程我们会一直在 ObservableFromArray InnerObserver ObservableFlatMap MergeObserver的方法调用中飙车,可能会感到不适。

我们直接跳过进入从ObservableFromArraysubscribeActual()开始看,这里会有很多的跳转不方便一步步展示,所有相关代码调用顺序和注释都在下面:

// ObservableFromArray.java
public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }
    @Override
    public void subscribeActual(Observer<? super T> s) {
        // [1]
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        // s : InnerObserver 对象
        // array : 我们创建子被观察者时调用 .just() 方法生成的String数组
        // [4]
        s.onSubscribe(d);
    
        // [9]
        if (d.fusionMode) { // true 看到这里结束了 onNext() 所有操作
            return;
        }

        d.run();
    }

    // [2]
    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        
        final Observer<? super T> actual; // InnerObserver 对象
        
        final T[] array;
        
        int index;
        
        boolean fusionMode;
        
        volatile boolean disposed;
        // [3]
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
        this.actual = actual;
        this.array = array;
        }
...
        // [7]
        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) { //true
                fusionMode = true;
                return SYNC;
            }
                return NONE;
        }
    }
}

InnerObserver:

// ObservableFlatMap.java
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

    private static final long serialVersionUID = -4606175640614850599L;
    final long id;
    final MergeObserver<T, U> parent;

    volatile boolean done;
    volatile SimpleQueue<U> queue;

    int fusionMode;

    InnerObserver(MergeObserver<T, U> parent, long id) {
        this.id = id;
        this.parent = parent;
    }
    
    // [5]
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.setOnce(this, s)) { // true
            if (s instanceof QueueDisposable) {  // true
                @SuppressWarnings("unchecked")
                // s : FromArrayDisposable 对象
                QueueDisposable<U> qd = (QueueDisposable<U>) s;
                // 获取合并的标记 这里返回同步标记 SYNC
                // [6]
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                if (m == QueueDisposable.SYNC) { // true
                    fusionMode = m;
                    queue = qd;
                    done = true;
                    // parent: MergeObserver
                    // 把 MergeObserver 的所有事件都发送完毕
                    // [8]
                    parent.drain();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;
                }
            }
        }
    }        
    ...
}

MergeObserver:

// [9]
void drain() {
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}
// [10]
// 这里利用之前创建的具有原子性的内部观察者数组发射子被观察者的所有事件
void drainLoop() {
 ...
}

从上面的源码主要过程是被观察者的单次调用onNext()发射的事件变成一个子被观察者且将其事件都发射给观察者,然后执行下一个onNext()重新走一遍上述代码或者进入其他回调方法,所以整个过程都在同一个线程中且同步执行的,事件的顺序是有序的。


无序事件

修改下flatMap()代码,实现无序事件:

.flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        final Integer i = integer;
        Observable observable_create = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext(i + "-" + 1 + " Observable: " + Thread.currentThread().getName());
                e.onNext(i + "-" + 2 + " Observable: " + Thread.currentThread().getName());
                e.onNext(i + "-" + 3 + " Observable: " + Thread.currentThread().getName());
                e.onComplete();
            }
        });
        Observable observable_just = Observable.just(
                integer + "-" + 1 + " Observable: " + Thread.currentThread().getName(),
                integer + "-" + 2 + " Observable: " + Thread.currentThread().getName(),
                integer + "-" + 3 + " Observable: " + Thread.currentThread().getName());

        return observable_create
                .subscribeOn(Schedulers.newThread());
                // .delay((int(Math.random()*1000),TimeUnit.MILLISECONDS);
    }
}) 

运行结果:

03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-1 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-2 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-3 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.275 12781-12800/com.example.myapplication E/RxJava: 2-1 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 2-2 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-1 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-2 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-3 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 2-3 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2

无序事件的子被观察者不再由.just()方法创建,而是.create()代替,原因是由于ObservableFromArray实现逻辑不能在日志中直接明确的显示子被观察者发送事件是在子线程进行的。

observable_just 作为子被观察者的运行结果:

03-16 11:10:55.381 13418-13437/? E/RxJava: 1-1 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.381 13418-13437/? E/RxJava: 1-2 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.381 13418-13437/? E/RxJava: 1-3 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-1 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-2 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-3 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-1 Observable: main Observer :RxCachedThreadScheduler-2
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-2 Observable: main Observer :RxCachedThreadScheduler-2
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-3 Observable: main Observer :RxCachedThreadScheduler-2

由于调用just()的时候已经在当前线程(默认主线程)把事件都准备好了,再在子线程中发射出去,所以日志上打印的是主线程


总结:

无序事件涉及到 RxJava 切换线程的操作,具体实现以及源码分析将会留到下一章中介绍。

上一篇下一篇

猜你喜欢

热点阅读