RxJava 2.0 Beginner Series, Part 2: Source Code Analysis of the map Operator
2019-01-29
Mr_BingBing
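1. Use Case
The map operator in RxJava applies a function to every item emitted by the source and emits the result, which makes it the usual way to convert items from one type to another.
2. Code Example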
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        // Emit three Integer items, then signal completion
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
})
.map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        // Convert each Integer into a String
        return "map operator: Integer " + integer + " converted to String " + integer;
    }
})
// Deliver downstream callbacks on the Android main thread
.observeOn(AndroidSchedulers.mainThread())
// Run the subscription (and therefore the emissions) on an I/O thread
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(TAG, "onNext:" + s);
    }

    @Override
    public void onComplete() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onSubscribe(Disposable d) {
    }
});
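In the example above, the source emits three Integer values (1, 2, and 3), and the map operator converts each of them into a String.
The rest of this article walks through the source code to show how map performs that conversion.
First, here is the source of the map operator itself: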
/**
* Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and
* emits the results of these function applications.
* @param <R> the output type
* @param mapper a function to apply to each item emitted by the ObservableSource
* @return an Observable that emits the items from the source ObservableSource, transformed by the specified
* function
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    // Null-check the mapper
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    // The key part is ObservableMap, which wraps this Observable and the mapper
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
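Inside map, the current Observable and the Function passed in are wrapped in a new ObservableMap, which is itself an Observable. That object is then passed through RxJavaPlugins.onAssembly(). onAssembly() is only a hook: if an assembly hook function has been registered it is applied to the Observable, otherwise the Observable is returned unchanged.
Source of onAssembly():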
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
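To make the hook behaviour concrete, here is a minimal plain-Java sketch of the same pattern. It does not use RxJava: the class AssemblyHookSketch, the field onAssemblyHook, and the use of String in place of Observable are all hypothetical stand-ins chosen for illustration.

```java
import java.util.function.UnaryOperator;

// Simplified model of an assembly hook. Names are hypothetical, not RxJava's API.
final class AssemblyHookSketch {
    // Optional global hook; null means "no hook installed".
    static volatile UnaryOperator<String> onAssemblyHook;

    // Same shape as onAssembly(): apply the hook if present, else pass the value through.
    static String onAssembly(String source) {
        UnaryOperator<String> f = onAssemblyHook;
        if (f != null) {
            return f.apply(source);
        }
        return source;
    }

    public static void main(String[] args) {
        // No hook installed: the input comes back unchanged.
        System.out.println(onAssembly("ObservableMap"));
        // With a hook installed: every assembled value is passed through it.
        onAssemblyHook = s -> "Traced(" + s + ")";
        System.out.println(onAssembly("ObservableMap"));
        onAssemblyHook = null;
    }
}
```

With no hook registered, which is the default case, the call adds nothing beyond a null check, so the object that map returns is effectively the ObservableMap itself.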
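From the analysis so far, the key class is ObservableMap.
In its subscribeActual() method, ObservableMap subscribes a MapObserver (the observer) to source (the upstream Observable), which is what links the two together.
Inside MapObserver.onNext(), the statement v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") applies the mapper to each incoming item and rejects a null result. In the example, this is where each Integer becomes a String.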
The source of ObservableMap is as follows:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        // Subscribe a MapObserver (the observer) to source (the upstream Observable)
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    // The observer that sits between the upstream and the downstream
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            // done Flag indicating no further onXXX event should be accepted.
            if (done) {
                return;
            }
            //sourceMode Holds the established fusion mode of the upstream
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                // Core step: convert the incoming item t into v
                //Apply some calculation to the input value and return some other value.
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //actual The downstream subscriber
            // The downstream observer receives the converted value v
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
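The wrapping structure above can be modelled in a few lines of plain Java. The sketch below is not RxJava code: SimpleObserver, SimpleSource, and MapSketch are hypothetical stand-ins, and disposal, error handling, and operator fusion are deliberately left out. It only shows the core idea that map returns a new source whose subscribe step places a mapping observer between the upstream and the downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified plain-Java model of the ObservableMap pattern.
// All types here are hypothetical stand-ins, not RxJava classes.
final class MapSketch {
    interface SimpleObserver<T> {
        void onNext(T value);
        void onComplete();
    }

    interface SimpleSource<T> {
        void subscribe(SimpleObserver<? super T> observer);
    }

    // Plays the role of ObservableMap: captures the upstream and the mapper.
    static <T, U> SimpleSource<U> map(SimpleSource<T> upstream,
                                      Function<? super T, ? extends U> mapper) {
        // Plays the role of subscribeActual(): wrap the downstream in a mapping observer.
        return downstream -> upstream.subscribe(new SimpleObserver<T>() {
            @Override
            public void onNext(T value) {
                // Convert the item, then hand the result to the downstream observer.
                downstream.onNext(mapper.apply(value));
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        });
    }

    // Emits 1, 2, 3 through map and collects what the downstream observer sees.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        SimpleSource<Integer> source = observer -> {
            observer.onNext(1);
            observer.onNext(2);
            observer.onNext(3);
            observer.onComplete();
        };
        SimpleSource<String> mapped = map(source, i -> "item " + i);
        mapped.subscribe(new SimpleObserver<String>() {
            @Override
            public void onNext(String value) {
                received.add(value);
            }

            @Override
            public void onComplete() {
                received.add("done");
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [item 1, item 2, item 3, done]
    }
}
```

Nothing is emitted until subscribe is called on the mapped source, which mirrors how ObservableMap does no work until subscribeActual() runs.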
The apply() method computes an output value B from an input value A:
/**
* A functional interface that takes a value and returns another value, possibly with a
* different type and allows throwing a checked exception.
*
* @param <T> the input value type
* @param <R> the output value type
*/
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T t) throws Exception;
}
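Because apply() may throw a checked exception and must not return null, the try/catch in MapObserver.onNext() covers both failure modes with one code path. The following plain-Java sketch models that path under simplifying assumptions: ThrowingFunction, mapOne, and the event strings are hypothetical names, Objects.requireNonNull stands in for ObjectHelper.requireNonNull, and recording an "onError" event stands in for the call to fail(ex).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Models how a failing or null-returning mapper turns into an error event.
// Hypothetical names throughout; this is not RxJava code.
final class MapperFailureSketch {
    // Same shape as the Function interface above: apply may throw a checked exception.
    interface ThrowingFunction<T, R> {
        R apply(T t) throws Exception;
    }

    // Applies the mapper once and records either an onNext or an onError event.
    static void mapOne(Integer input, ThrowingFunction<Integer, String> mapper, List<String> events) {
        String v;
        try {
            v = Objects.requireNonNull(mapper.apply(input), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            // Stand-in for fail(ex): report the failure instead of emitting an item.
            events.add("onError:" + ex.getClass().getSimpleName());
            return;
        }
        events.add("onNext:" + v);
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        mapOne(1, i -> "value " + i, events);                       // normal conversion
        mapOne(2, i -> { throw new IOException("boom"); }, events); // checked exception
        mapOne(3, i -> null, events);                               // null result is rejected
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // prints [onNext:value 1, onError:IOException, onError:NullPointerException]
    }
}
```

One simplification to keep in mind: this sketch keeps processing later inputs after an error, whereas in MapObserver the done flag checked at the top of onNext() exists so that no further events are accepted once the stream has terminated.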