RxJava2框架源码分析三(map篇)

2019-12-10  本文已影响0人  yqianqiao

1.回顾

上篇已经讲了RxJava2创建操作符create源码解析,不清楚的可以查看RxJava2框架源码分析二(Create篇)

2.Map操作符

官方定义:transform the items emitted by an Observable by applying a function to each item
拙劣的翻译:应用一个函数 转换所有的被发射的item

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "类型转换:" + integer;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("开始采用subscribe连接");
            }

            @Override
            public void onNext(String integer) {
               System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("对Complete事件作出响应");
            }
        });

关于创建步骤这些上面文章已经分析过了,所以直接跳过了,不清楚的可以到RxJava2框架源码分析二(Create篇)观看,这里直接看到运行结果:

示意图.png

3. 源码分析

老套路,按照步骤走

步骤一:创建ObservableCreate

步骤二

//步骤二,创建ObservableMap被观察者
.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "类型转换:" + integer;
            }
        })

//源码分析
//创建ObservableMap被观察者
 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //判断非空
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //创建ObservableMap并且返回出去(注意这里构造方法中的this指的是上一个被观察者ObservableCreate)
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

//这类继承了AbstractObservableWithUpstream,该类也是Observable的子类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
     ...
    // 仅贴出关键源码
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        //初始化,调用父类构造方法
        //source是指上一个被观察者,本文中为ObservableCreate
        super(source);
      //将map()接收的function传递至全局
        this.function = function;
    }
}

 //这类也是Observable的子类,主要作用是包装,扩展
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    //上一个被观察者的引用
    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        //通过构造方法赋值
        this.source = source;
    }
    @Override
    public final ObservableSource<T> source() {
        //返回上一个被观察者
        return source;
    }
}
/**
  * 此接口为Observable.map()里面的参数接口
  **/
public interface Function<T, R> {
    //定义两个泛型,参数为泛型T,返回泛型R
    R apply(@NonNull T t) throws Exception;
}

步骤三:创建观察者Observer
创建Observer接口的实现类

步骤四:通过ObservableMap订阅subscribe观察者Observer

/** 
  * 源码分析:ObservableMap.subscribe(observer)
  * 说明:该方法属于 Observable 类的方法(注:传入1个 Observer 对象)
  **/  
public abstract class Observable<T> implements ObservableSource<T> {
     ...
    // 仅贴出关键源码
  @Override
  public final void subscribe(Observer<? super T> observer) {
         ...
         // 仅贴出关键源码
        //可以看到调用的是本类的下面抽象方法
         subscribeActual(observer); 
   }
    //定义了一个抽象方法当调用subscribe时会跟这个调用Observable子类的实现方法(就是调用者)
   protected abstract void subscribeActual(Observer<? super T> observer);
}

/**
*  现在我们回到先前创建的被观察者中 ObservableMap类 
**/
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    //构造方法传入的Function实现类
    final Function<? super T, ? extends U> function;
    
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
         //super()将上游的Observable保存起来 ,用于subscribeActual()中用。
        //source是指上一个被观察者,本文中为ObservableCreate
        super(source);
      //将map()接收的function传递至全局
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        //source是AbstractObservableWithUpstream的成员变量,通过上面构造方法传入的(本例子中带代表的是ObservableCreate)
        //步骤一,创建MapObserver对象,并传入观察者(observer)以及function,这里只是创建对象,没有调用里面的方法
        //步骤二,调用source.subscribe()把MapObserver对象传入ObservableCreate中去
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    /**
      *  这类是map 的包装类是Observer的子类,BasicFuseableObserver继承的是Observer,
      *  该类是可融合中间观察者的基类,里面主要实现了onSubscribe()、onError()、onComplete() 等一些抽象方法
    **/
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        //Function实现类
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            //将观察者传递给父类downstream(就是父类的成员变量downstream)
            super(actual);
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            // 父类成员变量,表示是否发送过onError()、onComplete()事件
            if (done) {
                return;
            }
            //默认sourceMode是0,所以跳过
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                // 讲通过调用Function接口方法,将t传入,返回u,完成类型转换
                //相当于 v =mapper.apply(t)
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //调用观察者的onNext()
            downstream.onNext(v);
        }
             ...
         // 仅贴出关键源码
    }
}

/**
   *  熟悉的类ObservableCreate,详细的分析在上篇文章
   * 上一个类中我们调用了source.subscribe(new MapObserver<T, U>(t, function));
   *  Observable.subscribe()方法最终会回调到ObservableCreate中的subscribeActual()方法
 **/
public final class ObservableCreate<T> extends Observable<T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //将MapObserver()观察者封装成CreateEmitter对象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
      // 调用观察者(MapObserver)的onSubscribe()
        observer.onSubscribe(parent);
        try {
            //3.调用source对象的subscribe()方法(发射器中的subscribe()实现类中的onNetx()系列方法)
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            //这里传入的是MapObserver对象
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
     
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
       
            if (!isDisposed()) {
                //调用的其实是MapObserver对象中的onNext()方法
                observer.onNext(t);
            }
        }
}
  1. 通过ObservableMap调用父类Observable方法subscribe(),该方法回调父类抽象方法subscribeActual()的实现类ObservableMap中去。
  2. ObservableMap类中重写subscribeActual()方法中创建了MapObserver观察者对象,并通过调用父类Observable方法subscribe()MapObserver对象传递给上一个被观察者ObservableCreate中的subscribeActual()实现方法中。
  3. 创建1个CreateEmitter对象(封装成一个Disposable对象)。
  4. 调用观察者MapObserveronSubscribe(CreateEmitter parent )使其可以取消订阅。
  5. 调用source对象的subscribe(CreateEmitter parent)方法,通过 parent发送事件回调。

4. 源码总结

  1. 创建被观察者,通过Observable.create()方法创建了ObservableCreate对象,然后通过ObservableCreate对象又创建了ObservableMap
  2. 订阅,ObservableMap通过调用父类方法subscribe()方法回调到ObservableMap重写父类subscribeActual()方法中,该方法创建MapObserver对象,并实现Observer中的onNext(),里面调用接口Function实现数据类型转换。
  3. ObservableMap.subscribeActual()方法中调用上一个被观察者ObservableCreate.subscribe(),回调到ObservableCreate.subscribeActual()中去,然后创建发射器调用MapObserver.onNext()
上一篇下一篇

猜你喜欢

热点阅读