RxJava操作符介绍,部分使用,使用场景,源码浅析(二)

2016-08-11  本文已影响305人  梦sora

ReactiveX操作符

1. RxJava操作符介绍

创建操作
变换操作(对Observable发射的数据进行变换)
过滤操作
组合操作
错误处理
辅助操作
条件和布尔操作
算术和聚合操作
连接操作
自定义操作符

2. RxJava操作符的使用

RxJava操作符的使用

3. lift源码解析

1.先看看Observablecreate()

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

Observable的构造方法

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

RxJavaHooksonCreate()方法

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

hook了Observable.OnSubscribe

2.在看Observablelift()

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

OnSubscribeLift

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    final OnSubscribe<T> parent;
    final Operator<? extends R, ? super T> operator;
    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
    this.parent = parent;
    this.operator = operator;
    }
    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            o.onError(e);
        }
    }
}

RxJavaHooksonObservableLift()方法

public static <T, R> Operator<R, T> onObservableLift(Operator<R, T> operator) {
    Func1<Operator, Operator> f = onObservableLift;
    if (f != null) {
        return f.call(operator);
    }
    return operator;
}

同样hook了Observable.OnSubscribe,是新的onSubscribe
这里的parent就是旧onSubscribe
总结下,新的Observable会像一个代理一样,负责接收原始的Observable发出的事件,并在处理后发送给Subscriber

4. RxJava应用场景

上一篇 下一篇

猜你喜欢

热点阅读