RxJava1 原理

2019-01-25  本文已影响0人  simplehych

0x01 RxJava 简介

github 地址:https://github.com/ReactiveX/RxJava

a library for composing asynchronous and event-based programs by using observable sequences for the Java VM.

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库

关键字:异步事件可观测序列

RxJava1原理读自 扔物线 于2015年10月写的 给 Android 开发者的 RxJava 详解

功能:异步,解决异步处理问题
好处:简洁,链式调用,逻辑顺序执行

0x02 RxJava1 原理

鉴于大多数同学已经相对熟悉RxJava,不再逐一深入,只进行核心说明。

本节依据以下版本进行探讨

implementation 'io.reactivex:rxjava:1.0.14'
implementation 'io.reactivex:rxandroid:1.0.1'

抛出一个示例

简化代码如下

Observable
        .create( new OnSubscribe() {
              void call(Subscriber subscriber) {}
        })
        .subscribeOn()
        .flatMap()
        .observeOn()
        .map()
        .subscribeOn()
        .take()
        .observeOn()
        .subscribe(new Subscriber())

具体代码如下,可暂时忽略

Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onCompleted();
            }
        })
        .subscribeOn(Schedulers.io())
        .flatMap(new Func1<Integer, Observable<String>>() {
            @Override
            public Observable<String> call(Integer integer) {
                return Observable.just("flatMap-" + integer);
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return "map-" + s;
            }
        })
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                Toast.makeText(context, "doOnSubscribe call", Toast.LENGTH_SHORT).show();
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread())
        .take(2)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {

            @Override
            public void onStart() {
                Log.i(TAG, "Subscriber onStart");

            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "Subscriber onNext " + s);
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "Subscriber onStart");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "Subscriber onStart");
            }
        });

Point 核心知识点

// 这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}
// operator.call() 的核心代码
public final class OperatorXXX<T> implements Observable.Operator<T, T> {
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        final Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onNext(T i) {
                child.onNext(i);
            }
        };
        return parent;
    }
}
// subscribe()的核心代码
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
// 整体链式调用理解
Observable
        .create( new OnSubscribe() {
            void call(Subscriber subscriber) {}
        })
        .lift()
        .lift()
        .subscribe(new Subscriber())
上一篇下一篇

猜你喜欢

热点阅读