RxJava 系列 (一)RxJava 1.0 简介
RxJava 在业内越来越受欢迎,对于老手来说,RxJava 太好用了,RxJava 简直无所不能;然而对于新手来说,RxJava 更像一座陡峭的大山,似乎不能识得 RxJava 的真面目,无法逾越。纠其原因,新手更像是只知其然不知其所以然,只停留在会用的角度是无法了解 RxJava 精髓的。
我们学习一个新概念,应该多去思考,如 RxJava 出现的背景,它解决了什么问题,它是如何解决的,它有什么优点,它的核心思想是什么,它能应用在哪些场景等。如果对这几个问题都有答案的话,对 RxJava 的基本认知就算完成了。对于 RxJava 小白来说,非常不建议一来就源码分析,否则很容易陷入代码黑洞无法自拔,源码分析可以在 RxJava 进阶阶段去做。本文将会围绕上面提出的几个问题做一些简要回答。
RxJava 背景
我们都知道 Java 是一门编程语言,那么 Rx 是什么呢?
Github 对 Rx 的介绍如下:
Reactive Extensions for Async Programming
Rx (Reactive Extension)是异步编程的响应式扩展,再具体一步说, Rx 是微软.NET 的一个响应式扩展。Rx 借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。开发者可以使用Observables模拟异步数据流,使用LINQ语法查询Observables,并且很容易管理调度器的并发。
Netflix 在2012年开始意识到他们的架构要满足他们庞大的用户群体已经变得步履维艰。因此他们决定重新设计架构来减少 REST 调用的次数。取代几十次的 REST 调用,而是让客户端自己处理需要的数据,他们决定基于客户端需求创建一个专门优化过的 REST 调用。
为了实现这一目标,他们决定尝试响应式,开始将.NET Rx 迁移到 JVM 上面。他们不想只基于 Java 语言;而是整个 JVM,从而有可能为市场上的每一种基于 JVM 的语言:如 Java、Clojure、Groovy、Scala 等等提供一种新的工具。
2013年二月份,Ben Christensen 和 Jafar Husain 发在 Netflix 技术博客的一篇文章第一次向世界展示了 RxJava。主要特点有:
- 易于并发从而更好的利用服务器的能力。
- 易于有条件的异步执行。
- 一种更好的方式来避免回调地狱。
- 一种响应式方法。
Github 对 RxJava 的介绍如下:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
RxJava 是一个在 JVM 上使用可观测的序列来组成异步的、基于事件的程序的库。所以要学习 RxJava 理念和精髓,重点应该是放在 Rx 上,而不是放在 Java 这门编程语言上。并且 Jake Wharton 大神在 Github 上贡献了 RxAndroid 扩展库。RxAndroid 是在 RxJava 的基础上结合 Android 系统特性而开发的一个库,如主线程、UI 事件等。
核心思想
RxJava 的核心思想:异步 和 响应式。
在响应式的世界里,为了给提供更好的用户体验,一些函数的计算、网络请求和数据库查询等操作应该异步执行,软件的使用者不应该等待这些结果。同理对于开发者而言,也不需要等待结果,而是在结果返回时通知他,在这期间开发者可以做想做的任何事情。
响应式编程不同于命令式或面向对象编程,参考以下代码:
int a = 1;
int b = 2;
int c = a + b; // c = 3
a = 4; // c 仍然是 3
对于命令式编程来说, c = a+b 意思是将表达式的结果赋值给 c ,而之后 a 和 b 的值改变均不会影响到 c 。所以在给 a 赋值为4 时,c 的值仍然是 3;但是在响应式编程中,c 的值会随着 a 或 b 的值更新而更新,当给 a 赋值为4 时,c 的值会自动更新为 6,响应式就是要关注值的变化。
举例: 一个最直观的例子就是 Excel 中,若规定 C1 = SUM(A1,B1),C1 的值一直会随着 A1 或 B1 的值变化。
响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。
响应式编程的一个关键概念是事件。事件可以被等待,可以触发过程,也可以触发其它事件。事件是唯一的以合适的方式将我们的现实世界映射到我们的软件中:如果屋里太热了我们就打开一扇窗户。同样的,当我们更改电子表(变化的传播)中的一些数值时,我们需要更新整个表格或者我们的机器人碰到墙时会转弯(响应事件)。
今天,响应式编程最通用的一个场景是 UI:我们的移动 App 必须做出对网络调用、用户触摸输入和系统弹框的响应。在这个世界上,软件之所以是事件驱动并响应的是因为现实生活也是如此。
响应式编程 文章链接:Reactive Programming
扩展的观察者模式
观察者模式
观察者模式是最常见的设计模式之一。它主要基于 Subject 这个概念,Subject 是一种特殊的对象,当他改变时,由它保存的一系列对象将会得到通知。而这一系列对象被称作 Observers 它们会对外暴露一个通知方法,当 Subject 状态变化时会调用的这个方法。
观察者模式.jpg
上图中,展示了 Subject/Observer 是怎样的一个 一对多的关系,如果有需要,一个 Subject 可以有无限多个 Observers,当 subject 状态发生变化时,这些 Observers 中的每一个都会收到通知。
观察者模式很适合下面这些场景中的任何一个:
- 当你的架构有两个实体类,一个依赖另一个,你想让它们互不影响或者是独立复用它们时。
- 当一个变化的对象通知那些与它自身变化相关联的未知数量的对象时。
- 当一个变化的对象通知那些无需推断具体是谁的对象时。
扩展
在 RxJava 中共有4个角色:
- Observable
- Observer
- Subscriber
- Subjects
Observables 和 Subjects 是两个“生产”实体,Observers 和 Subscribers 是两个“消费”实体。同时相比于观察者模式,RxJava 添加了三个观察者缺少的功能:
- 生产者在没有更多数据可用时能够发出信号通知:onCompleted() 事件。
- 生产者在发生错误时能够发出信号通知:onError() 事件。
- RxJava Observables 能够组合而不是嵌套,从而避免开发者陷入回调地狱。
核心概念
Observables
当我们异步执行一些复杂的事情,Java提供了传统的类,例如 Thread、Future、FutureTask、CompletableFuture 来处理这些问题。当复杂度提升,这些方案就会变得麻烦和难以维护。最糟糕的是,它们都不支持链式调用。
RxJava Observables 被设计用来解决这些问题。它们灵活,且易于使用,也可以链式调用,并且可以作用于单个结果程序上,更有甚者,也可以作用于序列上。无论何时你想发射单个标量值,或者一连串值,甚至是无穷个数值流,你都可以使用 Observable。
Observable的生命周期包含了三种可能的易于与Iterable生命周期事件相比较的事件,
使用Iterable时,消费者从生产者那里以同步的方式得到值,在这些值得到之前线程处于阻塞状态。相反,使用Observable时,生产者以异步的方式把值推给观察者,无论何时,这些值都是可用的。这种方法之所以更灵活是因为即便值是同步或异步方式到达,消费者在这两种场景都可以根据自己的需要来处理。
为了更好地复用Iterable接口,RxJava Observable 类扩展了GOF观察者模式的语义。引入了两个新的接口:
- onCompleted() 即通知观察者Observable没有更多的数据。
- onError() 即观察者有错误出现了。
从发射物的角度来看,有两种不同的 Observables :热的和冷的。一个"热"的 Observable 典型的只要一创建完就开始发射数据,因此所有后续订阅它的观察者能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的 Observable 会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。
Subject
Subject 是一个神奇的对象,它可以是一个Observable同时也可以是一个 Observer:它作为连接这两个世界的一座桥梁。一个 Subject 可以订阅一个 Observable,就像一个观察者,并且它可以发射新的数据,或者传递它接受到的数据,就像一个 Observable。很明显,作为一个Observable,观察者们或者其它 Subject 都可以订阅它。
一旦 Subject 订阅了 Observable,它将会触发 Observable开始发射。如果原始的 Observable 是“冷”的,这将会对订阅一个“热”的 Observable 变量产生影响。RxJava 提供四种不同的 Subject:
- PublishSubject,普通的 Subject 对象
- BehaviorSubject,BehaviorSubject会首先向他的订阅者发送截至订阅前最新的一个数据对象(或初始值),然后正常发送订阅后的数据流。
- ReplaySubject,会缓存它所订阅的所有数据,向任意一个订阅它的观察者重发:
- AsyncSubjec,当Observable 完成时 AsyncSubject只会发布最后一个数据给已经订阅的每一个观察者。
优势
RxJava 的优点一搜一箩筐,如提升开发效率,降低维护成本,简化逻辑代码,提升可读性等。其逆天之处在于当程序逻辑变得复杂的情况下,它已然能够保持简洁。
看一个 RxJava 保持简洁的范例(此例来源于 扔物线大神 的文章):界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。贴出一种实现如下:
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
而用 RxJava 实现如下:
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
如果 IDE 是 Android Studio ,每次打开某个 Java 文件的时候,你会看到被自动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑:
Observable.from(folders)
.flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
.filter((Func1) (file) -> { file.getName().endsWith(".png") })
.map((Func1) (file) -> { getBitmapFromFile(file) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
流式的 API 调用风格看着很爽有木有!!! 良好的编码规范逻辑清楚,能方便我们更好的定位问题、解决问题,进而能提升我们的效率。
这里引用《代码大全》第 31 章中“把布局当做一种信仰”的一段话:
良好布局的目标:
1、准确表现代码的逻辑结构
2、始终如一的表现代码的逻辑结构
3、改善可读性
4、经得起修改
RxJava 的代码布局既准确的表示了代码的逻辑结构,又增强了可读性和可维护性,对于有代码洁癖的小伙伴来说是大大的福利啊。
使用范例
1、在项目的 build.gradle 文件中添加对 RxJava 的支持
对于 RxJava 1.x 版本,
compile 'io.reactivex:rxjava:x.y.z‘
对于 RxJava 2.x 版本
compile "io.reactivex.rxjava2:rxjava:2.x.y"
需要注意的是,在一个项目中, RxJava 1.x 和 2.x 是可以同时使用的 ,因为不同版本的依赖路径是不一样的,即对于同一个类 1.x 版本和 2.x 版本的全路径名不一样,我们在使用时只需要注意引入的包名是否正确就可以了。
2、使用范例
- 创建被观察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello,RxJava");
subscriber.onCompleted();
}
});
- 创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
- 订阅
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
RxJava 的使用还是比较简单的, 如果有更复杂的需求,可以熟悉相关操作符。 RxJava 的操作符基本能够满足所有的使用需求。
应用场景
- RxJava + Retrofit ,最热门的框架组合,实现网络请求及结果的异步处理
- RxBinding,Jake Wharton 的一个开源库,本质上为 View 设置一些 Listenter,只不过是用 Observable 实现,如界面按钮防止连续点击、Android 一些 CheckBox 点击更新响应视图等
- 线程切换,在后台线程取数据,主线程展示”的模式中看见
- 异步操作, 耗时操作,如复杂计算、网络请求、I/O操作等
- RxBus,RxBus并不是一个库,而是一种模式。其思想是用 RxJava 实现解耦,同 GreenRobot 的 EventBus 和 Otto 类似。
对于 RxJava 的基础部分就介绍就到这里了,后续文章将会介绍 RxJava 中的常用操作符及实现原理。