RxJava入门(5):功能性操作符
2018-02-23 本文已影响18人
tmyzh
新年快乐,开工大吉
filter 只保留满足条件的数据,不满足条件的数据会被过滤掉(示例中只保留大于3的数据)
// filter: the predicate is evaluated for every emitted value; only values for
// which it returns true are passed on to the observer.
Observable.just(1, 2, 3, 4, 5)
        // The type argument is required: with a raw Predicate the method
        // test(Integer) would not override test(Object) and would not compile.
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                // Logged once per source value, before the value is accepted or dropped.
                Log.e("yzh", "filter");
                return integer > 3;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                // Not used in this example.
            }

            @Override
            public void onNext(Integer integer) {
                // Only reached for values that passed the predicate.
                Log.e("yzh", "onNext--" + integer);
            }

            @Override
            public void onError(Throwable e) {
                // Not used in this example.
            }

            @Override
            public void onComplete() {
                // Not used in this example.
            }
        });
打印结果
02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: onNext--4
02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: onNext--5
ofType 只保留指定类型的数据,其他类型的数据会被过滤掉
Observable.just("1",2,"abv",5,"qqq")
.ofType(Integer.class)
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("yzh","accept--"+integer);
}
}) ;
打印结果
02-07 15:40:03.320 22735-22735/com.example.issuser.rxtest E/yzh: accept--2
02-07 15:40:03.320 22735-22735/com.example.issuser.rxtest E/yzh: accept--5
skip(n) 跳过前 n 个数据,skipLast(n) 跳过最后 n 个数据(示例中 n 均为 1,即去掉第一个和最后一个数据)
// skip / skipLast: the source emits 0..4 (five values, one per second after a
// one-second initial delay); skip(1) drops the first value and skipLast(1)
// drops the last one, so 1, 2 and 3 reach the consumer.
Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS)
        .skip(1)
        .skipLast(1)
        // intervalRange emits Long values, so the consumer must be Consumer<Long>;
        // a raw Consumer cannot declare accept(Long).
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long value) throws Exception {
                Log.e("yzh", "accept--" + value);
            }
        });
打印结果
02-07 15:59:26.198 24724-24765/com.example.issuser.rxtest E/yzh: accept--1
02-07 15:59:27.198 24724-24765/com.example.issuser.rxtest E/yzh: accept--2
02-07 15:59:28.198 24724-24765/com.example.issuser.rxtest E/yzh: accept--3
distinct去掉重复数据
// distinct: each value is passed on only the first time it is seen;
// later duplicates (the second 2 and the second 1 here) are dropped.
Observable.just(1, 2, 3, 2, 1)
        .distinct()
        // The type argument is required: a raw Consumer cannot declare accept(Integer).
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("yzh", "accept--" + integer);
            }
        });
打印结果
accept--1
accept--2
accept--3
delay 使得被观察者延迟一段时间发送
// delay: the three values are handed to the observer 3 seconds after subscription.
// In the sample output onSubscribe is logged immediately, while the onNext calls
// arrive 3 seconds later and carry a different thread id.
Observable.just(1,2,3)
.delay(3, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// Logged right away, before the delay elapses.
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
// Not used in this example.
}
@Override
public void onComplete() {
// Not used in this example.
}
});
打印结果
02-23 15:21:24.146 9436-9436/com.example.issuser.rxtest E/yzh: onSubscribe
02-23 15:21:27.148 9436-9478/com.example.issuser.rxtest E/yzh: onNext--1
02-23 15:21:27.148 9436-9478/com.example.issuser.rxtest E/yzh: onNext--2
02-23 15:21:27.148 9436-9478/com.example.issuser.rxtest E/yzh: onNext--3
do 在某个事件的生命周期中调用
// Demonstrates the doOnXxx side-effect hooks on a source that emits 1, 2, 3
// and then terminates with an error. Each hook only logs; the order of the
// log lines is what this example is about.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("发生错误了"));
}
})
// Called once for every event the Observable sends; the sample output also
// shows one call for the error event, where getValue() is null.
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.e("yzh","doOnEach---"+integerNotification.getValue());
}
})
// Called before the onNext event reaches the observer.
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("yzh","doOnNext--"+integer);
}
})
// Called after the onNext event has been delivered to the observer.
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("yzh","doAfterNext--"+integer);
}
})
// Called when the Observable finishes sending events normally
// (not logged in the sample output, because this source ends with an error).
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e("yzh","doOnComplete");
}
})
// Called when the Observable sends an error event.
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("yzh","doOnError--"+throwable.getMessage());
}
})
// Called when the observer subscribes (first line of the sample output).
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e("yzh","donOnSubscribe");
}
})
// Called after the Observable has finished sending events, whether it
// completed normally or terminated with an error.
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e("yzh","doAfterTerminate");
}
})
// Described by the author as "runs last".
// NOTE(review): in the sample output "doFinally" is logged BEFORE
// "doAfterTerminate" for this operator order — confirm the intended wording.
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e("yzh","doFinally");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// Not used in this example.
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
// Intentionally empty: the error is already logged by doOnError above.
}
@Override
public void onComplete() {
// Not used in this example.
}
});
打印结果
02-23 11:27:09.946 31016-31016/com.example.issuser.rxtest E/yzh: donOnSubscribe
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnEach---1
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnNext--1
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: onNext--1
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doAfterNext--1
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnEach---2
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnNext--2
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: onNext--2
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doAfterNext--2
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnEach---3
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnNext--3
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: onNext--3
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doAfterNext--3
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnEach---null
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnError--发生错误了
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doFinally
02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doAfterTerminate
onErrorReturn 遇到错误时,发送1个特殊事件 ,然后 正常终止
// onErrorReturn: the source emits 1, 2 and then an error; the error is replaced
// by a single fallback value and the stream then completes normally.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("发生错误了"));
}
})
// On error: emit one special value and terminate normally.
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.e("yzh","onErrorReturn--"+throwable.getMessage());
// Fallback value delivered to onNext in place of the error.
return 123;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// Not used in this example.
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
// Not logged in the sample output: the error was converted to a value upstream.
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
// Logged in the sample output right after the fallback value 123.
Log.e("yzh","onComplete");
}
});
打印结果
02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onNext--1
02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onNext--2
02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onErrorReturn--发生错误了
02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onNext--123
02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onComplete
onErrorResumeNext 遇到错误时,改为发送一个新的Observable
// onErrorResumeNext: the source emits 1, 2 and then an error; the error is
// replaced by a new Observable whose items (3, 4) continue the stream.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
}
})
// On error: switch to a new Observable. Makes no distinction between
// Throwable and Exception.
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
Log.e("yzh","onErrorResumeNext--"+throwable.getMessage());
return Observable.just(3,4);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// Not used in this example.
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
// Not logged in the sample output: the error was handled upstream.
Log.e("yzh","onError--"+e.getMessage());
}
@Override
public void onComplete() {
// Not used in this example.
}
});
打印结果
注意 被观察者中发送onError事件时,不区分Throwable和Exception
02-23 11:59:30.439 5596-5596/? E/yzh: onNext--1
02-23 11:59:30.439 5596-5596/? E/yzh: onNext--2
02-23 11:59:30.439 5596-5596/? E/yzh: onErrorResumeNext--发生错误了
02-23 11:59:30.439 5596-5596/? E/yzh: onNext--3
02-23 11:59:30.439 5596-5596/? E/yzh: onNext--4
onExceptionResumeNext与onErrorResumeNext 差不多,但是区分Throwable和Exception
// onExceptionResumeNext: like onErrorResumeNext, but the fallback source is used
// only when the error is an Exception. This version sends an Exception; per the
// sample output, sending a plain Throwable instead goes straight to onError.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
}
})
// Fallback source: emits 11 and 22, then completes.
// NOTE(review): this hand-written ObservableSource never calls
// observer.onSubscribe(...); Observable.just(11, 22) would be the
// conventional form — confirm whether this shortcut is intended.
.onExceptionResumeNext(new ObservableSource<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observer) {
observer.onNext(11);
observer.onNext(22);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// Not used in this example.
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.getMessage());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});
打印结果
注意区分Throwable和Exception的不同
发送的是 new Throwable(...) 时(不会被拦截,直接进入 onError):
02-23 14:38:58.163 30725-30725/com.example.issuser.rxtest E/yzh: onNext--1
02-23 14:38:58.163 30725-30725/com.example.issuser.rxtest E/yzh: onNext--2
02-23 14:38:58.163 30725-30725/com.example.issuser.rxtest E/yzh: onError--发生错误了
发送的是 new Exception(...) 时(被拦截,改为发送备用 Observable 中的数据):
onNext--1
onNext--2
onNext--11
onNext--22
onComplete
retry 重试,即当出现错误时,让被观察者(Observable)重新发射数据;类似作用还有repeat(repeat 是在正常完成 onComplete 之后重新订阅并发射数据)
// retry: when the source signals an error, it is subscribed to again and starts
// over. With no argument, the sample output repeats 1, 2 endlessly.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
// Placed after onError: the sample output never shows onNext-3.
e.onNext(3);
}
})
.retry()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// Not used in this example.
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext-"+integer);
}
@Override
public void onError(Throwable e) {
// Not logged in the sample output: the error triggers a retry instead.
Log.e("yzh","onError--"+e.getMessage());
}
@Override
public void onComplete() {
// Not logged in the sample output: the source never completes normally.
Log.e("yzh","onComplete");
}
});
打印结果
02-23 14:45:31.130 31188-31188/com.example.issuser.rxtest E/yzh: onNext-1
02-23 14:45:31.130 31188-31188/com.example.issuser.rxtest E/yzh: onNext-2
02-23 14:45:31.130 31188-31188/com.example.issuser.rxtest E/yzh: onNext-1
02-23 14:45:31.130 31188-31188/com.example.issuser.rxtest E/yzh: onNext-2
.....无限循环