RxJava(十三)--map()解析

2019-03-08  本文已影响0人  azu_test

介绍

map()方法是对Observable内的数据处理器Observable.OnSubscribe执行完数据处理后的再次加工。

执行代码

        //初始化被观察者Observable,并给其加上数据处理器Observable.OnSubscribe
        Observable Aobservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("杨");
                subscriber.onNext("月");
                subscriber.onCompleted();
            }
        });
        //做Map变换处理
        Observable  Bobservable = Aobservable.map(new Func1<String,String>() {
            @Override
            public String call(String string) {
                return string+"YaZhou";
            }
        });
         //初始化观察者Observer,视作结果接收器
         Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                LogShowUtil.addLog("RxJava","结束",true);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String string) {
                LogShowUtil.addLog("RxJava","结果: "+string,true);
            }
        };
        Bobservable.subscribe(observer);

上面代码AobservableBobservable命名不规范是为了做凸显之意,不要介意。

源码分析

1. 初始化被观察者AObservable
Observable  Aobservable = Observable.create(原始数据处理器);

由此可知被观察者AObservable持有原始数据处理器对象Observable.OnSubscribe。

2. 执行map()变换操作
    Observable  Bobservable = Aobservable.map(数据变换器)
    Observable#map
    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return create(new OnSubscribeMap<T, R>(this, func));
    }

接着我们看其中的new OnSubscribeMap(Aobservable,数据变换器)操作

    OnSubscribeMap#OnSubscribeMap
    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

由代码可知代理数据变换器OnSubscribeMap持有Aobservable和数据转换器func

回到map()方法内继续执行create(代理数据变换器)

        return create(new OnSubscribeMap<T, R>(this, func));

create方法之前已经分析过,由此可知Bobservable持有代理数据变换器OnSubscribeMap。

3. 初始化结果接受器观察者Observer,
        Observer observer = new Observer<String>() {
           ...
        }
4. 订阅
        Bobservable.subscribe(observer);

由之前分析可知会使用 Bobservable内的代理数据变换器OnSubscribeMap做call()方法。
其中observer为结果接受器

    OnSubscribeMap#call
    @Override
    public void call(final Subscriber<? super R> o) {
        //步骤一
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        //步骤二
        source.unsafeSubscribe(parent);
    }

先看步骤一此处初始化了数据变换中转器MapSubscriber(结果接收器,数据变换器)。
其中结果接收器是subscribe()方法传递进来的。
数据变换器是map时初始化OnSubscribeMap传递进来的。

接着看数据变换中转器MapSubscriber()构造方法

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

由此可知数据变换中转器中持有结果接收器actual和数据变换器mapper

再回到OnSubscribeMap的call()方法内继续执行步骤二

        //步骤二
        source.unsafeSubscribe(parent);

其中source为Aobservable,parent为数据变换中转器MapSubscriber

继续执行会进入Aobservable.unsafeSubscribe()方法

    public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            subscriber.onStart();
            //获取数据处理器Observable.OnSubscribe,并做数据处理工作
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                RxJavaHooks.onObservableError(r);
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

由此可知AObservable的原始数据处理器先执行call(数据变换中转器MapSubscriber)方法

接下来会进入外部实现的外部数据处理器

           数据处理器内的call()方法
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("杨");
                subscriber.onNext("月");
                subscriber.onCompleted();
            }

然后会进入数据变换中转器MapSubscriber的onNext()方法

        @Override
        public void onNext(T t) {
            R result;
            try {
                //步骤一 数据变换操作
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            //步骤二 返回数据结果给用户(结果接收器)
            actual.onNext(result);
        }

执行步骤一进入map()方法Func的外部实现方法,并返回数据

        observable = observable.map(new Func1<String,String>() {
            @Override
            public String call(String string) {
                return string+"YaZhou";
            }
        });

接着会执行步骤二,其中actual上文已经分析可知为结果接收器

            actual.onNext(result);

接着就会进入结果接收器Observer内方法体内

        Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                LogShowUtil.addLog("RxJava","结束",true);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String string) {
                LogShowUtil.addLog("RxJava","结果: "+string,true);
            }
        };

最终输出结果

结果: 杨YaZhou
结果: 月YaZhou
结束
上一篇下一篇

猜你喜欢

热点阅读