
RxJava 1: source analysis of the map operator

2019-03-28  草蜢的逆袭

RxJava 1 map operator analysis

<font color = "red">The source code differs between RxJava versions, so what you see may not match exactly.</font>

Code execution flow

create:
Creates a new Observable.

map:
Also creates a new Observable, one that wraps the Observable it was called on.

rx.Observable#create(rx.Observable.OnSubscribe<T>)

OnSubscribe is the argument passed to create.

@Deprecated
public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

rx.Observable#map

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    // `this` is the Observable we created with create()
    return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {}     

rx.Observable#unsafeCreate

Here the OnSubscribe argument is the OnSubscribeMap object, which is accepted because OnSubscribeMap implements the OnSubscribe interface.

public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
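    // A new Observable wrapping f is returned; anything chained after map (e.g. subscribe)
    // is called on this new instance, so `this` in those calls refers to it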
    return new Observable<T>(RxJavaHooks.onCreate(f));
}
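The assembly step described above can be modelled in a few lines of plain Java. This is only a sketch to make the wrapping visible, not the RxJava source: `AssemblySketch`, `Obs` and `MapOnSubscribe` are hypothetical stand-ins for `Observable` and `OnSubscribeMap`, and a `java.util.function.Consumer` plays the role of the Subscriber.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified model of the assembly step; these are NOT the RxJava classes.
final class AssemblySketch {
    // Stand-in for Observable.OnSubscribe<T>; a Consumer plays the Subscriber.
    interface OnSubscribe<T> {
        void call(Consumer<? super T> downstream);
    }

    // Stand-in for rx.Observable<T>: it only holds an OnSubscribe.
    static final class Obs<T> {
        final OnSubscribe<T> onSubscribe;

        private Obs(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static <T> Obs<T> create(OnSubscribe<T> f) {
            return new Obs<T>(f);
        }

        // Like map in the article: wrap `this` and the function, return a NEW Obs.
        <R> Obs<R> map(Function<? super T, ? extends R> func) {
            return create(new MapOnSubscribe<T, R>(this, func));
        }
    }

    // Stand-in for OnSubscribeMap<T, R>: remembers the upstream Obs and the function.
    static final class MapOnSubscribe<T, R> implements OnSubscribe<R> {
        final Obs<T> source;
        final Function<? super T, ? extends R> transformer;

        MapOnSubscribe(Obs<T> source, Function<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }

        @Override
        public void call(Consumer<? super R> downstream) {
            // Subscribe to the upstream and transform each item on the way down.
            source.onSubscribe.call(t -> downstream.accept(transformer.apply(t)));
        }
    }

    // Returns true if map produced a new Obs whose OnSubscribe wraps the one from create.
    static boolean mapWrapsSource() {
        Obs<String> created = Obs.create(d -> d.accept("10"));
        Obs<Integer> mapped = created.map(s -> Integer.valueOf(s));

        boolean isNewInstance = (Object) mapped != (Object) created;
        boolean holdsSource = mapped.onSubscribe instanceof MapOnSubscribe
                && (Object) ((MapOnSubscribe<?, ?>) mapped.onSubscribe).source == (Object) created;
        return isNewInstance && holdsSource;
    }

    public static void main(String[] args) {
        System.out.println(mapWrapsSource());
    }
}
```

In this model nothing is emitted during assembly. `map` only records the upstream and the function, which is the same shape the article shows for `OnSubscribeMap`.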

Subscription call flow

rx.Observable#subscribe(rx.Observer<? super T>)

public final Subscription subscribe(final Observer<? super T> observer) {
    // Wrap the Observer in a Subscriber (ObserverSubscriber)
    return subscribe(new ObserverSubscriber<T>(observer));
}

rx.Observable#subscribe(rx.Subscriber<? super T>)

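`this` is the Observable that subscribe was called on, i.e. the one returned by the last operator in the chain. Here that is the Observable created by map, whose onSubscribe field is the OnSubscribeMap.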

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}   

rx.Observable#subscribe(rx.Subscriber<? super T>, rx.Observable<T>)

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // Invokes call() on the onSubscribe of the Observable passed in above, i.e. OnSubscribeMap#call
    RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    return RxJavaHooks.onObservableReturn(subscriber);
}

rx.internal.operators.OnSubscribeMap#call

@Override
public void call(final Subscriber<? super R> o) {
    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    // source is the upstream Observable, i.e. the original one created by create()
    source.unsafeSubscribe(parent);
}

rx.Observable#unsafeSubscribe

Here subscriber is an OnSubscribeMap$MapSubscriber.

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
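    // Calls call() on this Observable's onSubscribe, i.e. the OnSubscribe that was passed to create
    // The OnSubscribeMap$MapSubscriber reference is passed in as the argument to call
    // So when that OnSubscribe calls subscriber.onNext / onCompleted, the calls land in MapSubscriber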
    RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
    return RxJavaHooks.onObservableReturn(subscriber);
}
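The order of calls traced above (downstream subscribes first, the upstream emits afterwards) can be checked with a small plain-Java model. This is a sketch under simplifying assumptions rather than RxJava code: `SubscribeTraceSketch`, `Obs` and the trace labels are hypothetical, there is no unsubscription or hook handling, and the anonymous class inside `map` stands in for `MapSubscriber`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of the subscription flow; these are NOT the RxJava classes.
final class SubscribeTraceSketch {
    static final List<String> trace = new ArrayList<>();

    interface Subscriber<T> {
        void onNext(T t);
        void onCompleted();
    }

    interface OnSubscribe<T> {
        void call(Subscriber<? super T> subscriber);
    }

    static final class Obs<T> {
        final OnSubscribe<T> onSubscribe;

        Obs(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        <R> Obs<R> map(Function<? super T, ? extends R> func) {
            final Obs<T> source = this;
            // The lambda plays the role of OnSubscribeMap#call.
            return new Obs<R>(downstream -> {
                trace.add("OnSubscribeMap.call");
                // The anonymous class plays the role of MapSubscriber.
                source.onSubscribe.call(new Subscriber<T>() {
                    @Override
                    public void onNext(T t) {
                        trace.add("MapSubscriber.onNext");
                        downstream.onNext(func.apply(t));
                    }

                    @Override
                    public void onCompleted() {
                        trace.add("MapSubscriber.onCompleted");
                        downstream.onCompleted();
                    }
                });
            });
        }

        void subscribe(Subscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    // Builds create -> map -> subscribe and returns the recorded call order.
    static List<String> run() {
        trace.clear();
        Obs<String> created = new Obs<String>(s -> {
            trace.add("create OnSubscribe.call");
            s.onNext("10");
            s.onCompleted();
        });
        created.map(s -> Integer.valueOf(s)).subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer value) {
                trace.add("Observer.onNext " + value);
            }

            @Override
            public void onCompleted() {
                trace.add("Observer.onCompleted");
            }
        });
        return new ArrayList<>(trace);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints the map-side `call` first, then the `create` callback, and only then the item passing through the mapping subscriber to the observer. Subscription travels upstream and data travels downstream.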

Example code

Observable
    .create((Observable.OnSubscribe<String>) subscriber -> {
        LogUtils.loge("OnSubscribe call ... ");
        if (!subscriber.isUnsubscribed()) {
            LogUtils.loge("subscriber.onNext ... ");
            subscriber.onNext("10");
            subscriber.onCompleted();
        }
    })
    .map(s -> Integer.valueOf(s))
    .subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            LogUtils.loge("Observer onCompleted ... ");
        }

        @Override
        public void onError(Throwable e) {
            // Left empty in this example; an exception thrown inside map
            // (for example a NumberFormatException) is delivered here
        }

        @Override
        public void onNext(Integer integer) {
            LogUtils.loge("Observer onNext value =  " + integer + " , cls type = " + integer.getClass());
        }
    });

rx.internal.operators.OnSubscribeMap.MapSubscriber#onNext

@Override
public void onNext(T t) {
    R result;

    // mapper is the Func1 passed to map; it transforms the incoming item
    result = mapper.call(t);

    // actual is the ObserverSubscriber, so this ends up calling the Observer's onNext
    actual.onNext(result);
}
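The onNext listing above is abridged to the happy path. In RxJava 1.x the call to the mapper is also guarded so that an exception thrown by the function reaches the downstream as onError. The exact code depends on the version, so check it against your own sources. The plain-Java sketch below models only that idea. `MapErrorSketch` and its `Subscriber` interface are hypothetical stand-ins, there is no unsubscription, and the `done` check in `onNext` is an assumption of this sketch used in its place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of a mapping subscriber with error handling; NOT the RxJava source.
final class MapErrorSketch {
    interface Subscriber<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }

    static final class MapSubscriber<T, R> implements Subscriber<T> {
        final Subscriber<? super R> actual;
        final Function<? super T, ? extends R> mapper;
        boolean done; // set once a terminal event has been forwarded

        MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return; // assumption of this sketch: drop items after a terminal event
            }
            R result;
            try {
                result = mapper.apply(t);
            } catch (RuntimeException ex) {
                onError(ex); // a failing mapper becomes onError for the downstream
                return;
            }
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                return;
            }
            done = true;
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            actual.onCompleted();
        }
    }

    // Feeds a valid item, an invalid item, then more events, and returns what the observer saw.
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        Subscriber<Integer> observer = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer value) {
                events.add("onNext " + value);
            }

            @Override
            public void onError(Throwable e) {
                events.add("onError " + e.getClass().getSimpleName());
            }

            @Override
            public void onCompleted() {
                events.add("onCompleted");
            }
        };
        MapSubscriber<String, Integer> parent =
                new MapSubscriber<String, Integer>(observer, s -> Integer.valueOf(s));
        parent.onNext("10");
        parent.onNext("abc"); // Integer.valueOf throws NumberFormatException
        parent.onNext("20");  // ignored: the stream has already terminated
        parent.onCompleted(); // ignored for the same reason
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```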

rx.internal.util.ObserverSubscriber#onNext

@Override
public void onNext(T t) {
    // Delegate to the wrapped Observer's onNext
    observer.onNext(t);
}   