RxJava 从入门到放弃再到不离不弃
作者寄语
很久之前就想写一个专题
,专写Android开发框架
,专题的名字叫 XXX 从入门到放弃
,沉淀了这么久,看过网络诸多大神的博客,静下心来开始写这个专题
,为什么叫入门到放弃
呢;相信大家学习新框架的时候,尤其是像Rxjava
或者Dagger
等等这种新的编程思想;需要一定的阅读理解能力和思维逻辑;那么本专题旨在帮助大家不要太过急功近利,不要被冗长的代码和文章,晦涩的思想所打败,相信大家只要坚持看完,一定会有所收获的;废话不多说,那么这个专题开篇就以RxJava
来讲吧,预计后面还会有几篇大型框架的讲解,想想还有点小激动;
友情提示:文章较长,请耐心看完;
使用下面方式,最后发现“OnSubscribe”还是在默认线程中运行;原因是subscribeOn这类操作后,返回的是一个新的Observable。
observable.subscribeOn(Schedulers.io());
observable.observeOn(AndroidSchedulers.mainThread());
observable .subscribe(subscribe);
可以修改为下面两种方式:
observable = observable.subscribeOn(Schedulers.io());
observable = observable.observeOn(AndroidSchedulers.mainThread());
observable .subscribe(subscribe);
//OR
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscribe);
前面讲到了,可以利用 subscribeOn()
结合 observeOn()
来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map()
flatMap()
等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?
答案是:能。
因为 observeOn()
指定的是 Subscriber
的线程,而这个 Subscriber
并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe()
参数中的 Subscriber
,而是 observeOn()
执行时的当前 Observable
所对应的 Subscriber
,即它的直接下级 Subscriber
。换句话说,observeOn()
指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn()
即可。上代码:
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。
不过,不同于 observeOn()
, subscribeOn()
的位置放在哪里都可以,但它是只能调用一次的。
又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn()
呢?会有什么效果?
这个问题先放着,我们还是从 RxJava
线程控制的原理说起吧。
Scheduler 的原理
其实, subscribeOn()
和 observeOn()
的内部实现,也是用的 lift()
。具体看图(不同颜色的箭头表示不同的线程):
subscribeOn()
原理图:
observeOn()
原理图:
从图中可以看出,subscribeOn()
和 observeOn()
都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是, subscribeOn()
的线程切换发生在 OnSubscribe
中,即在它通知上一级 OnSubscribe
时,这时事件还没有开始发送,因此 subscribeOn()
的线程控制可以从事件发出的开端就造成影响;而 observeOn()
的线程切换则发生在它内建的 Subscriber
中,即发生在它即将给下一级 Subscriber
发送事件时,因此 observeOn()
控制的是它后面的线程。
最后,我用一张图来解释当多个 subscribeOn()
和 observeOn()
混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):
图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn()
影响,运行在红色线程;③和④处受第一个 observeOn()
的影响,运行在绿色线程;⑤处受第二个 onserveOn()
影响,运行在紫色线程;而第二个 subscribeOn()
,由于在通知过程中线程就被第一个 subscribeOn()
截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn()
的时候,只有第一个 subscribeOn()
起作用。
延伸:doOnSubscribe()
doOnSubscribe()
一般用于执行一些初始化操作.
然而,虽然超过一个的 subscribeOn()
对事件处理的流程没有影响,但在流程之前却是可以利用的。
在前面讲 Subscriber
的时候,提到过 Subscriber
的 onStart()
可以用作流程开始前的初始化。然而 onStart()
由于在 subscribe()
发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe()
被调用时的线程。这就导致如果 onStart()
中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar
,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe()
将会在什么线程执行。
而与 Subscriber.onStart()
相对应的,有一个方法 Observable.doOnSubscribe()
。它和 Subscriber.onStart()
同样是在 subscribe()
调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe()
执行在 subscribe()
发生的线程;而如果在 doOnSubscribe()
之后有 subscribeOn()
的话,它将执行在离它最近的 subscribeOn()
所指定的线程。
示例:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如上,在 doOnSubscribe()
的后面跟一个 subscribeOn()
,就能指定准备工作的线程了。
RxJava 的适用场景和使用方式
RxJava + Retrofit
Retrofit 是 Square 的一个著名的网络请求库。对于
Retrofit
不了解的同学
可以参考我之前写的文章:全新的网络加载框架Retrofit2,上位的小三
Retrofit
除了提供了传统的 Callback
形式的 API
,还有 RxJava
版本的 Observable
形式 API
。下面我用对比的方式来介绍 Retrofit
的 RxJava
版 API
和传统版本的区别。
以获取一个 MovieEntity
对象的接口作为例子。使用Retrofit
的传统 API,你可以用这样的方式来定义请求:
@GET("top250")
Call<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);//正常返回Call对象
我们来写getMovie
方法的代码:
//进行网络请求
private void getMovie(){
String baseUrl = "https://api.douban.com/v2/movie/";
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.build();
MovieService movieService = retrofit.create(MovieService.class);
Call<MovieEntity> call = movieService.getTopMovie(0, 10);
call.enqueue(new Callback<MovieEntity>() {
@Override
public void onResponse(Call<MovieEntity> call, Response<MovieEntity> response) {
resultTV.setText(response.body().toString());
}
@Override
public void onFailure(Call<MovieEntity> call, Throwable t) {
resultTV.setText(t.getMessage());
}
});
}
以上为没有经过封装的、原生态的Retrofit
写网络请求的代码。
而使用 RxJava
形式的 API
,定义同样的请求是这样的:
@GET("top250")
Observable<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);//RxJava返回Observable对象
Retrofit
本身对Rxjava
提供了支持,getMovie
方法改为:
//进行网络请求
private void getMovie(){
String baseUrl = "https://api.douban.com/v2/movie/";
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())//提供RXjava支持
.build();
MovieService movieService = retrofit.create(MovieService.class);
movieService.getTopMovie(0, 10)//返回Observable对象
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<MovieEntity>() {
@Override
public void onCompleted() {
Toast.makeText(MainActivity.this, "Get Top Movie Completed", Toast.LENGTH_SHORT).show();
}
@Override
public void onError(Throwable e) {
resultTV.setText(e.getMessage());
}
@Override
public void onNext(MovieEntity movieEntity) {
resultTV.setText(movieEntity.toString());
}
});
}
这样基本上就完成了Retrofit
和Rxjava
的结合,大家可以自己进行封装;那么用上了RxJava
,我们就可以用它强大的操作符
来对数据进行处理和操作,各位看官可以具体去实现,我在这里不做多做赘述。
参考文章:RxJava 与 Retrofit 结合的最佳实践
RxBinding
RxBinding
是 Jake Wharton
的一个开源库,它提供了一套在 Android
平台上的基于 RxJava
的 Binding API
。所谓 Binding
,就是类似设置 OnClickListener
、设置 TextWatcher
这样的注册绑定对象的 API
。
举个设置点击监听的例子。使用 RxBinding
,可以把事件监听用这样的方法来设置:
Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
.subscribe(new Action1<ViewClickEvent>() {
@Override
public void call(ViewClickEvent event) {
// Click handling
}
});
看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener()
来实现的。然而,仅仅这一个形式的改变,却恰好就是 RxBinding
的目的:扩展性。通过 RxBinding
把点击监听转换成 Observable
之后,就有了对它进行扩展的可能。扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst()
操作符,用于去抖动,也就是消除手抖导致的快速连环点击:
RxView.clickEvents(button)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribe(clickAction);
如果想对 RxBinding
有更多了解,可以去它的 GitHub
项目 下面看看。
RxLifecyle
RxLifecycle
配合 Activity/Fragment
生命周期来管理订阅的。 由于 RxJava
Observable
订阅后(调用 subscribe
函数),一般会在后台线程执行一些操作(比如访问网络请求数据),当后台操作返回后,调用 Observer
的 onNext
等函数,然后在 更新 UI 状态。 但是后台线程请求是需要时间的,如果用户点击刷新按钮请求新的微博信息,在刷新还没有完成的时候,用户退出了当前界面返回前面的界面,这个时候刷新的 Observable
如果不取消订阅,则会导致之前的 Activity
无法被 JVM
回收导致内存泄露。 这就是 Android
里面的生命周期管理需要注意的地方,RxLifecycle
就是用来干这事的。比如下面的示例:
myObservable
.compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTROY))
.subscribe();
这样Activity
在destroy
的时候就会自动取消这个observer
RxBus
RxBus
并不是一个库,而是一种模式。相信大多数开发者都使用过EventBus
或者Otto
,作为事件总线通信库,如果你的项目已经加入RxJava
和EventBus
,不妨用RxBus
代替EventBus
,以减少库的依赖。RxJava
也可以轻松实现事件总线,因为它们都依据于观察者模式。
拓展链接:
用RxJava实现事件总线(Event Bus)
[深入RxBus]:支持Sticky事件
RxPermission
RxPermission
是基于RxJava
开发的用于帮助在Android 6.0
中处理运行时权限检测的框架。在Android 6.0
中,系统新增了部分权限的运行时动态获取。而不再是在以前的版本中安装的时候授予权限。
拓展链接:
使用RxPermission框架对android6.0权限进行检测
总结
简而言之Rxjava
是一个很牛逼的库,如果你的项目中还没有使用RxJava
的话,建议可以尝试去集成使用;对大多数人而已RxJava
是一个比较难上手的库了,不亚于Dagger
的上手难度;不过当你认识学习使用过了,你就会发现RxJava
的魅力所在;如果看一遍没有看懂的童鞋,建议多看几次;动手写写代码,我想信本文可以给到你们一些帮助;你们真正的体会到什么是 从入门到放弃再到不离不弃
;这就是RxJava
的魅力所在。
拓展阅读:
给 Android 开发者的 RxJava 详解 - 抛物线
如果有疑问和见解,也欢迎大家在下面留言,我会一一回复大家
以上