RxJava 总结
Hi~ 我是RxJava
现在RxJava有1.0和2.0二个版本。这里描述的内容都是主要基于1.0版本,2.0相关内容会稍微提及。
介绍
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
项目主页
中文文档
https://mcxiaoke.gitbooks.io/rxdocs/content/
API文档
http://reactivex.io/RxJava/javadoc/
项目地址
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
基本概念
0.基本模式
image.png1.Observable被观察者,事件源。
在观察者模式中被观察对象。处于流程的上游。
2.Observer
观察者,接收源。在观察者模式中观察者。
需要实现以下的方法:
-
onNext(T item)
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。 -
onError(Exception ex)
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。 -
onComplete
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
3.Subscriber
订阅者,其实也是接收者。比Observer多了一个Subscription。
public abstract class Subscriber<T> implements Observer<T>, Subscription {
......
}
public interface Subscription {
/**
* Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
* was received.
* <p>
* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
* onCompleted is called).
*/
void unsubscribe();
/**
* Indicates whether this {@code Subscription} is currently unsubscribed.
*
* @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
*/
boolean isUnsubscribed();
}
- unsubscribe();
取消订阅 - isUnsubscribed();
判断是否订阅
取消订阅的结果会传递给这个Observable的操作符链,而且会导致这个链条上的每个环节都停止发射数据项。这些并不保证会立即发生,然而,对一个Observable来说,即使没有观察者了,它也可以在一个while循环中继续生成并尝试发射数据项。
4.Operator:操作符
几种主要的需求
直接创建一个Observable(创建操作)
- 组合多个Observable(组合操作)
- 对Observable发射的数据执行变换操作(变换操作)
- 从Observable发射的数据中取特定的值(过滤操作)
- 转发Observable的部分值(条件/布尔/过滤操作)
- 对Observable发射的数据序列求值(算术/聚合操作)
如果你想实现你自己的操作符,可以参考这里:实现自定义操作符
5.Scheduler:调度器,可以切换主线程和各个线程。
- Schedulers.immediate()
在当前线程运行,相当于不切换线程。这是默认的 Scheduler。
- Schedulers.newThread()
总是启用新线程,并在新线程执行操作。
- Schedulers.io()
I/O 操作(读写文件、数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation()
计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- AndroidSchedulers.mainThread()
切换到主线程,指定的操作将在Android 主线程运行。
8.ActionX:没有返回值的函数
Action0.class
public interface Action0 extends Action {
void call();
}
Action1.class
public interface Action1<T> extends Action {
void call(T t);
}
Action2.class
public interface Action2<T1, T2> extends Action {
void call(T1 t1, T2 t2);
}
9.FuncX:有返回值的函数
Func0.class
public interface Func1<T, R> extends Function {
R call(T t);
}
Func1.class
public interface Func1<T, R> extends Function {
R call(T t);
}
Func2.class
public interface Func2<T1, T2, R> extends Function {
R call(T1 t1, T2 t2);
}
与Action的区别在于是否返回参数。
操作符
创建操作
用于创建Observable的操作符
- create:通过调用观察者的方法从头创建一个Observable
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Observable Create call");
subscriber.onCompleted();
}
});
- empty:创建行为受限的特殊Observable
Observable deferObservable = Observable.empty();
- from:将其它的对象或数据结构转换为Observable
List<String> list = new ArrayList<>();
list.add("from1");
list.add("from2");
list.add("from3");
Observable fromObservable = Observable.from(list); //遍历list 每次发送一个
变换操作
这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档
- buffer:缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
Observable rangeObservable = Observable.from(list).buffer(3);
- map:映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
Observable observable = Observable.from(list).map(new Func1<String, String>() {
@Override
public String call(String string) {
return string + " map";
}
});
过滤操作
这些操作符用于从Observable发射的数据中进行选择
- filter:过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>4;
}
}).subscribe(ObserverFactory.createIntObserver());
- first:首项,只发射满足条件的第一条数据
Observable.just(1, 2, 3, 4, 5, 6)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>2;
}
}).subscribe(ObserverFactory.createIntObserver());
算术和聚合操作
这些操作符可用于整个数据序列
Concat: 不交错的连接多个Observable的数据
Observable<Integer> source1 = Observable.just(1, 2, 5, 6);
Observable<Integer> source2 = Observable.just(7, 6, 5, 6);
Observable.concat(source1,source2).distinct().subscribe(ObserverFactory.createIntObserver());
辅助操作
一组用于处理Observable的操作符
- delay:延迟一段时间发射结果数据
Observable justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"
justObservable.delay(3000, TimeUnit.MILLISECONDS).subscribe(ObserverFactory.createObserver());
- observeOn:指定观察者观察Observable的调度程序(工作线程)
Observable justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"
justObservable.observeOn(Schedulers.immediate()).subscribe(ObserverFactory.createObserver());
更多操作符例子
https://github.com/iamludaxu/ae/tree/master/app/src/test/java/gift/witch/android/ae/rxjava