RxJava架构设计
RxJava使用系列
RxJava概念与观察者设计模式
1.起点 和 终点,一旦满足 起点 和 终点 这样的需求,都可以使用RxJava来实现。
2.标准中的观察者设计模式,一个被观察者 ---- 多个观察者 多次注册。
3.RxJava是改装的观察者设计模式,一个订阅(注册) 一个观察者。
RxJava上游和下游
1.上游 Observable 被观察者, 下游 Observer 观察者。
2.ObservableEmitter<Integer> emitter 发射器 发射事件。
3.拆分来写的,链式调用。
RxJava创建型操作符
create:使用者自己发射事件
just 内部自己发射的,单一对象
fromArray 内部自己发射的,数集对象
empty:内部自己发射的 ,下游默认是Object,无法发出有值事件,只会发射 onComplete
range:内部自己发射的,start 1 累加 count 5 最后结果:1 2 3 4 5
RxJava变换型操作符
上游 -------> 变换操作(往右边流向的时候,进行变换) ----------> 下游
map
flatMap
concatMap
groupBy
buffer
RxJava过滤性型操作符
filter
take 定时器
distinct 过滤重复的元素
elementAT 指定下标输出
RxJava条件性型操作符
all 如同if 全部为true 才是true,只要有一个false,就是false
contains 包含
any 同all相反,全部为false才是false,有一个为true就是true
RxJava合并性型操作符
startWith 两个给被观察者合并,先执行后边的被观察者
concatWith 两个给被观察者合并,和startWith相反
concat 多个合并成一个 最多四个 按照顺序执行
merge 多个合并成一个 最多四个 并列执行,并发
zip 需要对应关系,对应执行,如果不对应会被忽略,最多九个被观察者
RxJava异常处理操作符
onErrorReturn 能够接受e.onError 接受异常后,中断接受上游的后续发射的事情 作用 处理记录异常 通知给下一层
onErrorResumeNext 能够接受e.onError 接受异常后,也可以返回异常标识,返回被观察者,被观察者可以再次发射事件给下游
onExceptionResumeNext 能在发生异常的时候(throw Exception)保持程序的健壮,(这种异常一定是可以接受的,才能使用)
retry 重试
RxJava线程切换
异步线程区域 上游
Schedulers.io() 代表io流操作 网络操作 耗时操作
Schedulers.Thread() 比较常规的,普普通通
Schedulers.Comptation() 代表CPU大量计算所需要的线程
给上游分配多次,只会在第一次切换,后面就不切换了(忽略)
下游
AndroidSchedulers.mainThread() 专门为Android main线程量身定做
给下游切换,每次都会去切换
如果不配置异步线程,上游发一次,下游接受一次
如果配置异步线程,就是异步的表现
传统的写法四分五裂,RxJava基于事件流编程,一条线索下来
RxJava背压模式
Flowable 背压模式 什么情况下使用 如果没有背压模式,上游不停发射线索,下游处理不过来,就会造成内存泄漏
策略
1.Error 上游不停发射消息,下游阻塞,放入缓存池,如果池子满了,抛出异常
2.Buffer 上游不停发射消息,下游阻塞,放入缓存池,如果池子满了,等待下游来处理
3.Drop 上游不停发射消息,下游阻塞,放入缓存池,如果池子满了,把后面的丢弃
4.Latest 上游不停发射消息,下游阻塞,放入缓存池,如果池子满了,只存储128个事件
如果是同步的,下游没有调用request(),会抛异常,而如果是异步的,不执行request(),上游不会等待下游,不会抛出异常,所以外界可以调用
RxJava背压模式 Flowable 讲解
Observable---Observer
Flowable---Subscriber
c的设计和Flowable一致的,只不过Flowable增加了背压模式
Flowable下游,Subscription可以传递给外部使用,request(),事件给下游使用
RxJava+Retrofit(封装了OkHttp)
Retrofit请求网络
Retrofit返回一个结果---Observable
最终的结果是RxJava中的被观察者 上游Observable
RxJava泛型高级进阶
由于RxJava大量使用泛型
限定 通配符 ? 上限 extends
下限 super
读写模式 上限 不可写 可读
下限 可写 不完全可读
RxJava手写系列
RxJava书写create操作符
源码
public class Observable {
public Observable(ObservableOnSubscibe subscibe) {
this.subscibe = subscibe;
}
private ObservableOnSubscibesubscibe;
public static Observablecreat(ObservableOnSubscibe subscibe) {
return new Observable(subscibe);
}
public void subscibe(Observer observer){
observer.onSubscibe();
subscibe.subscribe(observer);
}//调用onSubscibe订阅成功,将observer传入subscibe,将执行subscibe里面的方法
}
public interface ObservableOnSubscibe {
public void subscribe(Observer observer);
}
public interface Observer {
public void onSubscibe();
public void onNext(T item);
public void onError(Throwable e);
public void onComplete();
}
RxJava书写create增加读写操作符
什么时候是读写模式 什么时候是上限、下限模式
在方法参数中,show(Test<? extends Object>) 一定是上限和下限
在真正使用到泛型中,一定是读写模式
RxJava书写just操作符
just操作符思路和流程和create是完全一致的,使用上的区别主要是create是用户自己发射,自己调用onNext(),而just是内部自己发射。
RxJava书写map操作符
public class Observable {
public Observable(ObservableOnSubscibe subscibe) {
this.subscibe = subscibe;
}
private ObservableOnSubscibesubscibe;
public static Observablecreat(ObservableOnSubscibe subscibe) {
return new Observable(subscibe);
}
public void subscibe(Observer observer){
observer.onSubscibe();
subscibe.subscribe(observer);
}
public Observable map(Function function){
MapObservable mapObservable=new MapObservable(function,subscibe);
return new Observable(mapObservable);
}//添加map方法,链式调用返回Observable
}
public class MapObservableimplements ObservableOnSubscibe {
private Functionfunction;
private ObservableOnSubscibesource;//管理上一层
private ObserverobserverEmitter;//管理下层
public MapObservable() {
}
@Override
public void subscribe(Observer observerEmitter) {
this.observerEmitter = observerEmitter;
MapObserver mapObservable =new MapObserver(observerEmitter, source, function);
source.subscribe(mapObservable);
}
public MapObservable(Function function, ObservableOnSubscibe source) {
Log.e("fsfsgsgs","MapObservable");
this.function = function;
this.source = source;
}
class MapObserverimplements Observer {
private ObserverobserverEmitter;//管理下层
private ObservableOnSubscibesource;//管理上一层
private Functionfunction;
public MapObserver(Observer observerEmitter, ObservableOnSubscibe source, Function function) {
this.observerEmitter = observerEmitter;
this.source = source;
this.function = function;
}
@Override
public void onSubscibe() {
observerEmitter.onSubscibe();
}
@Override
public void onNext(T item) {
R type =function.apply(item);
observerEmitter.onNext(type);
}
@Override
public void onError(Throwable e) {
observerEmitter.onError(e);
}
@Override
public void onComplete() {
observerEmitter.onComplete();
}
}
}
public interface Function {
public R apply(T t);
}