Rx转换操作符

2019-04-06  本文已影响0人  gczxbb

map操作符

被观察者数据源泛型,当发射器的数据类型和观察者数据类型不同时,通过map操作符转换,可以将上游发射的类型转换成任意对象类型。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer s) throws Exception {
        String newStr = s + "_";
        Log.d(TAG, "int apply s " + newStr);
        return newStr;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String response) throws Exception {
        Log.d(TAG, "Observer : " + response);
    }
});

发射数据类型是Integer类,通过map操作符,将类型转换成String类。Function是一个类型转换接口,Function<T, R>,将T转换R,解决被观察者和观察者数据类型不匹配问题。

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

返回一个被观察者ObservableMap,封装原始被观察者ObservableCreate和转换接口Function,调用ObservableMap的subscribe注册方法。

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

创建观察者MapObserver,封装自定义观察者和转换接口Function。source源即内部ObservableCreate,调用它的subscribe方法。

被观察者链

ObservableCreate的#subscribeActual方法,创建CreateEmitter数据发射器,通知观察者已经注册。
调用数据源source(ObservableOnSubscribe)的subscribe方法,将发射器暴漏给外部。外部通过发射器发射数据,如onNext方法。
发射器CreateEmitter持有观察者MapObserver,当onNext事件发射后,通知观察者MapObserver的onNext方法,传参发射的数据类型Integer类。

public void onNext(T t) {
    ...    
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    actual.onNext(v);
}

根据MapObserver内部转换接口Function,apply方法,将T类型转换成U类型,再调用自己定义观察者Observer的onNext方法,入参数据类型转换成String。
发射器onNext方法和观察者accept方法按照通知顺序执行。

Rx的map操作符

flatMap操作符

flatMap操作符和map类似,Function接口实现类型转换,转换的对象是一个被观察者ObservableSource。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }
}).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer s) throws Exception {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //将上游Integer类型数据,在新发射器中改造发射。
                String newStr = s + "_gc1";
                String newStr2 = s + "_gc2";
                Log.d(TAG, "int apply s " + newStr);
                Log.d(TAG, "int apply s " + newStr2);
                e.onNext(newStr);
                e.onNext(newStr2);
            }
        });
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String response) throws Exception {
        Log.d(TAG, "Observer : " + response);
    }
});

将上游发射器每个Integer类型的数据转换成Observable类型,再由每个转换的被观察者发射目标类型数据。

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
                                       boolean delayErrors, int maxConcurrency, int bufferSize) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

返回一个被观察者ObservableFlatMap。封装原始被观察者ObservableCreate和转换接口Function,调用ObservableFlatMap的subscribe注册方法。

public void subscribeActual(Observer<? super U> t) {
    ...
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

创建观察者MergeObserver,封装自定义观察者和转换接口Function,source源即内部ObservableCreate,调用它的subscribe方法。当发射器onNext方法发射时,调用发射器内部MergeObserver的onNext方法。

@Override
public void onNext(T t) {

    ObservableSource<? extends U> p;
    try {
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        return;
    }
    //调用的是新建ObservableSource的注册方法。
    subscribeInner(p);
}

通过Function接口方法,将Integer类型转换成ObservableSource类型,转换对象是一个被观察者,外部创建,ObservableCreate类型,将Integer类型的数据暴露在新被观察者的数据源发射器中,处理转换成新发射器支持String类型,subscribeInner方法,新被观察者订阅。

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) { 
            ...
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}

调用Observable的subscribe方法,订阅InnerObserver观察者,外部调用发射器onNext方法,可以获取apply方法中上层发射的Integer数据,按照String类型,触发两个onNext方法再次发射数据,两次调用观察者InnerObserver的onNext方法,每次,调用它引用MergeObserver的onNext方法,最终,通知到外部观察者。

flatMap最初的onNext顺序,在Function转换成新Observable后,根据收到的数据,包装重新发射一批新数据。在观察者到的onNext顺序不一定是按照最初的onNext顺序调用的。
上面发送的1,2,3,在观察者中看到的不一定是1,2,3的排序,加一个延迟就能看到,即1_gc1,1_gc2,3_gc1,3_gc2,2_gc1,2_gc2。

flatMap操作符数据流程

总结

flatMap不保证数据发射流的通知顺序。
concatMap和flatMap功能相同,可以保证按照发射顺序通知。


任重而道远

上一篇 下一篇

猜你喜欢

热点阅读