Android中RxJava解析

2021-01-12  本文已影响0人  RmondJone

一、什么是RxJava?

RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

二、为什么要用RxJava?

1、异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁,增加了工程的可维护性。

2、轻松实现线程的切换工作,只需调用 subscribeOn() 和 observeOn() 两个方法。

三、RxJava原理概括

这里以RxJava1来阐述原理,RxJava2引入了最新的背压概念,用于解决上流事件发送太快,下流事件处理不及时问题。我们在使用过程中,只需了解RxJava的基本原理即可。最重要的就是理解RxJava为什么可以实现切换线程以及RxJava中流的转换过程。

订阅的实现:

1、RxJava的异步实现,是通过一种扩展的观察者模式来实现的。
RxJava 的观察者模式大致如下图:

2、 RxJava 使用 create() 方法来创建一个被观察者即Observable ,并为它定义事件触发规则

首先创建一个OnSubscribe对象,在这个对象中定义了一些触发规则,然后调用create()方法来新建一个Observable对象。

Observable observable = Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(String subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

3、创建观察者Subscriber或者Observer,Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。不仅基本使用方式一样,实质上,在 RxJava 的 subscribe(订阅) 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。

注意点:这里的概念仅限于RxJava1,在RxJava2中这个概念是不同的,关于不同点可以查看文章最后的RxJava2的相关文章。

Subscriber subscriber = new Subscriber() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

4、订阅

创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

我们来看一下subscribe()方法中做了哪些事情?

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

可以看到,subscriber() 做了3件事:

线程切换、流的变换实现:

1、在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

2、在了解RxJava为什么可以切换线程之前,需要了解一下RxJava的变换

Observable.just("images/logo.png") // 输入类型 String
    .map(new Func1() {
        @Override
        public Bitmap call(String filePath) { // 参数类型 String
            return getBitmapFromPath(filePath); // 返回类型 Bitmap
        }
    })
    .subscribe(new Action1() {
        @Override
        public void call(Bitmap bitmap) { // 参数类型 Bitmap
            showBitmap(bitmap);
        }
    });

这里通过map()函数实现了String->Bitmap的流的转换。


Student[] students = ...;
Subscriber subscriber = new Subscriber() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1() {
        @Override
        public Observable call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

flatMap() 的原理是这样的:

这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。

3、变换的原理:lift()
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):

// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Observable lift(Operator operator) {
    return Observable.create(new OnSubscribe() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)

当含有 lift() 时:

4、如果经过多次变换也就是多次lift()之后是什么样的?

如下图所示:

5、变换和线程切换有什么关系?

我们知道subscribeOn() 和 observeOn() 可以实现线程的切换,其实subscribeOn() 和 observeOn() 内部的实现也是lift()。

subscribeOn() 原理图:


observeOn() 原理图:

从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

6、最后,我用一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整)

图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 observeOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。

所以:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用,作用于 subscribe()函数。每指定一次observeOn()线程则实现了一次线程的切换工作,在observeOn()之后的转换工作或者Subscriber都运行在observeOn()指定的线程中。

关于RxJava相关的拓展

1、原文链接:http://gank.io/post/560e15be2dca930e00da1083
2、Rxjava2:https://www.jianshu.com/p/0cd258eecf60
3、关于背压:https://www.jianshu.com/p/2c4799fa91a4
4、GitHub:https://github.com/ReactiveX/RxJava

上一篇 下一篇

猜你喜欢

热点阅读