RxJava_map操作符源码解析
2017-06-13 本文已影响63人
未见哥哥
map示例图RxJava 中 map 操作符是用于对数据源发送的数据进行类型转换的。例如下面的代码示例中事件源发送了两个整形数据 1 和 2,通过 map 操作符将其转换为 String 类型。下面是官方给出的 map 操作符的示例图,图所示功能就是将圆形数据通过 map 转换过 方形数据。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer+"-zeal";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e("zeal","s="+s);
}
});
map
- Function
Function 是一个接口,内部提供一个 apply 方法,根据下面的源码可以知道,它具备一个类型转换的功能。Funtion 这个接口定义了泛型 T 和 R 两种类型,它的功能就是将 T 类型的数据转化为 R 类型的数据。T 表示输入值,R 表示输出值。
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
@NonNull
R apply(@NonNull T t) throws Exception;
}
- map(function)
map 方法内部创建了一个 ObservableMap 对象,这个类是 Observable 的子类,依据上一节博客的分析中,当该 ObservableMap 和 Observer 发生订阅关系时,它最终会调用 subscribeActual 方法的实现即可。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判空操作
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
- ObservableMap#subscribeActual
上一节分析过 RxJava 的整体调用流程可以知道,Observer 订阅事件源是一层层往上订阅的,并且每次订阅都会调用 subscribeActual 方法,在该方法中一般都会去调用上一级的 observable 去发生订阅关系。
@Override
public void subscribeActual(Observer<? super U> t) {
//上一级的 observable 去发生订阅关系
source.subscribe(new MapObserver<T, U>(t, function));
}
- 事件的传递 MapObserver#onNext(T)
在该方法中的变量 mapper 就是外界创建的 Function 对象。这里是通过 mapper.apply(t) 将输入值 t 进行转换操作,最终得到结果 v,然后调用 actual.onNext(v) 将转换后的 v 值向下传递给下一级的 observer。
.map(new Function<Integer, String>() {
//类型转换操作
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer+"-zeal";
}
})
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
}
@Override
public void onNext(T t) {
...
//定义一个需要转换后的类型 U 的变量 v
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}