Rxjava(二)之创建操作符与转换操作符
前言
Rxjava之所以如此受欢迎,与其强大的操作符是息息相关的。它几乎能完成所有的功能需求。下面我们开始介绍常见的操作符。
创建型操作符
常见的创建型操作符有,create、just、fromArray、empty、range。
对于create我们这里就不多做介绍了。
just操作符
Observable.just(1,2,3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,"accept: "+integer);
}
});
just操作符可以在内部发射任意数量相同类型的元素给观察者,通过分析源码我们可以知道其实内部调用的是fromArray操作符。
fromArray操作符
String[] items = {"张三","李四","王五"};
Observable.fromArray(items).subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
Log.d(TAG,"accept: "+str);
}
});
fromArray操作符内部通过发射一个数集对象给观察者。
empty操作符
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,"onSubscribe");
}
@Override
public void onNext(Object o) {
Log.d(TAG,"onNext: "+o);
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"onError");
}
@Override
public void onComplete() {
Log.d(TAG,"onComplete");
}
});
onSubscribe
onComplete
empty操作符内部自己发射,下游默认是Object,无法发送有值事件,只会发送onComplete。
range操作符
Observable.range(30,5).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,"accpt: "+integer);
}
});
accpt: 30
accpt: 31
accpt: 32
accpt: 33
accpt: 34
range操作符内部自己发射,从start(参数一)开始,发射count(参数二)个,每次加一。
变换操作符
变换操作符是对被观察者发送的事件进行一定的加工处理(转换)操作,然后再发送给被观察者。常见的变换操作符有map、flatMap、concatMap、groupBy、buffer。
map操作符
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(2);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
Log.d(TAG,"apply: "+integer);
return "["+integer+"}";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
Log.d(TAG,"accpt: "+str);
}
});
apply: 2
accpt: {2}
可以看到,被观察者发送的是一个int型的数据,但我们通过map操作符,增加了自己的逻辑,然后返回了一个String类型的数据给观察者。
flatMap操作符
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("张三");
e.onNext("李四");
e.onNext("王五");
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String str) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add(str + " 下标:" + (1 + i));
}
return Observable.fromIterable(list).delay(5, TimeUnit.SECONDS); // 创建型操作符,创建被观察者
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
Log.d(TAG,"accpt: "+str);
}
});
accpt: 张三 下标:1
accpt: 张三 下标:2
accpt: 张三 下标:3
accpt: 李四 下标:1
accpt: 王五 下标:1
accpt: 王五 下标:2
accpt: 王五 下标:3
accpt: 李四 下标:2
accpt: 李四 下标:3
依旧是在被观察者和订阅观察者之间添加flatMap操作符,我们可以看到flatMap操作符需要返回一个ObservableSource对象,而ObservableSource实际上是一个接口,并且Observable实现了这个接口,那么我们可以返回一个Observable对象,我们可以在apply方法中添加自己的操作。
这里我们为什么要发送这么对消息呢,因为我们要证明flatMap操作符是不排序的,由打印日志可以得到证明。
concatMap操作符
用法上和flatMap一样,它们的唯一区别就是concatMap操作符是排序的,至于代码和应用我这里就不赘述了。
groupBy操作符
Observable.just(1, 2, 100, 200, 3000, 4000).groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
if (integer < 10) {
return "small";
} else if (integer >= 10 && integer < 1000) {
return "medium";
} else {
return "big";
}
}
}).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(final GroupedObservable<String, Integer> groupedObservable) throws Exception {
groupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + groupedObservable.getKey()+"---"+integer);
}
});
}
});
accept: small---1
accept: small---2
accept: medium---100
accept: medium---200
accept: big---3000
accept: big---4000
groupBy操作符是对被观察者发送的数据进行分组,然后将分组后的数据传递给观察者。这里需要注意的是,在观察者中得到的是GroupedObservable对象,若想得到原本被观察者的值,则需要再一次进行封装。
buffer操作符
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for(int i = 0;i<100;i++){
e.onNext(i);
}
}
}).buffer(20).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.d(TAG,"accept: "+ integers);
}
});
accept: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
accept: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
accept: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
accept: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
accept: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
buffer操作符是对被观察者发送的数据进行分组,然后将分组完成后的数据封装成一个List发送给观察者。