Rxjava操作符学习(变换操作符)
2018-10-08 本文已影响26人
设计失
转换操作符在项目中使用是很频繁的,其原理也是最难的;不过一开始我们还是先知道怎么使用吧!
转换操作符如下:
Buffer
定期将Observable中的项目收集到束中并发出这些束,而不是一次发送一个项目
FlatMap
将Observable发出的项目转换为Observables,然后将这些项目的排放量变为单个Observable
GroupBy
将Observable划分为一组Observable,每个Observable从原始Observable发出一组不同的项目,按照键值对的形式组织
Map
通过将函数应用于每个项目来转换Observable发出的项目
Scan
将函数应用于Observable发出的每个项目,按顺序,并发出每个连续的值
Window
Buffer
count使用 Skip使用将发出的Observable中的item分组发出,它有很多重构函数:
buffer.png
// 进入源码看到
int count; 每个缓存区发射的最大条数
int skip; 必须大于0, 在开始一个新的缓冲区之前需要跳过的item数量
Callable<U> bufferSupplier; 一个作为缓存集合的工厂生成收集子类的实例
long timespan; 每个缓冲区在发出之前收集项目的时间段
Observable.just(1, 2, 3, 4, 5)
.buffer(2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> integers) {
Log.d(TAG, integers.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// 运行结果
10-08 06:51:11.450 16974-16974/com.ellison.convert D/BufferActivity: [1, 2]
10-08 06:51:11.451 16974-16974/com.ellison.convert D/BufferActivity: [3, 4]
10-08 06:51:11.452 16974-16974/com.ellison.convert D/BufferActivity: [5]
// skip使用
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.buffer(/*count*/2, /*skip*/1)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> integers) {
Log.d(TAG, integers.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// 运行结果
10-08 06:56:00.844 17318-17318/com.ellison.convert D/BufferActivity: [1, 2]
[2, 3]
[3, 4]
[4, 5]
[5, 6]
[6, 7]
[7, 8]
[8]
FlatMap
flatMap.c.png// 将传入的单词转换成大写
Observable.just("android", "kotlin", "java", "oc")
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final String s) throws Exception {
return new ObservableSource<String>() {
@Override
public void subscribe(Observer<? super String> observer) {
observer.onNext(s.toUpperCase());
}
};
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// 运行结果
10-08 07:17:39.477 18619-18619/? D/BufferActivity: ANDROID
KOTLIN
JAVA
OC
// 实际场景可以由传入的URL或者URI转换成Bitmap
Observable.just("1.url", "2.url", "3.url")
.flatMap(new Function<String, ObservableSource<Bitmap>>() {
@Override
public ObservableSource<Bitmap> apply(String s) throws Exception {
return new ObservableSource<Bitmap>() {
@Override
public void subscribe(Observer<? super Bitmap> observer) {
// 进行网络请求, 或者从本地拿到Bitmap数据
Bitmap bitmap = Bitmap.createBitmap(100, 100, Bitmap.Config.ARGB_8888);
observer.onNext(bitmap);
}
};
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bitmap bitmap) {
Log.d(TAG, bitmap + "");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// 运行结果
10-08 07:24:17.200 19109-19109/com.ellison.convert D/BufferActivity: android.graphics.Bitmap@20c97e7
android.graphics.Bitmap@94f3f94
android.graphics.Bitmap@c955a3d
GroupBy
groupBy.c.pngObservable.just(1, 2, 3, 4)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
// 通过条件来判断数据分组
if(integer < 2) {
return "A";
} else {
return "B";
}
}
})
.subscribe(new Observer<GroupedObservable<String, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(GroupedObservable<String, Integer> stringIntegerGroupedObservable) {
// 得到一个被观察者组,用来订阅获取到数据
final String key = stringIntegerGroupedObservable.getKey();
stringIntegerGroupedObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "当前组为:" + key + " 值为: " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// 运行结果为:
10-08 07:34:50.328 19800-19800/com.ellison.convert D/BufferActivity: 当前组为:A 值为: 1
当前组为:B 值为: 2
当前组为:B 值为: 3
当前组为:B 值为: 4
Map
map.png// Map与FlatMap不同,它只是将其单个转换,而FlatMap则是转换成一个新的被观察者
Observable.just(1, 2, 3, 4)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "当前的值为: " + integer.toString();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// 运行结果为:
10-08 07:43:08.747 20353-20353/com.ellison.convert D/BufferActivity: 当前的值为: 1
当前的值为: 2
当前的值为: 3
当前的值为: 4
scan
scan.png// 用作累加
Observable.just(1, 2, 3, 4)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG, "第一个值为: " + integer + " 第二个值为:" + integer2);
return integer + integer2;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "当前的结果为:" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// 运行结果:
10-08 07:51:09.521 20824-20824/com.ellison.convert D/BufferActivity: 当前的结果为:1
第一个值为: 1 第二个值为:2
当前的结果为:3
第一个值为: 3 第二个值为:3
当前的结果为:6
第一个值为: 6 第二个值为:4
当前的结果为:10
window
window与buffer类似,但是window是发送被观察者,而buffer发送的是数据源
window.C.png
Observable.just(1, 2, 3, 4)
.window(2)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Observable<Integer> integerObservable) {
integerObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, integer + "");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// 运行结果为:
10-08 07:54:37.791 21075-21075/? D/BufferActivity: 1
2
3
4