RxJava2.0操作符flatmap源码分析

2018-08-15  本文已影响117人  zuoweitan

RxJava 2.0简介

关于RxJava到底是什么,我们可以看看它的开发者是如何描述它的:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

它是通过使用可观察序列来组件异步和基于事件程序的库,它使用了观察这模式来支持事件与数据的序列化,并且提供了一些列的操作符,你可以用这些操作符来将各种序列显式的组合起来,并且它可以帮你把异步线程、线程同步、线程和并发的数据结构中抽象出来,让你远离那些实现细节。

嗯,没错,这就是RxJava,一个啥都不需要你考虑,只需要把你想做的事务和要处理的数据组合成操作序列,剩下的就交给RxJava就好了。

当然,不是说RxJava能做到的,其他库或者我们自己就不能做到,那为什么还要使用RxJava呢?(其实我不喜欢使用RxJava)这里我挑一些别人说的理由:

  1. 它是一种函数响应式编程,显然函数响应式编程的好处它都有(不知道函数响应式编程的自己查资料)

  2. 逻辑清晰,使用简洁(是不是这样呢?谁用谁知道)

而我不喜欢的理由:

我也不是不喜欢RxJava,只是觉得有时候只是为了一个异步任务就动不动RxJava,有点杀鸡用牛刀的感觉。这些轻量级别的任务,我还是喜欢用bolts-task(强推)。这里贴一个科普链接:https://blog.csdn.net/u010295885/article/details/52463039

操作符flatmap的简单介绍与应用

flatmap是RxJava众多操作符中的一个,flat字面含义就是"压平",当你需要把多个集合整理成一个集合,其实就是一个压平的操作,如下图,这个时候你就可以使用flatmap这个操作符了。


image.png

看一个简单的应用

List<Student> students = new ArrayList<Student>();
students.add(new Student("1", "zhangsan")
    .addCourse(new Course("1", "数学"))
    .addCourse(new Course("5", "计算机")));
students.add(new Student("2", "lisi")
    .addCourse(new Course("2", "语文")));
students.add(new Student("3", "wangwu")
    .addCourse(new Course("3", "英语")));
Observable.fromIterable(students)
    .flatMap(new Function<Student, ObservableSource<Course>>() {

        @Override
        public ObservableSource<Course> apply(Student student) throws Exception {
            return Observable.fromIterable(student.courses);
        }
    })
.subscribe(new Consumer<Course>() {
    @Override
    public void accept(Course course) throws Exception {
        System.out.println(course);
    }
});

应用非常简单,数据源是一些学生,而我最后打印的是每个学生的所选的课程,我们来试试使用map操作符来实现看看:

Observable.fromIterable(students)
    .map(new Function<Student, List<Course>>() {

        @Override
        public List<Course> apply(Student student) throws Exception {
            return student.courses;
        }
    })
    .subscribe(new Consumer<List<Course>>() {
        @Override
        public void accept(List<Course> courses) throws Exception {
            for (Course cours : courses) {
                System.out.println(cours);
            }
        }
    });

比较看下来,相对于flatmap,map操作符只能给我们每个学生所选的所有课程,并不能告诉我们他所选的每一门课都是什么,这就需要我们自己去加一层遍历了。
另外,从比较结果来看,flatmap好像是替我们对学生的课程列表做了一次遍历。嗯,没错,从结果上看它确实是做了,那它是怎么做到的呢?

flatmap源码分析

这里以上面那个简单应用作为一个入口,来看看flatmap如何帮我们把数据压平的。

1. Observable::fromIterable
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source));
}

创建一个ObservableFromIterable对象,构造参数为一个迭代器(在我们的应用就是students)。然后判断是否要进行Hook操作,最后返回一个Observable对象。

2. RxJavaPlugins::onAssembly
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

这里的f为空,即不需要Hook,所以第一步中会直接返回一个ObservableFromIterable对象。接下来我们调用了flatmap。

3. ObservableFromIterable::flatmap
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}

flatmap被重载了很多次,对于只传了一个映射方法的flatmap方法,最后会调用下面的flatmap

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

这里需要关注它几个参数:

  1. mapper:就是student到ObservableSource<Course>的映射方法
  2. delayErrors:false
  3. maxConcurrency:Integer.MAX_VALUE
  4. bufferSize:bufferSize()
    由于不Hook,所以这里直接返回ObservableFlatMap对象,注意这里把我们在第一步中创
    ObservableFromIterable作为构造参数传给了ObservableFlatMap对象。
4.ObservableFlatMap::this
public ObservableFlatMap(ObservableSource<T> source,
        Function<? super T, ? extends ObservableSource<? extends U>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency;
    this.bufferSize = bufferSize;
}

流程很简单,这里要注意这个source最后会赋值给父类的一个名字为source成员。如下

/** The source consumable Observable. */
protected final ObservableSource<T> source;

/**
 * Constructs the ObservableSource with the given consumable.
 * @param source the consumable Observable
 */
AbstractObservableWithUpstream(ObservableSource<T> source) {
    this.source = source;
}

小结,flatmap只做了一件事情,就是把我们最开始的ObservableFromIterable包了一层,构造了一个ObservableFlatMap,然后我们对ObservableFlatMap进行了订阅操作

5. ObservableFlatMap::subscribe

回顾下我们的调用

.subscribe(new Consumer<Course>() {
    @Override
    public void accept(Course course) throws Exception {
        System.out.println(course);
    }
});

调用的是参数类型为Consumer的subscribe方法

public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}

调用了subscribe多参重载方法,注意这里的传参

  1. onNext:就是我们的Consumer
  2. onSubscribe:Functions.emptyConsumer(),它返回一个EmptyConsumer对象,accept不做任何处理
static final class EmptyConsumer implements Consumer<Object> {
    @Override
    public void accept(Object v) { }

    @Override
    public String toString() {
        return "EmptyConsumer";
    }
}

多参的subscribe将我们的入参包装成了一个Observer对象,也就是LambdaObserver来代理了我们的Consumer,然后调用参数类型为Observer的subscribe

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

这里RxJavaPlugins.onSubscribe依然不会对observer进行Hook,所以这里最后就是直接调用subscribeActual方法,它是一个抽象方法,每个子类都会有自己的实现,进行真正的订阅操作,这里我们最后调用的就是ObservableFlatMap的subscribeActual方法

6. ObservableFlatMap::subscribeActual
@Override
public void subscribeActual(Observer<? super U> t) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }

    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

还记得这个source是谁吧?就是开始创建的那个ObservableFromIterable对象,这里才开始对我们原始的数据源进行订阅。而且从这里开始,每一次调用subscribe,都会先创建一个新的Observer对象来代理上一个Observer对象,从而形成一种上下游的Flow,这个为后面的onSubscribe做了准备。

小结,subscribe方法与onSubscribe是一对操作,subscribe负责把订阅者一层层包起来,每包一层就增加了一层对原始数据的加工,而onSubscribe则负责把加工的数据一层层往里面传,最后传给我们最初的订阅者。可以结合下图理解


image.png

接下来,就看看原始订阅者的上游MergeObserver

7. MergeObserver::this
MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    this.downstream = actual;
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency;
    this.bufferSize = bufferSize;
    if (maxConcurrency != Integer.MAX_VALUE) {
        sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
    }
    this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}

重点关注几个地方:

  1. downStream:下游,这里就是我们最原始的订阅者
  2. mapper:就是之前我们给flatmap的映射函数
  3. maxConcurrency:就是我们构建ObservableFlatMap所传的值,Integer.MAX_VALUE
  4. observers:InnerObserver类型数组的一个原子引用

这里涉及到了InnerObserver,既然已经登场我们就先来简单了解下

8. InnerObserver
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
implements Observer<U> {

    private static final long serialVersionUID = -4606175640614850599L;
    final long id;
    final MergeObserver<T, U> parent;

    volatile boolean done;
    volatile SimpleQueue<U> queue;

    int fusionMode;

    InnerObserver(MergeObserver<T, U> parent, long id) {
        this.id = id;
        this.parent = parent;
    }

这里暂时只需要知道,它是一个Observer对象,它有个parent指向MergeObserver,我们先回到第6步中source被MergeObserver订阅的地方。这里的source就是我们原始的观察源ObservableFromIterable,我们看看它的subscribe方法

9.ObservableFromIterable::subscribeActual

同样的,subscribe方法是它的基类方法,最后都会调用到子类的subscribeActual

@Override
public void subscribeActual(Observer<? super T> observer) {
    Iterator<? extends T> it;
    try {
        it = source.iterator();
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        EmptyDisposable.error(e, observer);
        return;
    }
    boolean hasNext;
    try {
        hasNext = it.hasNext();
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        EmptyDisposable.error(e, observer);
        return;
    }
    if (!hasNext) {
        EmptyDisposable.complete(observer);
        return;
    }

    FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
    observer.onSubscribe(d);

    if (!d.fusionMode) {
        d.run();
    }
}

整个流程很简单,source由第1步中知道它就是我们提供的数据源(在我们的实例中就是students),如果source有数据,则走接下来的三步:

  1. 构建FromIterableDisposable,一个Disposable对象,是onSubscribe方法的参数。它的构造参数有两个
    1. observer:这里就是MergeObserver,作为下游保存起来
    2. it:数据源的迭代器
  2. 调用MergeObserver的onSubscribe方法
    如果FromIterableDisposable没有被消费,则执行它的run方法

小结,这里才开始真正的消费数据,两种情况:首先交给onSubscribe消费,如果没有被消费则通过FromIterableDisposable的run方法来消费

10. MergeObserver::onSubscribe
@Override
public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        downstream.onSubscribe(this);
    }
}

流程很简单,首先判断是否已经有了上游了,如果有了,则不做任何处理,否则参数d作为自己的上游,赋值给upStream,调用自己下游的onSubscribe方法。这里的downstream就是我们最原始的订阅者Consumer,由第5步可知,它被LambdaObserver代理了,我们看看LambdaObserver的onSubscribe方法

11. LambdaObserver::onSubscribe
public void onSubscribe(Disposable d) {
    if (DisposableHelper.setOnce(this, d)) {
        try {
            onSubscribe.accept(this);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            d.dispose();
            onError(ex);
        }
    }
}

首先保存了下d,然后调用它的onSubscribe的accept方法,由第5步可知,onSubscribe就是一个EmptyConsumer对象,看看EmptyConsumer的accept方法

12. EmptyConsumer::accept
static final class EmptyConsumer implements Consumer<Object> {
    @Override
    public void accept(Object v) { }

    @Override
    public String toString() {
        return "EmptyConsumer";
    }
}

accpet是一个空方法,所以onSubscribe并没有做消费数据的操作,所以FromIterableDisposable的fusionMode还是false,由第9步可知,会走到FromIterableDisposable::run方法,通过它来消费数据

小结,从以上onSubscribe流程可以看到,每个onSubscribe参数都是上游,执行体里面都是调用它的下游来消费数据,跟第6步的小结吻合

13. FromIterableDisposable::run
void run() {
    boolean hasNext;

    do {
        if (isDisposed()) {
            return;
        }
        T v;

        try {
            v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            downstream.onError(e);
            return;
        }

        downstream.onNext(v);

        if (isDisposed()) {
            return;
        }
        try {
            hasNext = it.hasNext();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            downstream.onError(e);
            return;
        }
    } while (hasNext);

    if (!isDisposed()) {
        downstream.onComplete();
    }
}

代码略长,简单梳理下流程

  1. 该任务是否已经取消了?是,则退出,否则判断是否有下一个数据,没有则到3,有则到2
  2. 调用下游的onNext,将数据传给下游,这里的downStream就是MergeObserver,再到1
  3. 调用下游的onComplete方法

还有一个错误处理的分支:当获取数据的过程中出错了,则会调用下游的onError

14. MergeObserver::onNext
@Override
public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        upstream.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) {
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }

    subscribeInner(p);
}

去掉一些判断,流程还是很简单的

  1. 通过映射函数mapper,将参数t转变成一个新的观察源ObservableSource对象
  2. maxConcurrency为Integer.MAX_VALUE,所以直接调用subscribeInner
15. MergeObserver::subscribeInner
void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) {
            if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                boolean empty = false;
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        empty = true;
                    }
                }
                if (empty) {
                    drain();
                    break;
                }
            } else {
                break;
            }
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}

由我们的映射函数生成的ObservableSource是ObservableFromIterable,它没有实现Callable,所有走到了else分支,流程如下:

  1. 创建InnerObserver
  2. addInner,将每个InnerObserver添加到observers中
  3. 使用InnerObserver来对我们通过mapper创建的ObservableFromIterable进行订阅

从第8步中,我们知道每一个InnerObserver都有一个parent指向MergeObserver,留意这一点

16. MergeObserver::addInner
boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        InnerObserver<?, ?>[] a = observers.get();
        if (a == CANCELLED) {
            inner.dispose();
            return false;
        }
        int n = a.length;
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}

将每一个InnerObserver存放到observers当中,其实逻辑很简单,但是这里做了很多同步操作
由第9步可知,对ObservableFromIterable进行订阅,首先会回调订阅者的onSubscribe来消费数据,我们看看InnerObserver的onSubscribe

17. InnerObserver::onSubscribe
@Override
public void onSubscribe(Disposable d) {
    if (DisposableHelper.setOnce(this, d)) {
        if (d instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<U> qd = (QueueDisposable<U>) d;

            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
            if (m == QueueDisposable.SYNC) {
                fusionMode = m;
                queue = qd;
                done = true;
                parent.drain();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                fusionMode = m;
                queue = qd;
            }
        }
    }
}

流程如下

  1. 一样的,保存自己的上游
  2. 判断d是否是QueueDisposable,而FromIterableDisposable是QueueDisposable的子类,所以成立
  3. 调用FromIterableDisposable::requestFusion,这里传参是ANY | BOUNDARY
  4. 如果requestFusion返回SYNC则会走parent.drain方法,这里的parent就是MergeObserver,另外有以下几个赋值
    1. fusionMode = SYNC
    2. done = true
    3. queue = qd,即queue指向了FromIterableDisposable,也就是它的上游,随后会通过queue从它的上游获取数据
18. FromIterableDisposable::requestFusion
@Override
public int requestFusion(int mode) {
    if ((mode & SYNC) != 0) {
        fusionMode = true;
        return SYNC;
    }
    return NONE;
}

由传参ANY | BOUNDARY,得到返回就是SYNC,故会走到MergeObserver::drain方法,另外这里讲标志位fusionMode标记为true了,即该FromIterableDisposable已经被消费了,它的FromIterableDisposable::run将不再被执行

19. MergeObserver::drain
void drain() {
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}

做了多线程保护,然后直接调用drainLoop

20. MergeObserver::drainLoop
void drainLoop() {
    final Observer<? super U> child = this.downstream;
    int missed = 1;
    for (;;) {
        if (checkTerminate()) {
            return;
        }
        //这里queue没有赋值,所有为空
        SimplePlainQueue<U> svq = queue;

        if (svq != null) {
            for (;;) {
                U o;
                for (;;) {
                    if (checkTerminate()) {
                        return;
                    }

                    o = svq.poll();

                    if (o == null) {
                        break;
                    }

                    child.onNext(o);
                }
                if (o == null) {
                    break;
                }
            }
        }
        //done为false
        boolean d = done;
        svq = queue;
        InnerObserver<?, ?>[] inner = observers.get();
        int n = inner.length; //n至少为1

        int nSources = 0;
        if (maxConcurrency != Integer.MAX_VALUE) {
            synchronized (this) {
                nSources = sources.size();
            }
        }

        if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
            Throwable ex = errors.terminate();
            if (ex != ExceptionHelper.TERMINATED) {
                if (ex == null) {
                    child.onComplete();
                } else {
                    child.onError(ex);
                }
            }
            return;
        }
        //所以我们可以从这里开始分析,以下一段代码是确定下一个需要处理的InnerObserver的lastIndex,lastId,记为流程1
        boolean innerCompleted = false;
        if (n != 0) {
            long startId = lastId;
            int index = lastIndex;

            if (n <= index || inner[index].id != startId) {
                if (n <= index) {
                    index = 0;
                }
                int j = index;
                for (int i = 0; i < n; i++) {
                    if (inner[j].id == startId) {
                        break;
                    }
                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                index = j;
                lastIndex = j;
                lastId = inner[j].id;
            }
            //找到以下一个待处理的InnerObserver后,则开始从index处开始遍历依次向后处理observers中未处理的InnerObserver,记为流程2
            int j = index;
            sourceLoop:
            for (int i = 0; i < n; i++) {
                if (checkTerminate()) {
                    return;
                }
                @SuppressWarnings("unchecked")
                InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];

                for (;;) {
                    if (checkTerminate()) {
                        return;
                    }
                    SimpleQueue<U> q = is.queue;
                    if (q == null) {
                        break;
                    }
                    U o;
                    for (;;) {
                        try {
                            o = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            is.dispose();
                            errors.addThrowable(ex);
                            if (checkTerminate()) {
                                return;
                            }
                            removeInner(is);
                            innerCompleted = true;
                            i++;
                            continue sourceLoop;
                        }
                        if (o == null) {
                            break;
                        }

                        child.onNext(o);

                        if (checkTerminate()) {
                            return;
                        }
                    }
                    if (o == null) {
                        break;
                    }
                }
                boolean innerDone = is.done;
                SimpleQueue<U> innerQueue = is.queue;
                if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                    removeInner(is);
                    if (checkTerminate()) {
                        return;
                    }
                    innerCompleted = true;
                }

                j++;
                if (j == n) {
                    j = 0;
                }
            }
            lastIndex = j;
            lastId = inner[j].id;
        }

        if (innerCompleted) {
            if (maxConcurrency != Integer.MAX_VALUE) {
                ObservableSource<? extends U> p;
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        continue;
                    }
                }
                subscribeInner(p);
            }
            continue;
        }
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

鉴于代码比较长,代码有些地方我做了注释,由于判断条件都不满足,所以我们只用看代码中标注的两个流程

  1. 流程1
    n不为0,进入if分支,流程如下

    1. 获取上次记录的待处理的InnerObserver的id以及index
    2. 如果observers中的InnerObserver中的个数减少了或者lastIndex对应的InnerObserver发生变化了,则到3,否则已经找到
    3. 分两种情况:
      1. 如果observers中的InnerObserver的个数减少了,则需要重置index,重头开始找到待处理的InnerObserver,重新赋值给lastId以及lastIndex
      2. 如果observer中的innerObserver的个数没有减少,但是最之前lastIndex对应的InnerObserver已经发生了变化,则需要从index处重新找到待处理的InnerObserver,如果没有找到,则重头开始找
  2. 流程2
    通过index取出需要处理的InnerObserver,拿到InnerObserver中的queue,开始遍历
    queue中的数据,由第17步中,可知这个queue就是FromIterableDisposable,FromIterableDisposable中有一个迭代器,queue通过其中的迭代器不断的获取数据通过onNext回调传给它的下游,这里就是我们最原始的订阅者。当前的queue处理完后,外层循环会因为o == null而退出,随后就是赋值lastIndex与lastId,也就是下一个即将要被处理的InnerObserver。

小结,MergeObserver通过drainLoop方法将我们通过mapper创建出来的观察源一一创建一个内部订阅者,也就是InnerObserver,由InnerObserver来消费这些观察源,而InnerObserver又持有一个MergeObserver的引用,而MergeObserver又持有它的下游,也就是我们最原始的订阅者,这样我们就可以把由mapper创建的观察源的数据传给最终的订阅者

总结

  1. RxJava中有大量的服务于多线程的代码,从flatmap的源码分析总可以看出来,很多代码都是了线程同步,保证线程安全。
  2. flatmap通过在MergeObserver中创建一系列的InnerObserver,在同一层级对所有通过mapper创建的观察源进行消费,然后通过内部订阅者持有的parent即MergeObserver将数据派发给MergeObserver下游。虽然这里的InnerObserver并没有显式的指定它的下游,但是它的下游从它的parent也就是MergeObserver“继承了”。
  3. RxJava是基于数据流的,分析它的代码一定要重点关注数据的流动,它的数据加工过程可以看成责任链模式的运用,它的数据分发则是观察者模式与代理模式的运用。
  4. 数据加工的链是通过subscribe方法链接起来的,而数据的分发则是通过onSubscribe方法来完成的
上一篇下一篇

猜你喜欢

热点阅读