RxJava 2.0 Getting Started Series, Part 2: Source Analysis of the map Operator

2019-01-29  Mr_BingBing

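1. Use case

The map operator in RxJava transforms each item emitted by the source, which can include converting it to a different type.

2. Code example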

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        })
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {

                        return "map operator: Integer " + integer + " converted to String " + integer;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext:" + s);
                    }

                    @Override
                    public void onComplete() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                });

In the example above, the source emits three int values (1, 2 and 3), and the map operator converts each of them to a String.

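The rest of this article walks through the source code to show how the map operator performs this conversion.

First, what does the source of the map operator look like?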

 /**
     * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and
     * emits the results of these function applications.
   
     * @param <R> the output type
     * @param mapper  a function to apply to each item emitted by the ObservableSource
     * @return an Observable that emits the items from the source ObservableSource, transformed by the specified
     *         function
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        // null check on the mapper argument
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        // the key piece is ObservableMap
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

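Inside map(), the current Observable and the Function passed in are wrapped in a new ObservableMap, which is then handed to RxJavaPlugins.onAssembly(). onAssembly() is only a hook point: if a hook function (onObservableAssembly) has been registered, it is applied to the Observable; otherwise the Observable is returned unchanged.

Source of onAssembly():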

/**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value
     * @return the value returned by the hook
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
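
To make the pass-through behaviour of onAssembly() concrete, here is a minimal sketch of the same hook pattern in plain Java. The class AssemblyHookSketch and its field onAssemblyHook are hypothetical stand-ins written for this illustration; they are not the RxJavaPlugins implementation, which works on Observable and its own Function type.

```java
import java.util.function.Function;

// Hypothetical stand-in for the hook pattern used by RxJavaPlugins.onAssembly().
final class AssemblyHookSketch {
    // Optional hook; null means no hook is installed.
    static volatile Function<Object, Object> onAssemblyHook;

    // Same shape as onAssembly(): apply the hook if present, otherwise pass through.
    @SuppressWarnings("unchecked")
    static <T> T onAssembly(T source) {
        Function<Object, Object> f = onAssemblyHook;
        if (f != null) {
            return (T) f.apply(source);
        }
        return source;
    }

    public static void main(String[] args) {
        String source = "ObservableMap";

        // No hook installed: the very same instance comes back.
        String untouched = onAssembly(source);
        System.out.println(untouched == source); // prints true

        // Install a hook that decorates whatever is being assembled.
        onAssemblyHook = o -> "traced(" + o + ")";
        String traced = onAssembly(source);
        System.out.println(traced); // prints traced(ObservableMap)

        onAssemblyHook = null; // reset so later calls pass through again
    }
}
```

With no hook registered, which is the default case, map() therefore simply returns the new ObservableMap.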

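From the analysis above, the key class is ObservableMap.
In its subscribeActual() method, ObservableMap connects the observable (source) with the observer (MapObserver).
Inside MapObserver, the call U v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") converts the incoming value (an int in the example) into the output type (a String in the example).

Source of ObservableMap: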

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        // connect the observable (source) with the observer (MapObserver)
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    // the observer
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
      
           //  done  Flag indicating no further onXXX event should be accepted.
            if (done) {
                return;
            }
            //sourceMode   Holds the established fusion mode of the upstream
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                // core step: convert the incoming value t into v
                //Apply some calculation to the input value and return some other value.
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //actual    The downstream subscriber
            // the downstream observer receives the converted value v
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
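
The wrapping performed by ObservableMap and MapObserver can be sketched without RxJava at all. The types below (MiniSource, MiniObserver, Mapper, MiniMap, MiniMapObserver) are hypothetical, simplified stand-ins written for this illustration; they leave out disposal, operator fusion and the plugin hooks that the real classes handle.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical types that only mimic the shape of the RxJava classes.
final class MapSketch {
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    interface MiniSource<T> {
        void subscribe(MiniObserver<? super T> observer);
    }

    interface Mapper<T, R> {
        R apply(T value) throws Exception;
    }

    // Plays the role of ObservableMap: holds the upstream source and the mapper.
    static final class MiniMap<T, U> implements MiniSource<U> {
        final MiniSource<T> source;
        final Mapper<? super T, ? extends U> mapper;

        MiniMap(MiniSource<T> source, Mapper<? super T, ? extends U> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(MiniObserver<? super U> downstream) {
            // Like subscribeActual(): the upstream sees only the wrapping observer.
            source.subscribe(new MiniMapObserver<T, U>(downstream, mapper));
        }
    }

    // Plays the role of MapObserver: converts each item, then forwards it.
    static final class MiniMapObserver<T, U> implements MiniObserver<T> {
        final MiniObserver<? super U> downstream;
        final Mapper<? super T, ? extends U> mapper;
        boolean done; // once set, no further events are accepted

        MiniMapObserver(MiniObserver<? super U> downstream, Mapper<? super T, ? extends U> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            U v;
            try {
                v = mapper.apply(t); // the conversion step
            } catch (Throwable ex) {
                onError(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                return;
            }
            done = true;
            downstream.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            downstream.onComplete();
        }
    }

    // Emits 1, 2, 3 through a MiniMap and records what the final observer receives.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniSource<Integer> source = observer -> {
            observer.onNext(1);
            observer.onNext(2);
            observer.onNext(3);
            observer.onComplete();
        };
        new MiniMap<Integer, String>(source, i -> "mapped " + i).subscribe(new MiniObserver<String>() {
            @Override public void onNext(String value) { received.add(value); }
            @Override public void onError(Throwable e) { received.add("error: " + e.getMessage()); }
            @Override public void onComplete() { received.add("complete"); }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [mapped 1, mapped 2, mapped 3, complete]
    }
}
```

The source never sees the final observer directly: every item passes through the wrapping observer first, which is what lets the operator change the item type between upstream and downstream.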

The apply() method: takes an input value A and computes some other value B from it.

/**
 * A functional interface that takes a value and returns another value, possibly with a
 * different type and allows throwing a checked exception.
 *
 * @param <T> the input value type
 * @param <R> the output value type
 */
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T t) throws Exception;
}
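
Two details of this interface matter for map: apply() may throw a checked exception, and MapObserver rejects a null result. The sketch below illustrates both under stated assumptions: Mapper is a hypothetical copy of the interface's shape, java.util.Objects.requireNonNull stands in for ObjectHelper.requireNonNull, and the helper applyChecked returns a string where the real observer would call onNext or fail(ex).

```java
import java.io.IOException;
import java.util.Objects;

final class NullMapperSketch {
    // Hypothetical copy of the shape of io.reactivex.functions.Function.
    interface Mapper<T, R> {
        R apply(T t) throws Exception;
    }

    // Applies the mapper and rejects a null result, in the spirit of MapObserver.onNext().
    // For brevity the outcome is returned as a string instead of being sent to an observer.
    static <T, R> String applyChecked(Mapper<T, R> mapper, T input) {
        try {
            R v = Objects.requireNonNull(mapper.apply(input),
                    "The mapper function returned a null value.");
            return "onNext: " + v;
        } catch (Throwable ex) {
            return "onError: " + ex.getMessage();
        }
    }

    public static void main(String[] args) {
        Mapper<Integer, String> ok = i -> "value " + i;
        Mapper<Integer, String> returnsNull = i -> null;
        Mapper<Integer, String> throwsChecked = i -> {
            throw new IOException("disk"); // checked exceptions are allowed by the signature
        };

        System.out.println(applyChecked(ok, 1));            // prints onNext: value 1
        System.out.println(applyChecked(returnsNull, 2));   // prints onError: The mapper function returned a null value.
        System.out.println(applyChecked(throwsChecked, 3)); // prints onError: disk
    }
}
```

In other words, a mapper that returns null or throws does not crash the caller of onNext; the failure is routed to the error path instead.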