Rxjava讲解
前言
提升开发效率,降低维护成本一直是开发团队永恒不变的宗旨。近一年来国内的技术圈子中越来越多的开始提及Rx,经过一段时间的学习和探索之后我也深深的感受到了RxJava的魅力。它能帮助我们简化代码逻辑,提升代码可读性。这对于开发效率的提升、后期维护成本的降低帮助都是巨大的。个人预测RxJava一定是2016年的一个大趋势,所以也有打算将它引入到公司现有的项目中来,写这一系列的文章主要也是为了团队内部做技术分享。
由于我本人是个Android程序猿,因此这一系列文章中的场景都是基于Android平台的。如果你是个Java Web工程师或者是其它方向的那也没关系,我会尽量用通俗的语言将问题描述清楚。
响应式编程
在介绍RxJava前,我们先聊聊响应式编程。那么什么是响应式编程呢?响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。
响应式编程的一个关键概念是事件。事件可以被等待,可以触发过程,也可以触发其它事件。事件是唯一的以合适的方式将我们的现实世界映射到我们的软件中:如果屋里太热了我们就打开一扇窗户。同样的,当我们的天气app从服务端获取到新的天气数据后,我们需要更新app上展示天气信息的UI;汽车上的车道偏移系统探测到车辆偏移了正常路线就会提醒驾驶者纠正,就是是响应事件。
今天,响应式编程最通用的一个场景是UI:我们的移动App必须做出对网络调用、用户触摸输入和系统弹框的响应。在这个世界上,软件之所以是事件驱动并响应的是因为现实生活也是如此。
本章节中部分概念摘自《RxJava Essentials》一书
RxJava的来历
Rx是微软.Net的一个响应式扩展,Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。2012年Netflix为了应对不断增长的业务需求开始将.NET Rx迁移到JVM上面。并于13年二月份正式向外展示了RxJava。
从语义的角度来看,RxJava就是.NET Rx。从语法的角度来看,Netflix考虑到了对应每个Rx方法,保留了Java代码规范和基本的模式。

什么是RxJava
那么到底什么是RxJava呢?我对它的定义是:RxJava本质上是一个异步操作库,是一个能让你用极其简洁的逻辑去处理繁琐复杂任务的异步事件库。
RxJava好在哪
Android平台上为已经开发者提供了AsyncTask,Handler等用来做异步操作的类库,那我们为什么还要选择RxJava呢?答案是简洁!RxJava可以用非常简洁的代码逻辑来解决复杂问题;而且即使业务逻辑的越来越复杂,它依然能够保持简洁!再配合上Lambda用简单的几行代码分分钟就解决你负责的业务问题。简直逼格爆表,拿它装逼那是极好的!
多说无益,上代码!
假设我们安居客用户App上有个需求,需要从服务端拉取上海浦东新区塘桥板块的所有小区Community[] communities,每个小区下包含多套房源List<House> houses;我们需要把塘桥板块的所有总价大于500W的房源都展示在App的房源列表页。用于从服务端拉取communities需要发起网络请求,比较耗时,因此需要在后台运行。而这些房源信息需要展示到App的页面上,因此需要在UI线程上执行。(此例子思路来源于扔物线的给Android开发者的RxJava详解一文)
new Thread() {
@Override
public void run() {
super.run();
//从服务端获取小区列表
List<Community> communities = getCommunitiesFromServer();
for (Community community : communities) {
List<House> houses = community.houses;
for (House house : houses) {
if (house.price >= 5000000) {
runOnUiThread(new Runnable() {
@Override
public void run() {
//将房子的信息添加到屏幕上
addHouseInformationToScreen(house);
}
});
}
}
}
}
}.start();
使用RxJava的写法是这样的:
Observable.from(getCommunitiesFromServer())
.flatMap(new Func1<Community, Observable<House>>() {
@Override
public Observable<House> call(Community community) {
return Observable.from(community.houses);
}
}).filter(new Func1<House, Boolean>() {
@Override
public Boolean call(House house) {
return house.price>=5000000;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<House>() {
@Override
public void call(House house) {
//将房子的信息添加到屏幕上
addHouseInformationToScreen(house);
}
});
从上面这段代码我们可以看到:虽然代码量看起来变复杂了,但是RxJava的实现是一条链式调用,没有任何的嵌套;整个实现逻辑看起来异常简洁清晰,这对我们的编程实现和后期维护是有巨大帮助的。特别是对于那些回调嵌套的场景。配合Lambda表达式还可以简化成这样:
Observable.from(getCommunitiesFromServer())
.flatMap(community -> Observable.from(community.houses))
.filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::addHouseInformationToScreen);
简洁!有美感!这才是一个有情怀的程序员应该写出来的代码。
Rxjava使用介绍
观察者模式
观察者模式基于Subject这个概念,Subject是一种特殊对象,又叫做主题或者被观察者。当它改变时那些由它保存的一系列对象将会得到通知,而这一系列对象被称作Observer(观察者)。它们会对外暴漏了一个通知方法(比方说update之类的),当Subject状态发生变化时会调用的这个方法。
观察者模式很适合下面这些场景中的任何一个:
- 当你的架构有两个实体类,一个依赖另一个,你想让它们互不影响或者是独立复用它们时。
- 当一个变化的对象通知那些与它自身变化相关联的未知数量的对象时。
- 当一个变化的对象通知那些无需推断具体类型的对象时。
通常一个观察者模式的类图是这样的:
[图片上传中...(image-1f698a-1551843230888-0)]
如果你对观察者模式bu很了解,那么强烈建议你先去学习下。关于观察者模式的详细介绍可以参考我之前的文章:设计模式之观察者模式
扩展的观察者模式
在RxJava中主要有4个角色:
- Observable
- Subject
- Observer
- Subscriber
Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。说直白点Observable对应于观察者模式中的被观察者,而Observer和Subscriber对应于观察者模式中的观察者。Subscriber其实是一个实现了Observer的抽象类,观察者Observer和被观察者Observable通过subscribe()方法实现订阅关系。从而Observable 可以在需要的时候发出事件来通知Observer。
RxJava如何使用
我自己在学习一种新技术的时候通常喜欢先去了解它是怎么用的,掌握了使用方法后再去深挖其原理。那么我们现在就来说说RxJava到底该怎么用。
**第一步:创建观察者Observer
Observer<Object> observer = new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object s) {
}
};
一个观察者Observer创建了!
在普通的观察者模式中观察者一般只会提供一个update()方法用于被观察者的状态发生变化时,用于提供给被观察者调用。而在RxJava中的观察者Observer提供了:onNext()、 onCompleted()和onError()三个方法。还记得吗?开篇我们讲过RxJava是基于一种扩展的观察这模式实现,这里多出的onCompleted和onError正是对观察者模式的扩展。ps:onNext就相当于普通观察者模式中的update
RxJava中添加了普通观察者模式缺失的三个功能:
- RxJava中规定当不再有新的事件发出时,可以调用onCompleted()方法作为标示;
- 当事件处理出现异常时框架自动触发onError()方法;
- 同时Observables支持链式调用,从而避免了回调嵌套的问题。
第二步:创建被观察者Observable
Observable.create()方法可以创建一个Observable,使用crate()创建Observable需要一个OnSubscribe对象,这个对象继承Action1。当观察者订阅我们的Observable时,它作为一个参数传入并执行call()函数。
Observable<Object> observable = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});
除了create(),just()和from()同样可以创建Observable。看看下面两个例子:
just(T...)将传入的参数依次发送
Observable observable = Observable.just("One", "Two", "Three");
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();
from(T[])/from(Iterable<? extends T>)将传入的数组或者Iterable拆分成Java对象依次发送
String[] parameters = {"One", "Two", "Three"};
Observable observable = Observable.from(parameters);
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();
第三步:被观察者Observable订阅观察者Observer(ps:你没看错,不同于普通的观察者模式,这里是被观察者订阅观察者)
有了观察者和被观察者,我们就可以通过subscribe()l,就像这来实现二者的订阅关系了。
observable.subscribe(observer);

连在一起写就是这样:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
至此一个完整的RxJava调用就完成了。
上面的代码在使用Observable.create()创建了一个新的Observable<Integer>,并为create()方法传入了一个OnSubscribe,OnSubscribe中包含一个call()方法,一旦我们调用subscribe()订阅后就会自动触发call()方法。call()方法中的参数Subscriber其实就是subscribe()方法中的观察者Observer。我们在call()方法中调用了5次onNext()和1次
onCompleted()方法。一套流程周下来以后输出结果就是下面这样的:
Item is 0
Item is 1
Item is 2
Item is 3
Item is 4
onCompleted
注释:Subscriber和Observer的继承关系,Subscriber是Observer子类,从源码看Subscriber如下:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
...
}
从源码中我们可以看到,Subscriber是Observer的一个抽象实现类,所以我首先可以肯定的是Subscriber和Observer类型是一致的接着往下我们看看subscribe()这个方法:
public final Subscription subscribe(final Observer<? super T> observer) {
//这里的if判断对于我们要分享的问题没有关联,可以先无视
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
observer.onCompleted();
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
});
}
我们看到subscribe()方法内部首先将传进来的Observer做了一层代理,将它转换成了Subscriber。我们再看看这个方法内部的subscribe()方法:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
进一步往下追踪看看return后面这段代码到底做了什么。精简掉其他无关代码后的subscribe(subscriber, this)方法是这样的:
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
return Subscriptions.unsubscribed();
}
}
我们重点看看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber),前面这个hook.onSubscribeStart(observable, observable.onSubscribe)返回的是它自己括号内的第二个参数observable.onSubscribe,然后调用了它的call方法。而这个observable.onSubscribe正是create()方法中的Subscriber,这样整个流程就理顺了。看到这里是不是对RxJava的执行流程清晰了一点呢?这里也建议大家在学习新技术的时候多去翻一翻源码,知其然还要能知其所以然不是吗。
subscribe()的参数除了可以是Observer和Subscriber以外还可以是Action1、Action0;这是一种更简单的回调,只有一个call(T)方法;由于太简单这里就不做详细介绍了!
异步
RxJava处理异步任务。默认情况下我们在哪个线程调用subscribe()就在哪个线程生产事件,在哪个线程生产事件就在哪个线程消费事件。那怎么做到异步呢?RxJava为我们提供Scheduler用来做线程调度,我们来看看RxJava提供了哪些Scheduler。

同时RxJava还为我们提供了subscribeOn()和observeOn()两个方法来指定Observable和Observer运行的线程。
Observable.from(getCommunitiesFromServer())
.flatMap(community -> Observable.from(community.houses))
.filter(house -> house.price>=5000000)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::addHouseInformationToScreen);
subscribeOn(Schedulers.io())指定了事件源处理信息等一系列事件都是在IO线程中运行,observeOn(AndroidSchedulers.mainThread())指定了在屏幕上展示事件的操作在UI线程执行。这就做到了在子线程助理事件,主线程展示数据。
转换操作符
所谓转换操作符都作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们。概念实在是不好理解,下面我们结合实际的例子一一介绍。
- Map
map()函数接受一个Func1类型的参数(就像这样map(Func1<? super T, ? extends R> func)),然后把这个Func1应用到每一个由Observable发射的值上,将发射的值转换为我们期望的值。这种狗屁定义我相信你也听不懂,我们来看一下官方给出的原理图:

假设我们需要将一组数字转换成字符串,我们可以通过map这样实现:
Observable.just(1, 2, 3, 4, 5)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer i) {
return "This is " + i;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
Func1构造函数中的两个参数分别是Observable发射值当前的类型和map转换后的类型,上面这个例子中发射值当前的类型是Integer,转换后的类型是String。
- flatMap
flatMap()函数同样也是做转换的,但是作用却不一样。flatMap不太好理解,我们直接看例子(我们公司是个房产平台,那我就拿房子举例):假设我们有一组小区Community[] communites,现在我们要输出每个小区的名字;我们可以这样实现:
Observable.from(communities)
.map(new Func1<Community, String>() {
@Override
public String call(Community community) {
return community.name;
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
System.out.println("Community name : " + name);
}
});
现在我们需求有变化,需要打印出每个小区下面所有房子的价格。于是我可以这样实现:
Community[] communities = {};
Observable.from(communities)
.subscribe(new Action1<Community>() {
@Override
public void call(Community community) {
for (House house : community.houses) {
System.out.println("House price : " + house.price);
}
}
});
如果我不想在Subscriber中使用for循环,而是希望Subscriber中直接传入单个的House对象呢(这对于代码复用很重要)?用map()显然是不行的,因为map()是一对一的转化,而我现在的要求是一对多的转化。那么我们可以使用flatMap()把一个Community转化成多个House。
Observable.from(communities)
.flatMap(new Func1<Community, Observable<House>>() {
@Override
public Observable<House> call(Community community) {
return Observable.from(community.houses);
}
})
.subscribe(new Action1<House>() {
@Override
public void call(House house) {
System.out.println("House price : " + house.price);
}
});
从前面的例子中你肯定发现了,flatMap()和map()都是把传入的参数转化之后返回另一个对象。但和map()不同的是,flatMap()中返回的是Observable对象,并且这个Observable对象并不是被直接发送到 Subscriber的回调方法中。
flatMap()的原理是这样的:
- 将传入的事件对象装换成一个Observable对象;
- 这是不会直接发送这个Observable, 而是将这个Observable激活让它自己开始发送事件;
- 每一个创建出来的Observable发送的事件,都被汇入同一个Observable,这个Observable负责将这些事件统一交给Subscriber的回调方法。
这三个步骤,把事件拆成了两级,通过一组新创建的Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是flatMap()所谓的flat。
最后我们来看看flatMap的原理图:

- ConcatMap
concatMap()解决了flatMap()的交叉问题,它能够把发射的值连续在一起,就像这样:

- FlatMapIterable
flatMapIterable()和flatMap()几乎是一样的,不同的是flatMapIterable()它转化的多个Observable是使用Iterable作为源数据的。
Observable.from(communities)
.flatMapIterable(new Func1<Community, Iterable<House>>() {
@Override
public Iterable<House> call(Community community) {
return community.houses;
}
})
.subscribe(new Action1<House>() {
@Override
public void call(House house) {
}
});
- SwitchMap
switchMap()和flatMap()很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。

- Scan
scan()对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用合格函数时的第一个参数使用。

我们来看个简单的例子:
Observable.just(1, 2, 3, 4, 5)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+“ ”);
}
});
输出结果为:1 3 6 10 15
- GroupBy
groupBy()将原始Observable发射的数据按照key来拆分成一些小的Observable,然后这些小Observable分别发射其所包含的的数据,和SQL中的groupBy类似。实际使用中,我们需要提供一个生成key的规则(也就是Func1中的call方法),所有key相同的数据会包含在同一个小的Observable中。另外我们还可以提供一个函数来对这些数据进行转化,有点类似于集成了flatMap。

单纯的文字描述和图片解释可能难以理解,我们来看个例子:假设我现在有一组房源List<House> houses,每套房子都属于某一个小区,现在我们需要根据小区名来对房源进行分类,然后依次将房源信息输出。
List<House> houses = new ArrayList<>();
houses.add(new House("中粮·海景壹号", "中粮海景壹号新出大平层!总价4500W起"));
houses.add(new House("竹园新村", "满五唯一,黄金地段"));
houses.add(new House("中粮·海景壹号", "毗邻汤臣一品"));
houses.add(new House("竹园新村", "顶层户型,两室一厅"));
houses.add(new House("中粮·海景壹号", "南北通透,豪华五房"));
Observable<GroupedObservable<String, House>> groupByCommunityNameObservable = Observable.from(houses)
.groupBy(new Func1<House, String>() {
@Override
public String call(House house) {
return house.communityName;
}
});
通过上面的代码我们创建了一个新的Observable:groupByCommunityNameObservable,它将会发送一个带有GroupedObservable的序列(也就是指发送的数据项的类型为GroupedObservable)。GroupedObservable是一个特殊的Observable,它基于一个分组的key,在这个例子中的key就是小区名。现在我们需要将分类后的房源依次输出:
Observable.concat(groupByCommunityNameObservable)
.subscribe(new Action1<House>() {
@Override
public void call(House house) {
System.out.println("小区:"+house.communityName+"; 房源描述:"+house.desc);
}
});
执行结果:
小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
小区:中粮·海景壹号; 房源描述:毗邻汤臣一品
小区:中粮·海景壹号; 房源描述:南北通透,豪华五房
小区:竹园新村; 房源描述:满五唯一,黄金地段
小区:竹园新村; 房源描述:顶层户型,两室一厅
过滤操作符
过滤操作符顾名思义是把这类operators主要用于对事件数据的筛选过滤,只返回满足我们条件的数据。过滤类操作符主要包含: Filter, Take, TakeLast, TakeUntilSkip, SkipLast, ElementAt, Debounce, Distinct, DistinctUntilChanged, First, Last等等。
- Filter
filter(Func1)用来过滤观测序列中我们不想要的值,只返回满足条件的值,我们看下原理图:

还是拿前面文章中的小区Community[] communities来举例,假设我需要赛选出所有房源数大于10个的小区,我们可以这样实现:
Observable.from(communities)
.filter(new Func1<Community, Boolean>() {
@Override
public Boolean call(Community community) {
return community.houses.size()>10;
}
}).subscribe(new Action1<Community>() {
@Override
public void call(Community community) {
System.out.println(community.name);
}
});
-
Take
take(int)用一个整数n作为一个参数,从原始的序列中发射前n个元素.

现在我们需要取小区列表communities中的前10个小区
Observable.from(communities)
.take(10)
.subscribe(new Action1<Community>() {
@Override
public void call(Community community) {
System.out.println(community.name);
}
});
-
TakeLast
takeLast(int)同样用一个整数n作为参数,只不过它发射的是观测序列中后n个元素。

获取小区列表communities中的后3个小区
Observable.from(communities)
.takeLast(3)
.subscribe(new Action1<Community>() {
@Override
public void call(Community community) {
System.out.println(community.name);
}
});
-
TakeUntil
takeUntil(Observable)订阅并开始发射原始Observable,同时监视我们提供的第二个Observable。如果第二个Observable发射了一项数据或者发射了一个终止通知,takeUntil()返回的Observable会停止发射原始Observable并终止。

Observable<Long> observableA = Observable.interval(300, TimeUnit.MILLISECONDS);
Observable<Long> observableB = Observable.interval(800, TimeUnit.MILLISECONDS);
observableA.takeUntil(observableB)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.exit(0);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
System.out.println(aLong);
}
});
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
程序输出:
0
1
takeUntil(Func1)通过Func1中的call方法来判断是否需要终止发射数据。

Observable.just(1, 2, 3, 4, 5, 6, 7)
.takeUntil(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer >= 5;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
程序输出:
1
2
3
4
5
-
Skip
skip(int)让我们可以忽略Observable发射的前n项数据。

过滤掉小区列表communities中的前5个小区
Observable.from(communities)
.skip(5)
.subscribe(new Action1<Community>() {
@Override
public void call(Community community) {
System.out.println(community.name);
}
});
-
SkipLast
skipLast(int)忽略Observable发射的后n项数据。

- ElementAt
elementAt(int)用来获取元素Observable发射的事件序列中的第n项数据,并当做唯一的数据发射出去。

- Debounce
debounce(long, TimeUnit)过滤掉了由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。通常我们用来结合RxBinding(Jake Wharton大神使用RxJava封装的Android UI组件)使用,防止button重复点击。

debounce(Func1)可以根据Func1的call方法中的函数来过滤,Func1中的中的call方法返回了一个临时的Observable,如果原始的Observable在发射一个新的数据时,上一个数据根据Func1的call方法生成的临时Observable还没结束,那么上一个数据就会被过滤掉。

-
Distinct
distinct()的过滤规则是只允许还没有发射过的数据通过,所有重复的数据项都只会发射一次。

过滤掉一段数字中的重复项:
Observable.just(2, 1, 2, 2, 3, 4, 3, 4, 5, 5)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.print(i + " ");
}
});
程序输出:
2 1 3 4 5
distinct(Func1)参数中的Func1中的call方法会根据Observable发射的值生成一个Key,然后比较这个key来判断两个数据是不是相同;如果判定为重复则会和distinct()一样过滤掉重复的数据项。

假设我们要过滤掉一堆房源中小区名重复的小区:
List<House> houses = new ArrayList<>();
//House构造函数中的第一个参数为该房源所属小区名,第二个参数为房源描述
List<House> houses = new ArrayList<>();
houses.add(new House("中粮·海景壹号", "中粮海景壹号新出大平层!总价4500W起"));
houses.add(new House("竹园新村", "满五唯一,黄金地段"));
houses.add(new House("竹园新村", "一楼自带小花园"));
houses.add(new House("中粮·海景壹号", "毗邻汤臣一品"));
houses.add(new House("中粮·海景壹号", "顶级住宅,给您总统般尊贵体验"));
houses.add(new House("竹园新村", "顶层户型,两室一厅"));
houses.add(new House("中粮·海景壹号", "南北通透,豪华五房"));
Observable.from(houses)
.distinct(new Func1<House, String>() {
@Override
public String call(House house) {
return house.communityName;
}
}).subscribe(new Action1<House>() {
@Override
public void call(House house) {
System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
}
});
程序输出:
小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
小区:竹园新村; 房源描述:满五唯一,黄金地段
-
DistinctUntilChanged
distinctUntilChanged()和distinct()类似,只不过它判定的是Observable发射的当前数据项和前一个数据项是否相同。

同样还是上面过滤数字的例子:
Observable.just(2, 1, 2, 2, 3, 4, 3, 4, 5, 5)
.distinctUntilChanged()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.print(i + " ");
}
});
程序输出:
2 1 2 3 4 3 4 5
distinctUntilChanged(Func1)和distinct(Func1)一样,根据Func1中call方法产生一个Key来判断两个相邻的数据项是否相同。

我们还是拿前面的过滤房源的例子:
Observable.from(houses)
.distinctUntilChanged(new Func1<House, String>() {
@Override
public String call(House house) {
return house.communityName;
}
}).subscribe(new Action1<House>() {
@Override
public void call(House house) {
System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
}
});
程序输出:
小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
小区:竹园新村; 房源描述:满五唯一,黄金地段
小区:中粮·海景壹号; 房源描述:毗邻汤臣一品
小区:竹园新村; 房源描述:顶层户型,两室一厅
小区:中粮·海景壹号; 房源描述:南北通透,豪华五房
-
First
first()顾名思义,它是的Observable只发送观测序列中的第一个数据项。
[图片上传中...(image-2d139e-1551846576196-3)]
获取房源列表houses中的第一套房源:
Observable.from(houses)
.first()
.subscribe(new Action1<House>() {
@Override
public void call(House house) {
System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
}
});
程序输出:
小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
first(Func1)只发送符合条件的第一个数据项。

现在我们要获取房源列表houses中小区名为竹园新村的第一套房源。
Observable.from(houses)
.first(new Func1<House, Boolean>() {
@Override
public Boolean call(House house) {
return "竹园新村".equals(house.communityName);
}
})
.subscribe(new Action1<House>() {
@Override
public void call(House house) {
System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
}
});
程序输出:
小区:竹园新村; 房源描述:满五唯一,黄金地段
- Last
last()只发射观测序列中的最后一个数据项。
[图片上传中...(image-ae7e9b-1551846576195-1)]
获取房源列表中的最后一套房源:
Observable.from(houses)
.last()
.subscribe(new Action1<House>() {
@Override
public void call(House house) {
System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
}
});
程序输出:
小区:中粮·海景壹号; 房源描述:南北通透,豪华五房
last(Func1)只发射观测序列中符合条件的最后一个数据项。

获取房源列表houses中小区名为竹园新村的最后一套房源:
Observable.from(houses)
.last(new Func1<House, Boolean>() {
@Override
public Boolean call(House house) {
return "竹园新村".equals(house.communityName);
}
})
.subscribe(new Action1<House>() {
@Override
public void call(House house) {
System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
}
});
程序输出:
小区:竹园新村; 房源描述:顶层户型,两室一厅