Android开发规范技巧Android开发Android开发经验谈

【RxJava】- 结合操作符源码分析

2020-03-23  本文已影响0人  拔萝卜占坑

目录

【RxJava】- 创建操作符源码分析
【RxJava】- 变换操作符源码分析
【RxJava】- 过滤操作符源码分析
【RxJava】- 连接操作符源码分析

CombineLatest

作用于最近发射的数据项:如果Observable1发射了A并且Observable2发射了B和C,combineLatest()将会分组处理AB和AC,处理方式是使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

RxJava将这个操作符实现为combineLatest,它接受二到九个Observable作为参数,或者单个Observables列表作为参数。它默认不在任何特定的调度器上执行。

ObservableCombineLatest

实现
看一下ObservableCombineLatest$LatestCoordinator中的innerNext,combineLatest操作符实现逻辑在这个内部类中。

void innerNext(int index, T item) {
    boolean shouldDrain = false;
    synchronized (this) {
        Object[] latest = this.latest;
        if (latest == null) {
            return;
        }
        Object o = latest[index];
        int a = active;
        if (o == null) {
            active = ++a;
        }
        latest[index] = item;
        if (a == latest.length) {
            queue.offer(latest.clone());
            shouldDrain = true;
        }
    }
    if (shouldDrain) {
        drain();

this.latest数组是在创建LatestCoordinator对象时,根据combineLatest方法Observable参数个数为数组长度创建的数组。

每一个Observable(发射数据的Observable,即被观察者)都有自己的index,当发射数据时,先从缓存数组取该索引对应的值,如果存在者更新,如果不存在者计数active变量加1,如果计数变量值等于数组长度,表示每一个Observable都发射过数据,那么这个时候将数组克隆,然后调用 drain()方法。

void drain() {
    ...
    v = ObjectHelper.requireNonNull(combiner.apply(s), "The combiner returned a null value");
    ...
    a.onNext(v);
    ...
}

传入的s就是上面数组保存的发射数据,a是观察者,即订阅了combineLatest消息的实例。

Join

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。由

ObservableJoin

实现。

subscribeActual方法
@Override
protected void subscribeActual(Observer<? super R> observer) {
   JoinDisposable<TLeft, TRight, TLeftEnd, TRightEnd, R> parent =new JoinDisposable<>(observer, leftEnd, rightEnd, resultSelector);
  // 通知观察者,并传入parent参数
   observer.onSubscribe(parent);
   LeftRightObserver left = new LeftRightObserver(parent, true);
  // disposables到JoinDisposable集合中
   parent.disposables.add(left);
   LeftRightObserver right = new LeftRightObserver(parent, false);
   // disposables到JoinDisposable集合中
   parent.disposables.add(right);  
   // 调用第一个被观察者的subscribe方法
   source.subscribe(left);
   // 第二个加入的被观察者subscribe方法
   other.subscribe(right);
}

调用第两个(left和right)被观察者的subscribe

source.subscribe(left);

会执行LeftRightObserver中的onNext方法

public void onNext(Object t) {
    parent.innerValue(isLeft, t);
}

插入数组队列queue

queue.offer(isLeft ? LEFT_CLOSE : RIGHT_CLOSE, index);
LeftRightEndObserver

上面会调用数据包装实例的subscribe方法,那么在LeftRightEndObserver中做了什么操作呢?

public void onNext(Object t) {
    if (DisposableHelper.dispose(this)) {
         parent.innerClose(isLeft, this);
     }
}

首先结束订阅关系,调用innerClose方法。

public void innerClose(boolean isLeft, LeftRightEndObserver index) {synchronized (this) {
   queue.offer(isLeft ? LEFT_CLOSE : RIGHT_CLOSE, index);}
   drain();
}

那么又回到上面的逻辑了。

使用举例
private void join(){Observable.<Integer>create(emitter -> emitter.onNext(1))
      .join(observer -> observer.onNext(2), new Function<Integer, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Integer integer) {
                return observer -> {
                      // 立即调用,发射的数据会被移除,最后观察者得不到任何数据
                      // observer.onComplete();
                };
            }
        }, integer -> (ObservableSource<Integer>) observer -> {
            // 立即调用,发射的数据会被移除,最后观察者得不到任何数据
            // observer.onComplete();
        }, (BiFunction<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) { }

            @Override
            public void onNext(@NonNull Integer integer) {
                // 输出为:create operate--->join: 3
                System.out.println("create operate--->join: " + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) { }

            @Override
            public void onComplete() { }
        });
    }
总结

LeftRightEndObserver作用就是将lefts,rights,disposables中对应索引的数据移除,也就是数据有效的时间期限,一旦被移除,就不能被通知给观察者了。其实功能就是将两个被观察者(left和right)发射的数据由resultSelector(BiFunction)做变换后通知给观察者。

比如你在p.subscribe(end)方法后,会执行leftEnd或者rightEnd的subscribe方法,如果你这时立即调用LeftRightEndObserver的onNext或者onComplete方法,最终都是得不到resultSelector变换后的数据的。

所以为什么说leftEnd和rightEnd是持续时间的函数。我们可以控制数据的有效时间。

Merge

合并多个Observables的发射物。太多了,自己查看源码。如果想知道大概用法请参考:Merge

StartWith

在数据序列的开头插入一条指定的项。

Switch

将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项

Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。下面讲解一下ObservableZip实现,看一下里面的drain()方法实现

 public void onNext(T t) {
     queue.offer(t);
     parent.drain();
 }
public void drain() {
    ...
    for (;;) {
        for (;;) {
            int i = 0;
            int emptyCount = 0;
            for (ZipObserver<T, R> z : zs) {
                if (os[i] == null) {
                    boolean d = z.done;
                    T v = z.queue.poll();
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a, delayError, z)) {
                        return;
                    }
                    if (!empty) {
                        os[i] = v;
                    } else {
                        emptyCount++;
                    }
                } else {
                    if (z.done && !delayError) {
                        Throwable ex = z.error;
                        if (ex != null) {
                            cancelled = true;
                            cancel();
                            a.onError(ex);
                            return;
                        }
                    }
                }
                i++;
            }
            if (emptyCount != 0) {
                break;
            }
            R v;
            try {
                v = Objects.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                cancel();
                a.onError(ex);
                return;
            }
            a.onNext(v);
            Arrays.fill(os, null);
        }
        missing = addAndGet(-missing);
        if (missing == 0) {
            return;
        }
    }
}

精华逻辑全在者几层for循环里面,由里向外,第一层for循环:遍历observers,而observers的长度也是Observable参数个数,即有多少个发射的数据的Observable实例,而row保存发射的值的数组。

如果Observable发射过数据,那么对应取出来的值就不等于null,执行else,如果没有发生错误或者终止,那么继续循环。

如果Observable没有发射过数据,者将数据保存到对应的数据索引下。

第二层循环,if (emptyCount != 0)为true,者跳出第二次循环,否则将row保存的数据应用变换函数,然后把变换后的值发射给观察者。

emptyCount是在row数组中还有Observable没有发射过数据的时候,会加1。

第三层循环,很简单,自己查看。

如果在地一层循环里面 ,f (os[i] == null)始终不成立,也就是说所以Observable都发射过数据了,那么调用变换函数,然后发射变换后的值。

上一篇 下一篇

猜你喜欢

热点阅读