RxJava<第十一篇>:变换操作符
(1)map
从发射数据到接收数据之间的数据变换。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "我是变换过后的" + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
以上代码的意思是,发射的数据是Integer类型的, 将Integer类型的数据经过一些处理,最后返回值是String类型。
如果以上的说法不能理解,那么就举个例子:
new Thread(new Runnable() {
@Override
public void run() {
toUp("live");
}
}).start();
private String toUp(String str){
String s = str.toUpperCase();
return s;
}
以上的列子很简单, 那么怎么写才能让代码更加优雅呢?
Observable.just("live").map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s.toUpperCase();
}
}).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
(2)flatMap
FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。
List<Integer> list = Arrays.asList(1, 2, 3);
Observable.fromIterable(list).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final Integer integer) throws Exception {
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("第" + integer + "个主任务的第1个分支任务");
emitter.onNext("第" + integer + "个主任务的第2个分支任务");
emitter.onComplete();
}
}).subscribeOn(Schedulers.newThread());
return observable;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s+"---thread:"+ Thread.currentThread().getName());
}
});
以上代码将一个任务分成两个任务分别发射。
执行结果如下:
图片.png如果删除subscribeOn(Schedulers.newThread())代码,那么执行结果是:
图片.png显然, 在多线程的情况下,接收数据时,是线程不安全的,如果需要线程安全,那么需要使用ConcatMap。
如果以上不怎么理解,那么就举一个例子吧,请问大家有没有遇到过嵌套网络请求,当post请求成功返回数据时,这时我们需要这些数据发起新的(一个或多个)post请求。
需要注意的是,如果第一个post请求成功返回时,第二个和第三个post请求都需要第一个post请求返回的参数时,那么是否考虑线程安全来决定到底使用flatMap还是ConcatMap。
我们先用一般的代码模拟一下网络请求, 如下:
public interface INR{
void success(int result);
void failed();
}
//模拟网络请求1
private void networkRequest1(final String s1, final INR inr){
new Thread(new Runnable() {
@Override
public void run() {
if("1".equals(s1)){
int result = Integer.parseInt(s1);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
inr.failed();
}
inr.success(result);
} else{
inr.failed();
}
}
}).start();
}
//模拟网络请求2
private void networkRequest2(final String s2, final INR inr){
new Thread(new Runnable() {
@Override
public void run() {
if(s2.equals("2")){
int result = Integer.parseInt(s2);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
inr.failed();
}
inr.success(result);
} else{
inr.failed();
}
}
}).start();
}
//模拟网络请求3
private void networkRequest3(final String s3, final INR inr){
new Thread(new Runnable() {
@Override
public void run() {
if("3".equals(s3)){
int result = Integer.parseInt(s3);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
inr.failed();
}
inr.success(result);
} else{
inr.failed();
}
}
}).start();
}
networkRequest1("1", new INR() {
@Override
public void success(int result) {
String newresult = String.valueOf(result + 1);
networkRequest2(newresult, new INR() {
@Override
public void success(int result) {
String newresult = String.valueOf(result + 1);
networkRequest3(newresult, new INR() {
@Override
public void success(int result) {
System.out.println(String.valueOf(result));
}
@Override
public void failed() {
}
});
}
@Override
public void failed() {
}
});
}
@Override
public void failed() {
}
});
一个完整的网络嵌套请求需要写那么多代码,如果可读性很差,代码不优雅,那么Rxjava到底怎么实现呢?
Observable.just("1").flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
int result = Integer.parseInt(s);
Thread.sleep(3000);
return Observable.just(result).map(new Function<Integer, String>() {
@Override
public String apply(Integer result) throws Exception {
return String.valueOf(result + 1);
}
});
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
int result = Integer.parseInt(s);
Thread.sleep(3000);
return Observable.just(result).map(new Function<Integer, String>() {
@Override
public String apply(Integer result) throws Exception {
return String.valueOf(result + 1);
}
});
}
}).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
以上代码就是Rxjava的写法了,是不是很简洁呢?
接下来介绍的ConcatMap就不举例了。
(3)ConcatMap
List<Integer> list = Arrays.asList(1, 2, 3);
Observable.fromIterable(list).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final Integer integer) throws Exception {
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("第" + integer + "个主任务的第1个分支任务");
emitter.onNext("第" + integer + "个主任务的第2个分支任务");
emitter.onComplete();
}
}).subscribeOn(Schedulers.newThread());
return observable;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s+"---thread:"+ Thread.currentThread().getName());
}
});
执行结果是:
图片.png(4)flatMapIterable
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("A");
e.onNext("B");
}
}).flatMapIterable(new Function<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String s) throws Exception {
List<String> list = Arrays.asList("A", "B", "C");
return list;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
(5)switchMap
只发射最近发射的数据,也就是说,如果前一个任务还没完成时就开始了第二个任务,那么前一个任务将被终止。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("A");
e.onNext("B");
}
}).switchMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
return Observable.just(s, "---:"+s).subscribeOn(Schedulers.newThread());
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
日志效果如下
图片.png(6)scan
sacn操作符是遍历源Observable产生的结果,再按照自定义规则进行运算,依次输出每次计算后的结果给订阅者
Observable.range(2, 10).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(String.valueOf(integer));
}
});
apply回掉第一个参数是上次的结算结果,第二个参数是当此的源observable的输入值
日志如下:
图片.png(7)groupBy
将1,2,3,4分组
Observable.just(1,2,3,4).groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer % 2;
}
}).subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
integerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integerIntegerGroupedObservable.getKey() + " "+integer);
}
});
}
});
日志如下:
图片.png(8)buffer
字面上是缓存的意思
假如,现在有"one", "two", "three", "four", "five"这5条数据,将将要发射的数据存放到缓存区,每个缓存区是3条数据:
缓存区1:"one", "two", "three"
缓存区2:"four", "five"
Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
.buffer(3)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
for (String s : strings){
System.out.println(s);
}
System.out.println("---------------------");
}
});
有5个字符串,设置缓存区为3(默认跳3个字符串)
日志效果如下:
图片.png假如,现在有"one", "two", "three", "four", "five"这5条数据,将将要发射的数据存放到缓存区,每个缓存区是3条数据,设置偏移量为1
缓存区1:"one", "two", "three"
缓存区2: "two", "three", "four"
缓存区3: "three", "four", "five"
缓存区4:"four", "five"
缓存区5: "five"
Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
.buffer(3,1)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
for (String s : strings){
System.out.println(s);
}
System.out.println("---------------------");
}
});
打印日志如下:
图片.png(9)window
设置一个window最多3条数据,将这个window封装成Observable,并将Observable发射出去。
Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
.window(3)
.subscribe(new Consumer<Observable<String>>() {
@Override
public void accept(Observable<String> stringObservable) throws Exception {
System.out.println("-----------------------");
stringObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
});
执行效果如下
图片.png Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
.window(3,1)
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Observable<String>>() {
@Override
public void accept(Observable<String> stringObservable) throws Exception {
System.out.println("-----------------------");
stringObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
});
效果如下:
图片.png需要说明的是:
- window(long count) 被分割成的每个window最大数据值
- window(long count, long skip) count 每个window最大数据值,skip步长
- window与buffer区别:window是把数据分割成了Observable,buffer是把数据分割成List
(10)cast
cast是转换操作符, 从字面上的意思是说可以实现类型的转换
Observable.range(1, 5).cast(Integer.class).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer ss) throws Exception {
System.out.println(ss);
}
});
Observable.range(1, 5).cast(Number.class).subscribe(new Consumer<Number>() {
@Override
public void accept(Number ss) throws Exception {
System.out.println(ss);
}
});
打印效果如下:
图片.png如果改成String类型
Observable.range(1, 5).cast(String.class).subscribe(new Consumer<String>() {
@Override
public void accept(String ss) throws Exception {
System.out.println(ss);
}
});
打印效果如下:
图片.png这是一个比较有疑问的地方。