RxJava(2.0)-你可能需要知道这些
作为一个Android开发从业者,当你处理异步任务时,如果还在使用着Handler+Thread,那么你可能需要了解下RxJava这个优秀的开源框架;当然如果你正在跳槽面试,RxJava也是经常被问到的框架。
关于介绍RxJava的文章也非常多,但是很多文章基于的版本还是1.0.X,而本博文就基于2.0版本对RxJava进行一个简单的介绍和分析,也算是抛砖引玉吧。
本博文基于RxJava 2.0.0版本进行分析讲解。
参考:抛物线大神《给 Android 开发者的 RxJava 详解》
RxJava是什么?
简单的归纳为两个字:异步。
归纳毕竟是归纳,不能完全表明RxJava的概念,那么我们来看GitHub上给出的解释:
a library for composing asynchronous and event-based programs by using observable sequences.
我用我蹩脚的CET-6水平给大家翻译下,大概就是这个意思:
一个使用可观测序列来组成异步的、基于事件的程序的库。
这对于刚接触的童鞋们可能不太容易理解,RxJava的核心还是异步,其他的定语都是基于其之上,有了这个思维和认识,再去学习RxJava也能更容易接受和理解其设计。
为什么要使用RxJava?
我写溜溜的[AsyncTask / Handler / Thread/ ... ],干嘛要使用这个奇怪的RxJava啊?
还能为什么?简洁呗。
异步操作的很重要的一点就是保持程序和代码的简洁性,Android内部提供的AsyncTask以及Handlder+Thread都是为了解决异步代码编写繁琐问题,从而使编写异步代码更加简洁。在保持代码和程序简洁这个目的上,RxJava倒是更加的努力和方便,它的优点是随着程序逻辑变得越来越复杂,它仍然可以保持简洁、优雅。
口说无凭,我们来分析下面这样一个例子。
图片展示可能是我们每个Android开发者都要面对的问题,假设在我们的Activity上存在一个ListView,并且我们提供了一个addImage方法来任意添加待显示的图片。现在需要将某个目录下所有的png图片都加载并显示在ListView中,由于读取和解析图片是一个耗时过程,因此我们需要将这个过程放在后台执行;而图片的显示则必须放在主线程(UI线程)中。
那么在没有使用RxJava时,我们怎么编写这段代码呢?
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
((MainActivity) context).runOnUiThread(new Runnable() {
@Override
public void run() {
imageList.add(bitmap);
imageListAdatper.notifyDataSetChanged();
}
});
}
}
}
}
}.start();
没有对比,就没有伤害,如果我们使用RxJava的话,是如何实现的呢?
Observable.fromArray(folders)
.flatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(File file) throws Exception {
return Observable.fromArray(file.listFiles());
}
})
.filter(new Predicate<File>() {
@Override
public boolean test(File file) throws Exception {
return file.getName().endsWith(".png");
}
})
.map(new Function<File, Bitmap>() {
@Override
public Bitmap apply(File file) throws Exception {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
imageList.add(bitmap);
imageListAdatper.notifyDataSetChanged();
}
});
这代码变简洁了吗?这代码量也没减少啊,而且这一大堆代码都是什么意思啊?完全看不懂啊。
各位看官,你先消消气,我们讲的简洁是:逻辑上的简洁,并不是单纯的代码减少(说实话,我们其实更关注这个)。
仔细看下这段代码,之前的if..else呢?之前的那么多循环呢?好像都不见了,完全是从上到下的一条链式调用,而且没有嵌套(你是不是也讨厌好多层的嵌套,反正我是),现在看起来是不是逻辑更加清楚了呢。
此时RxJava的优势还不能完全体现出来,而且看到这么多陌生的函数,你也一定有点不知其解,那么我们就带着疑惑接着往下看。
API
虽然我知道你有很强的理解和学习能力,但是我还是决定要对RxJava的一些常用的API进行介绍和说明,以便你能更顺畅的阅读全文。
1.观察者模式
RxJava的异步实现,是通过一种扩展的观察者模式来实现的。
我们来看下什么是观察者模式?
观察者模式(有时又被称为发布(publish )-订阅(Subscribe)模式、模型-视图(View)模式、源-收听者(Listener)模式或从属者模式)是软件设计模式的一种。在此种模式中,一个目标物件管理所有相依于它的观察者物件,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实现事件处理系统。
这是百度给出的解释,我们在日常编码中使用的点击事件的处理就采用了观察者模式。
clkBtn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Toast.makeText(MainActivity.this, "The button was clicked", Toast.LENGTH_LONG).show();
}
});
在典型的Click事件处理中,Button就是被观察者,而我们设置的OnClickListener就是观察者,在我们点击Button时,OnClickListener的onClick方法就会被回调。
2. RxJava的观察者模式
2.1 几个对象
我们先来了解下RxJava给我们提供的几个常用的对象。
- FLowable与Observable
在2.0版本中被观察者新的实现叫做Flowable, 同时旧的Observable也保留了。因为在 RxJava1.x 中,有很多事件不被能正确的背压,从而抛出MissingBackpressureException。
举个简单的例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException, 初学者很难明白为什么会这样,使得学习曲线异常得陡峭。
而在 2.0 中,Observable 不再支持背压,而Flowable 支持非阻塞式的背压。并且规范要求,所有的操作符强制支持背压。
幸运的是,Flowable 中的操作符大多与旧有的 Observable 类似。
- Observer与Subscriber
Observer就是我们前面提到的观察者,与Observable组合使用。
Subscriber也被成为订阅者,一般与Flowable组合使用。
因为Observable不再支持背压,因此如果我们使用RxJava2.0版本,Flowable可能是你的不二人选。
基于以上的分析,本文以下的示例将采用Flowable进行说明和讲解。
2.2 回调
为什么称RxJava采用了扩展的观察者模式呢?我们知道传统的观察者回调接口中只有一个update方法,那么RxJava呢?它可不止一个,让我们来看下Subscriber的定义。
public interface Subscriber<T> {
/**
* Invoked after calling {@link Publisher#subscribe(Subscriber)}.
* <p>
* No data will start flowing until {@link Subscription#request(long)} is invoked.
* <p>
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
* <p>
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
*
* @param s
* {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
*/
public void onSubscribe(Subscription s);
/**
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
*
* @param t the element signaled
*/
public void onNext(T t);
/**
* Failed terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*
* @param t the throwable signaled
*/
public void onError(Throwable t);
/**
* Successful terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*/
public void onComplete();
}
RxJava的观察者接口中提供了onSubscribe、onNext、onError、onComplete四个回调方法,而传统的观察者模式中只有update一个回调方法,这也是称之为扩展的观察者模式的一部分原因。
下面我们来分析下Subscriber接口中几个方法:
-
onSubscribe
这个方法是2.0之后才有的方法,主要是给观察者提供了一个终止事件接收的机会(当然我们也可以做一些预处理),它也会首先被调用。
要终止接收事件,可以调用Subscription的cancel方法。 -
onNext
我们可以将其理解为传统观察者模式回调接口中的update方法,它可能会被调用多次。它的调用顺序在onSubscribe之后。 -
onError
在事件处理过程中出异常时,onError会被触发,同时事件队列自动终止,不会再有事件发出。 -
onComplete
在事件队列传递完毕后,该方法会被调用。
在一个正确运行的事件序列中, onComplete()和onError()有且只有一个,并且是事件序列中的最后一个。
需要注意的是,onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
在一个正确的事件序列中,onError与onComplete互斥且唯一。
相比于传统的观察者模式,RxJava使用的扩展观察者模式好像变得复杂了,但是从另一方面来讲它也更加的丰富了,把更多的主动权和机会交给了使用者。
3. 实战
看了那么多的概念,是不是觉得有点枯燥和乏味呢,那我们就开始动手使用RxJava来体验一下吧。
3.1 引用
怎么在我们的项目中使用RxJava和RxAndroid呢?
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
3.2 实例
- 1.0 方式
//定义被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello");
e.onNext("World");
e.onNext("!");
//注意在此调用onComplete方法结束事件的处理
e.onComplete();
}
});
// 定义观察者
Observer<String> observer = new Observer<String>() {
// 该方法会在onNext方法之前调用
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe->11111");
// d.dispose();
}
@Override
public void onNext(String value) {
System.out.println(value);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete->222222");
}
};
// 订阅
observable.subscribe(observer);
- 2.0方式
//创建Flowable对象
Flowable flowable = Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(@NonNull FlowableEmitter e) throws Exception {
e.onNext("Hello");
e.onNext("World");
e.onNext("!");
//注意在此调用onComplete方法结束事件的处理
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
// 定义观察者
Subscriber subsrciber= new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe->11111");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete->222222");
}
};
// 订阅
flowable.subscribe(subsrciber);
- 订阅
订阅这句代码看起来好奇怪,主要是subscribe()这个方法有点怪:它看起来是『observalbe 订阅了 observer / subscriber』而不是『observer / subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成 observer.subscribe(observable) / subscriber.subscribe(observable) ,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。
- 运行结果
分别运行上面的两段代码,运行效果相同,如下所示:
onSubscribe->11111
Hello
World
!
onComplete->222222
这可能是最简单的RxJava使用示例了。
3.3 创建被观察者
在上面的示例中,我们采用了Observable.create方法来创建被观察者,并且在subscribe方法中完成了事件的传递。
RxJava 还提供了一些方法用来快捷创建事件队列,我们一起来看一下。
- just(T...)
将传递的参数,依次发送出去。
Flowable.just("Hello", "World", "!")
// 将会依次调用:
// onNext("Hello");
// onNext("World");
// onNext("!");
// onComplete();
这句代码的效果与上面示例中的效果相同。
- from(T[]) / from(Iterable<? extends T>)
将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] values = new String[]{"Hello", "Wrold", "!"};
Flowable observable = Flowable.fromArray(values);
// 将会依次调用:
// onNext("Hello");
// onNext("World");
// onNext("!");
// onComplete();
上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create() 的例子是等价的。
3.4 灵活的事件回调定义
RxJava支持定义不完整的事件回调定义,就是我们可以抛弃Subscriber的定义,而只选择定义其中的一部分回调。
看下代码可能会更明了。
String[] values = new String[]{"Hello", "Wrold", "!"};
Consumer onNext = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext:" + s);
}
};
Consumer<? super Throwable> onError = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
}
};
Action onComplete = new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
Flowable.fromArray(values)
.subscribe(onNext);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
Flowable.fromArray(values)
.subscribe(onNext, onError);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
Flowable.fromArray(values)
.subscribe(onNext, onError, onComplete);
是不是很灵活?嗯,是的。
3.5 Schedulers
在 RxJava的默认规则中,事件的发出和消费都是在同一个线程的,在哪个线程调用subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。
观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于RxJava 是至关重要的。
而要实现异步,则需要用到 RxJava 的另一个概念: Schedulers(调度器) 。
- API
在RxJava中,Scheduler相当于线程控制器,RxJava通过它来指定每一段代码应该运行在什么样的线程。
RxJava已经内置了一些调度器,主要有以下几个:
- Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程,这是默认的Scheduler。
- Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- AndroidSchedulers.mainThread():它指定的操作将在 Android 主线程运行,属于Android专用的调度器。
有了这几个Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。
-
subscribeOn(): 指定 subscribe() 所发生的线程,即Flowable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
-
observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
Flowble.just(1, 2, 3) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("The receive num is :" + integer); } }
上面这段代码中,由于subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3 将会在 IO线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber数字的打印将发生在主线程。
事实上,这种在 subscribe() 之前写上两句subscribeOn(Scheduler.io()) 和observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。
4.变换
RxJava提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
在开发中我们经常碰到这样的场景:从本地读取并加载图片。也就是说我们通常的入参是一个文件路径,而我们想要得到的是一个BitMap对象,那么如果使用RxJava我们该如何优雅的实现呢?
final String filePath = "/images/logo.png";
Flowble.just(filePath)
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String s) throws Exception {
return getBitmapFromFile(new File(filePath));
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
showBitmap(bitmap);
}
});
就问你优雅不优雅?牛逼不牛逼?
可以看到,map()方法将参数中的String对象转换成一个Bitmap对象后返回,而在经过map()方法后,事件的参数类型也由 String转为了Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。
那么常用的事件变换有那些呢?
1. map
事件对象的直接变换,具体功能上面已经介绍过,它是RxJava 最常用的变换。
在上面的例子中我们可以看到,map方法将参数中的 String对象变换为一个 Bitmap对象后返回,而在经过 map方法后,事件的参数类型也由String变为了 Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。
2. flatMap
flatMap和map有共同点,都是将一个对象转换为另一个对象,不同的是map只是一对一的转换,而flatMap可以是一对多的转换,并且是转换为另外一个Flowable对象!
示例如下:
ArrayList<String[]> list = new ArrayList<>();
String[] words1 = {"Hello,", "I am", "China!"};
String[] words2 = {"Hello,", "I am", "Beijing!"};
list.add(words1);
list.add(words2);
Flowable.fromIterable(list)
.flatMap(new Function<String[], Publisher<String>>() {
@Override
public Publisher<String> apply(@NonNull String[] strings) throws Exception {
return Flowable.fromArray(strings);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("Consumer->accept:"+s);
}
});
运行结果如下所示:
Consumer->accept:Hello,
Consumer->accept:I am
Consumer->accept:China!
Consumer->accept:Hello,
Consumer->accept:I am
Consumer->accept:Beijing!
flatMap的转换可以分解为三个过程:
- 根据传入的事件生成一个Publisher对象(其实也可以理解为Flowable)。
- 激活该Flowable对象发送事件,而不是直接发送该Flowable对象。
- 同一个Flowable对象发送的事件都会汇总到Flowable后,Flowable负责将事件统一传递给subsrciber。
3. lift
我们可以将该方法视为map与flatMap的底层调用实现,其目的就是定义我们自己的Operator来完成变换。
lift方法接收一个FlowableOperator的参数,这个FlowableOperator就是定义我们自己的转换操作。
这样解释起来可能有些不太明了,下面我们举两个简单的例子来看下怎么使用lift实现map和flatMap的效果。
- map的lift写法
Flowable.just(filePath)
.lift(new FlowableOperator<Bitmap, String>() {
@Override
public Subscriber<? super String> apply(@NonNull final Subscriber<? super Bitmap> observer) throws Exception {
return new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
observer.onSubscribe(s);
}
@Override
public void onNext(String s) {
observer.onNext(getBitmapFromFile(new File(s)));
}
@Override
public void onError(Throwable t) {
observer.onError(t);
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
})
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
showBitmap(bitmap);
}
});
- flatMap的lift写法
Flowable.fromIterable(list)
.lift(new FlowableOperator<String, String[]>() {
@Override
public Subscriber<? super String[]> apply(@NonNull final Subscriber<? super String> observer) throws Exception {
return new Subscriber<String[]>() {
@Override
public void onSubscribe(Subscription s) {
observer.onSubscribe(s);
}
@Override
public void onNext(String[] strings) {
Flowable.fromArray(strings)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
observer.onNext(s);
}
});
}
@Override
public void onError(Throwable t) {
observer.onError(t);
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept ->"+s);
}
});
4. range
该方法比较简单,用于产生int和long型数字。
Flowable.range(1,5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
})
输出1 2 3 4 5五个数字。
5. merge
主要用户合并对象,示例如下:
ArrayList<String> list1 = new ArrayList<>();
list1.add("1");
list1.add("2");
list1.add("3");
ArrayList<String> list2 = new ArrayList<>();
list2.add("4");
list2.add("5");
list2.add("6");
Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
输出1 2 3 4 5 6。
6. compose
调解转换的作用,示例如下:
Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))
.compose(new FlowableTransformer<String, Integer>() {
@Override
public Publisher<Integer> apply(@NonNull Flowable<String> upstream) {
return upstream.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return Integer.parseInt(s);
}
});
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer s) throws Exception {
System.out.println(s);
}
});
输出1 2 3 4 5 6 六个数字。
7. compose与lift的区别
两者都实现了变换的功能,但是变换的内容和对象却不相同。
- lift实现的是对事件和事件序列的变换。
- compose实现的是Flowable本身的变换。
5 总结
至此,我们对RxJava的使用分析告一段落,作为一个牛逼的异步框架,如果能正确的引入到我们的项目中来一定能提高我们效率,降低后期我们的维护成本。
祝各位工作愉快。