RxJava学习笔记
RxJava
Rxjava的GitHub官网上是这样介绍rxjava的:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programes by using observable sequences
(RxJava是一个基于Reactive Extensions的JVM实现框架,它使用观察者模式的做法,将异步和基于事件的编程很好的结合起来),虽然是简短的一句话,但是很清晰的解释了RxJava,其中有三个关键字:观察者、异步、事件。
为什么要使用RxJava
总结来说RxJava主要有以下几点优点:
- 异步(作用):可以自由切换线程
- 观察者模式(模式)
- 响应式编程(结构)
- 逻辑简单(逻辑简洁):减少回调嵌套
响应式编程
在某种程度上,这并不是新东西。事件总线(Event buses)或常见的单机事件就是一个异步事件流,你可以观察这个流,也可以基于这个流做一些自定义的操作。响应式就是基于这种想法。你能够创建所有事物的数据流,而不仅仅只是单击和监听事件数据流。
简洁编程
假设有这样一个需求:给定一个图片目录,需要将这个目录下的所有图片显示到对应的imageview空间上,由于读取图片是一个比较耗时的过程,需要新启动一个线程来获得bitmap类型的图片,然后在主线程上显示图片,我们平时的做法是:
new Thread() {
@Override
public void run() {
super.run();
for (File folder : files) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
MainActivity.this.runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
但是如果是RxJava2,实现的方式是这样的:
Observable.fromArray(files)
.flatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(@NonNull File file) throws Exception {
return Observable.fromArray(file.listFiles());
}
})
.filter(new Predicate<File>() {
@Override
public boolean test(@NonNull File file) throws Exception {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(rx.schedulers.Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
if (o instanceof Bitmap){
imageView.addImage(o);
}
}
});
从代码上看,的确是RxJava的代码变多了,但是从逻辑来说,RxJava的实现是从上到下的链式调用,没有任何嵌套,这在逻辑的间接性上是具有优势的。
基本概念
RxJava使用的是观察者模式来实现的,观察者模式对于我们来说应该是比较熟悉的,我们在日常开发中经常使用到的,例如,setOnClickListener(listener)
就是一个观察者模式的应用。
- Observable: 被观察者
- Observer:观察者
基本用法
Observable(被观察者)的创建
- 使用create(),最基本的创建方式:
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Object> e) throws Exception {
e.onNext("next1");//发送字符串"next1"
e.onNext("next2");//发送字符串"next2"
e.onComplete();//发送完成
}
});
- 使用just(),将为你创建一个ObservableJust对象,并且依次发送just中的参数。
Observable.just("just1","just2");//依次发送“just1”和“just2”
- 使用fromXXX(),遍历集合,发送每个item。
List<String> list = new ArrayList<>();
list.add("from1");
list.add("from2");
list.add("from3");
Observable.fromIterable(list);//遍历发送list的每个item。
String[] strings = new String[]{"from1","from2","from3"};
Observable.fromArray(strings);//遍历发送strings的每个item。
- 使用defer(),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable
Observable.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
return Observable.just("defer");
}
});
- 使用interval(),创建一个按固定时间间隔发送整数序列的Observable,类似于计数器
Observable.interval(1, TimeUnit.SECONDS);//每个一秒发送一个数字
- 使用range(),创建一个发送特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数。
Observable.rang(20,5);//发送20,21,22,23,24
- 使用timer(),创建一个Observable,它在一个给定的延迟发射一个特殊值,用做定时器。
Observable.timer(4,TimeUnit.SECONDS);//4秒后发射一个值
- 使用repeat(),创建一个重复发射特定数据的Observable
Observable.just("repeat").repeat(3);//重复发射3次
Observer(观察者)的创建
Observer observer = new Observer() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
};
创建了Observer之后,就可以将Observable与其关联起来
Observable.subscribe(observer);//订阅
线程控制
如果订阅时不特别指定线程,那么就会在订阅时所在的线程产生事件。如果需要切换线程,就需要用到Scheduler。
- Schedulers.computation():计算所使用的Scheduler。这个Scheduler使用的固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费CPU
- Schedulers.io():I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThrea()差不多,区别在于io()的内部实现是一个无上限的线程池,可以用重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
- Schedulers.newThread():总是启用新线程,并在新线程执行操作。
- Schedulers.single():在一个固定的线程中(非主线程),保证顺序性。
- Schedulers.trampoline():直接在当前线程中运行,相当于不指定线程。
- AndroidSchedulers.mainThread():指定的操作将在Android主线程运行。
有了这几个Scheduler,就可以使用subscribeOn()和observeOn()两个方法来对线程进行控制了。
- subscribeOn():指定subscribe()所发生的线程,即Observable.OnSubscribe被激活时所处的线程。
- observeOn():指定Subscriber所运行的线程。
举个例子,在I/O线程发送事件,在主线程接收事件
Observable.fromArray("from1","from2","from3")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i("knight","S: "+ s);
}
});
变换
RxJava提供了对事件序列进行变换的支持,也就是核心的功能之一。所谓的变化,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或者事件序列。
- map():map就是将传入的数据类型,经过一些处理,转换为你需要的数据类型,例如,将输入的类型为Int类型,需要在接受事件的时候的数据类型为String。
Observable.fromArray(1,2,3)// 输入类型 Integer
.subscribeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer s) throws Exception { // 参数类型 Integer
Log.i("knight","thread: "+ Thread.currentThread().getName());
return String.valueOf(s);// 返回类型 String
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {// 参数类型 String
Log.i("knight","thread: "+ Thread.currentThread().getName());
Log.i("knight","S: "+ s);
}
});
- flatMap():将list集合,逐个发送给观察者,
flatMap()
和map()
有一个相同点:它也是把传入的参数转化之后返回另一个对象。不同点:flatMap()
中返回的是个Observable
对象,并且这个Observable
对象并不是被直接发送到了Observer
的回调方法中。
假如有这样一个需求:需要得到公司每个部门的每个员工的姓名,不用flatMap()
List<Dept> depts = new ArrayList<>();
Observable.fromIterable(depts)
.subscribe(new Consumer<Dept>() {
@Override
public void accept(@NonNull Dept dept) throws Exception {
List<String> persons = dept.getPerson();
for (String person : persons) {
System.out.print(person);
}
}
});
使用flatMap()
List<Dept> depts = new ArrayList<>();
Observable.fromIterable(depts).flatMap(new Function<Dept, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Dept dept) throws Exception {
return Observable.fromIterable(dept.getPerson());
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.print(s);
}
});