RxJava 使用篇
2018-09-12 本文已影响154人
RobinYeung
一、什么是RxJava
Rx = Reactive Extension
-
Reactive 响应
- 响应式编程
- 基于观察者模式
- 注入回调
- 调用回调
- 事件序列
- 不可预知、动态离散
- 如用户点击、异步请求、Model更新
-
Extension 扩展
- 函数式编程
- 单一职责原则
- 依赖倒置原则
- 数据流
- 现成的,静态的,连续的
- 如处理字符串数据,文件数据等
- 线程调度
- 时间控制:延时、周期任务
- 线程控制:前后台调度
- 异常处理
- 重试
- onError/onErrorResumeNext/onErrorReturn
- 轮子
- 转换、过滤、防抖、组合、重复、重试等操作符
- 函数式编程
-
三种编程思想的对比 举例把大象放进冰箱
- 面向对象编程:
- 构建一个冰箱,具有开门,关门的方法
- 构建一个大象,具有走进冰箱的方法
- 实例化一个冰箱对象,实例化一个大象对象,冰箱对象调用开门方法,大象对象调用走进冰箱方法,冰箱对象调用关门方法,大象被成功装入冰箱。
- 响应式编程:
- 构建一个发射源,发送大象
- 构建一个响应器,接受到大象后关进冰箱。
- 用这个响应器监听发射源。
- 函数式编程:
- 构建一个函数,接收大象和冰箱两个参数
- 在此函数内部做实现,返回冰箱已经装入大象
- 调用此函数,将大象和冰箱作为参数传入
- 面向对象编程:
二、基本知识
观察源
观察源-
Observable / Flowable
- 基本数据流观察源
- 可发射多个onNext事件
- 可发射onError或onComplete事件来终止结束整个事件流
-
Single
- 单独发射一个onSuccess或者onError事件
- onSuccess相当于onNext
-
Completable
- 单独发射一个onComplete或onError事件,一般用于单纯的调用,而没有数据处理的逻辑
- 比Observable少了很多处理元素的操作符
- 经常使用andThen来转换流到其他观察源
-
Maybe
- Single和Completable的结合。有onSuccess、onComlete、onError三种事件,但只会发射其中一个。
事件源 Subject
- PublishSubject
- 观察者只能收到订阅之后的事件
- BehaviorSubject
- 粘性,订阅时会立即收到订阅前最后一个事件或默认事件
- ReplaySubject
- 无论什么时候订阅,都可以收到所有事件
- 当然,可以指定Replay的初始容量(默认16),上限(默认无上限),或Replay的时间上限
- AsyncSubject
- subject的onComplete被调用时,才会把事件发射给观察者
- SerialedSubject
- 串行Subject,保证发射一个事件,消费完才会发射下一个事件
背压
-
Observable / Flowable 无背压处理
- Observable数据流处理元素数量不要过多,否则容易OOM
- Flowable专门用于处理大量数据流,如解析各种流等。
- Flowable可以控制数据发出速度
-
Subject / Process 有背压处理
- Subject消费速度不要低于生产速度,否则可能出现OOM
- Process可以选择背压策略来处理消费速度低于生产速度的情况
-
背压策略
- 上游背压策略。通过 create 或 toFlowable 创建的时候可以选择5种策略
- MISSING: 背压交由下游处理(通过onBackpressureXXX)
- ERROR: 下游无法处理时,抛出MissingBackpressureException
- BUFFER: 缓存起来,直到下游可以消费掉
- DROP: 抛弃掉,如果下游无法处理
- LATEST: 只保留最新的
- 下游背压处理
- onBackpressureDrop 方法处理背压,下游实现onDrop方法
- onBackpressureLatest 方法处理背压,相当于上游选择了LATEST策略
- onBackpressureBuffer 方法处理背压,此时可以选择ERROR DROP_OLDEST DROP_LATEST三种应对策略
- 上游背压策略。通过 create 或 toFlowable 创建的时候可以选择5种策略
事件流
- 创建:创建事件流或数据流
- 组合:使用链式操作符来变换所创建的事件流
- 监听:订阅事件流并实现业务响应事件
操作符
- 创建
- create 用函数式创建观察源
- just 用常量或变量创建观察源
- formArray 用数组创建观察源,元素逐个发送
- fromIterable 用可迭代对象创建观察源,元素逐个发送
- range 用整数数列创建观察源,元素逐个发射
- timer interval 时间类观察源,此类观察源默认使用computation调度器
- merge concat 等组合类,可以合并多个观察源来创建一个观察源
- 转换
- map 变换元素
- flatMap 从元素切换到新的观察源
- 过滤
- filter 符合条件的发射到下游
- distinct 非重复的元素才发射到下游
- take 指定允许发射到下游的个数
- skip 忽略发射到下游的个数
- ofType 只允许指令类型的元素发射到下游
- 防抖
- debounce throttle 防抖或取样
- 组合
- merge concat 合并多个流,并按规则分别发射这些流的元素
- 各个流不会相互影响发射到下游的结果
- 聚合
- zip amb combineLast scan 合并多个流,并对这些流的元素合并处理后再发射到下游
- 各个流会相互影响发射到下游的结果
- 重复
- repeat onComplete后自动重新订阅
- retry onError后自动重新订阅
- 异步阻塞转同步
- blockingFirst等
线程调度
- 分类与调度规则
- Schedulers.trampoline
- 默认。当前线程
- Schedulers.single
- 一个单例的后台线程
- Schedulers.newThread
- 总是启用新线程,并在新线程执行操作。
- Schedulers.io
- I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的调度器。和newThread() 类似,区别在于 io() 实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 性能消耗更低。
- 不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation
- 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 限制性能的操作,例如图形的计算,延时计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- timer、interval等很多时间相关操作符将其作为默认调度器
- Schedulers.from()
- 通过一个指定一个Executor来担当调度器
- AndroidSchedulers.mainThread
- RxAndroid提供的,它指定的操作将在 Android 主线程运行。
- Schedulers.trampoline
三、其他
扩展 Rx相关库
- Retrofit
- 网络请求响应
- RxBinding
- View响应
- RxPermissions
- Permission状态改变响应
- RxLifeCycle
- 作者并不推崇这个库
- RxBus
- 事件总线
// RxBinding
RxView.clicks(view)
.throttleFirst(ms, TimeUnit.MILLISECONDS)
.compose(RxLifecycleAndroid.bindView(view))
.subscribe(x -> listener.onClick(view));
// RxPermissions
new RxPermissions(getActivity())
.request(Manifest.permission.CAMERA)
.subscribe(granded -> {
if(granted) {
// ...
}
});
// RxLifecycle
public class OneFragment extends RxFragment {
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(saveInstanceState);
PublishSubject.create()
.compose(bindUntilEvent(FragmentEvent.DESTROY))
.subscribe();
}
}
// RxPreferences
RxPreferences.INSTANCE
.<String>onPreferenceChanged()
.filter(p -> TextUtils.equels(p.getKey(), SOME_PREFERENCE_KEY))
.subscribe(p -> ...);
// RxNetwork
RxNetwork.INSTANCE
.onConnectionChanged()
.filter(info -> info.getNetworkType() == ConnectivityManager.TYPE_WIFI)
.filter(info -> !info.isConnected())
.observeOn(AndroidSchedulers.mainThread())
.subcribe(info -> ...);
常见坑
- 生命周期
- dispose!
- 由于很多事件流都不在主线程,避免线程泄露必须注意dispose
- dispose?
- dispose只是结束订阅事件流,首先不能立即停止最后一个异步事件,更不能停止操作符引入的线程
- RxLifeCycle?
- 其作者对这个库持怀疑态度
- 如果订阅的地方发生在没有生命周期的类中,就需要组件去获取Activity的生命周期,然而这种行为是没有保障的,当订阅失败时也是模糊的,如果不是人为去执行,往往具有不确定性
- 有些事件队列的生命周期和Activity等的生命周期不等价,依然需要手动处理,如果手动处理和自动处理并存则让人困惑
- 你的Activity和Fragment需要继承库里面的相关基类
- 建议使用AutoDispose,其比RxLifeCycle更优秀
- dispose!
- CompositeDisposable
- clear 对所有add进来的Dispsable执行dispose
- dispose 在clear的基础上,让这个CompositeDisposable无法再使用,甚至add就会dispose你的事件流
- 线程调度
- subscribeOn的坑
// a()运行在computation b()运行在io c() d()运行在主线程
Observable.create(emitter -> a())
.observeOn(Schedulers.io())
.flatMap(x -> {
b();
return Observable
.create(emitter -> c())
.subscribeOn(AndroidSchedulers.mainThread());
})
.map(p -> d())
.subscribeOn(Schedulers.computation())
.subscribe();
// 当把flatMap里面的subscribeOn移到主流程上,事情就变了
// a()运行在主线程上 b() c() d()运行在io。第二个subscribeOn不生效
Observable.create(emitter -> a())
.observeOn(Schedulers.io())
.flatMap(x -> {
b();
return Observable.create(emitter -> c())
})
.subscribeOn(AndroidSchedulers.mainThread());
.map(p -> d())
.subscribeOn(Schedulers.computation())
.subscribe();
- 背压
- 如果不注意处理背压就可能导致OOM
- UnDeliverableException
- 未复写subscribe()的onError,当上游抛出错误,整个流会直接抛异常
- 记得写onError或处理异常的操作符,如retry、onErrorReturn等
- 流已经被dispose了,此时上游抛异常
- dispose与Thread.interrupt()类似,只起到通知的作用,不起到立即结束的作用
- 判断流是否断开再抛异常
- 未复写subscribe()的onError,当上游抛出错误,整个流会直接抛异常