Android-Rxjava&retrofit&daggerRxJAva OKHttp RetrofitRxJava

Rxjava2~defer~create~just~from~学

2017-06-23  本文已影响345人  品味与回味

<pre>
static Observable<String> sampleObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
// Do some long running operation
SystemClock.sleep(2000);
return Observable.just("one", "two", "three", "four", "five");
}
});
}
</pre>
一上来我们就举个例子可以看出来两个操作符。 有了前面的经验我们来简单的分析一下。这几个创建操作符的不同

<pre>
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableDefer<T>(supplier));
}
</pre>

和以前一样Observable的静态方法。
<pre>
public final class ObservableDefer<T> extends Observable<T> {
final Callable<? extends ObservableSource<? extends T>> supplier;
public ObservableDefer(Callable<? extends ObservableSource<? extends T>> supplier) {
this.supplier = supplier;
}
@Override
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
return;
}

    pub.subscribe(s);
}

}

</pre>

这是完整ObservableDefer代码。可以看到subscribeActual这个方法不?看到这个我想大家一定马上就想到另一个方法Observable的静态方法subscribe,中最重要的部分 subscribeActual(observer);。吼吼 看到了吗。 只有当你调用的时候(订阅)才会整理数据。
<pre>
a = 12;
Observable<String> o2 =
Observable.defer(new Func0<Observable<String>>() {

@Override
public Observable<String> call() {
    return Observable.just("defer result: " + a);
}

});
a = 20;

o2.subscribe(new Action1<String>() {

@Override
public void call(String t) {
    System.out.println(t);
}

});

</pre>
这是一个网上例子。根据咱们的分析 。 但你订阅了subscribe。它才开始去找寻数据a=20;是不是很简单? 好了 往下进行

<pre>
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
return;
}

    pub.subscribe(s);
}

</pre>

最重要的相信大家直接看到出来了。
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
supplier.call()这个就是调用你的call方法.下面的方法。大家试着分析一下?
pub.subscribe(s);
又回到了
Observable的静态方法subscribe。那么他会调用谁的?subscribeActual,大家肯定猜到了。
ObservableFromArray的subscribeActual方法~ 为什么呢? 看这里看这里
<pre>
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
</pre>
这是just的方法哦,让我们继续
<pre>
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

    s.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }

    d.run();
}

void run() {
T[] a = array;
int n = a.length;

        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }
}

</pre>
看到这里明白了吧!大家思考一下。这两个以后什么不同点?
3
2
1
defer 中文的意思是推迟。 所以他们的区别就是 在创建的时候 defer的数据都是最新的,因为他在订阅的时候才回去执行,其他的just 就不会。
让我们进行下一个
creat 这个原理其实差不多。 我们来简单的走一下流程

<pre>
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

</pre>

ObservableCreate的subscribeActual方法

<pre>
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

</pre>

这里要稍微留意一下
source.subscribe(parent);
这里的方法调用的是
<pre>
Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesCricket());
e.onComplete();
}
}
</pre>
的subscribe方法,当调用onNext 或者onComplete的时候。他调用的就是ObservableCreate下的
onNext等方法
<pre>
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}

    @Override
    public void onError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

<
</pre>
看看是不是特别简单
最后一个喽
from
。。。。。这个貌似很长。我要做一个专门的专题。

上一篇下一篇

猜你喜欢

热点阅读