程序员今日看点Android技术知识

RxJava学习笔记(解读源码)

2016-11-27  本文已影响0人  cgzysan

写在前面的话

看了源码也不少,但是每次看源码都仅仅,也就是仅仅跟着作者一步步跟进ta的方法,点进去,再点进去,再再点进去,渐渐的感觉自己当时看的太肤浅,有很多问题都会冒出来,最多的就是为什么要这样写呢,一追究这个问题就会发现自己在知识的海洋里是如此的渺小。写了这么多,希望能够在以后眼光放高一些,看大局,看别人是如何排兵布阵!

rxjava执行原理

先从调用rxjava功能的代码入手,首先最简单的调用就是:

Observable.create(new Observable.OnSubscribe<Integer>() {//创建

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }

}).subscribe(new Observer<Integer>() {

    @Override
    public void onCompleted() {
        System.out.println("Completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error");
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("i = " + integer);
    }
});

Observable.create()

Subscriber

创建了生产者Observable,那么肯定还要创建消费者ObserverSubscriber就是Observer接口的一个实现类,同时还实现了Subscription接口,这个接口的方法unsubscribe()用于取消订阅,还有一个isUnsubscribed()方法判断订阅的状态,unsubscribe() 这个方法很关键,因为在 subscribe()之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。

subscribe()

rxjava变换原理

rxjava的变换虽然功能各有不同,但实质上都是针对时间序列的处理和再发送,这里我们就通过map()来了解其中的原理,一下就是利用map的一段代码:

Observable.just(1,2,3)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "i = " + integer;
            }
        })
        .subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("s : " + s);
            }
        });

接着看map()方法:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return create(new OnSubscribeMap<T, R>(this, func));
}

是不是感觉很熟悉,返回的就是创建Observable的方法,也就是说它将我们map()中的func作为onSubscribeMap的构造参数,那么onSubscribeMap又是什么,它其实就是OnSubscribe的一个实现类,先看其构造方法:

public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
    this.source = source;
    this.transformer = transformer;
}

这里通过内部的成员变量保存了传递进来的ObservableFunc1,记住,这里也就是说在外头调用该方法的Observable,也就是源Observable保存在了source,而我们在map()中写的Func1()保存在了transformer中,在往后看.subscribe(),注意调用订阅方法的已经不是源Observable,而是通过map()内部自己创建返回的一个新Observable,也就是说新的Observable持有了Subscriber的对象,那么,订阅了之后自然就会激活Observable发射数据,也就是onSubscribe中的call()方法开始执行,在这里就是onSubscribeMapcall()方法。

public void call(final Subscriber<? super R> o) {
    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    source.unsafeSubscribe(parent);
}

代码开头就创建了一个MapSubscriber,它是Subscriber的一个子类,构造函数如下:

public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
    this.actual = actual;
    this.mapper = mapper;
}

联系上头代码,可以发现它将传入的Subscriber包装成了MapSubscriber,同时还讲源SubscriberFunc1保存在了成员变量中,之后执行的时候肯定要执行onNext()方法,直接看onNext()

public void onNext(T t) {
    R result;

    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        unsubscribe();
        onError(OnErrorThrowable.addValueAsLastCause(ex, t));
        return;
    }

    actual.onNext(result);
}

这里有一个泛型R,也就是返回值类型,是通过构造函数传入的,最重要的代码result = mapper.call(t),这里的mapper就是我们在map()中写的Func1,接着传入参数,通过我们自己的方法得到我们想要的结果返回。

上一篇下一篇

猜你喜欢

热点阅读