RxJava学习总结-过滤类操作符
2019-07-02 本文已影响0人
取了个很好听的名字
前言
过滤类操作符就是过滤掉特殊的上游发射事件。
distinct
distinct:去除重复事件
代码如下:
Observable.just(1,1,2,2,3,3,4,4)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果如下:
7-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 1
07-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 2
07-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 3
07-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 4
filter
过滤掉不符合条件的
Observable.just(1,2,3,4,5)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer>2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果如下:
07-02 10:41:30.310 29802-29802/com.zhqy.myrxjava E/accept: 3
07-02 10:41:30.310 29802-29802/com.zhqy.myrxjava E/accept: 4
07-02 10:41:30.310 29802-29802/com.zhqy.myrxjava E/accept: 5
elementAt
只发射特定下标的事件
代码如下:
//找下标为1的事件,没有则发送-1
Observable.just(1,2,3,4,5)
.elementAt(100,-1)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果
07-02 10:58:30.704 31487-31487/com.zhqy.myrxjava E/accept: -1
skip
忽视上游发射的前几项事件
代码如下:
Observable.just(1,2,3,4,5)
.skip(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果如下:
07-02 11:05:57.808 4543-4543/com.zhqy.myrxjava E/accept: 4
07-02 11:05:57.809 4543-4543/com.zhqy.myrxjava E/accept: 5
丢弃Observable开始的那段时间发射 的数据
代码如下:
Observable.interval(500,TimeUnit.MILLISECONDS)
.skip(3,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("accept",aLong+"");
}
});
测试结果
07-02 11:24:26.240 11070-11115/com.zhqy.myrxjava E/accept: 5
07-02 11:24:26.740 11070-11115/com.zhqy.myrxjava E/accept: 6
07-02 11:24:27.240 11070-11115/com.zhqy.myrxjava E/accept: 7
07-02 11:24:27.740 11070-11115/com.zhqy.myrxjava E/accept: 8
07-02 11:24:28.240 11070-11115/com.zhqy.myrxjava E/accept: 9
07-02 11:24:28.740 11070-11115/com.zhqy.myrxjava E/accept: 10
....
take
只接收上游发送的n个事件
代码如下:
Observable.range(0,100)
.take(10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果如下:
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 0
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 1
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 2
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 3
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 4
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 5
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 6
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 7
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 8
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 9
takeLast
只接收最后n个上游发送的事件
代码如下:
Observable.range(0,100)
.takeLast(10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果:
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 90
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 91
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 92
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 93
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 94
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 95
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 96
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 97
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 98
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 99
ofType
对发送的事件筛选出指定的数据类型发送
代码如下:
Observable.just(1,2,3,"4","5")
.ofType(String.class)
.subscribe(new Consumer<String>() {
@Override
public void accept(String string) throws Exception {
Log.e("accept",string);
}
});
测试结果如下
07-02 14:07:03.169 19405-19405/com.zhqy.myrxjava E/accept: 4
07-02 14:07:03.170 19405-19405/com.zhqy.myrxjava E/accept: 5
debounce
根据发送的事件时间间隔做出筛选,如果两次发送事件的间隔小于设定的timeout,则会取消前一个事件的发送
代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
SystemClock.sleep(1000);
emitter.onNext(3);
emitter.onNext(4);
SystemClock.sleep(1000);
emitter.onNext(5);
}
}).debounce(500,TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果:
07-02 14:15:40.029 19952-20086/com.zhqy.myrxjava E/accept: 2
07-02 14:15:41.029 19952-20086/com.zhqy.myrxjava E/accept: 4
07-02 14:15:42.029 19952-20086/com.zhqy.myrxjava E/accept: 5
firstElement& lastElement
获取上游发送的第一个和最后一个事件
代码如下:
Observable.just(1,2,3,4,5)
.firstElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
Observable.just(1,2,3,4,5)
.lastElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果
07-02 14:22:25.757 20430-20430/com.zhqy.myrxjava E/accept: 1
07-02 14:23:55.439 20621-20621/com.zhqy.myrxjava E/accept: 5