RxJava设计模式与原理

2021-01-19  本文已影响0人  暮暮频顾惜

标准观察者设计模式

RxJava是一种特殊的观察者模式,首先我们先来看标准的观察者设计模式。在标准观察者模式中,存在两种对象,一种是观察者,一种是被观察者,“被观察者与“观察者之间是一对多的关系。当被观察者发出通知改变的时候,观察者才能察觉到。

Observerable.java

public interface Observerable {
    private List<Observer> observers = new ArrayList<>();

    public void addObserver(Observer observer) ;

    public void removeObserver(Observer observer);

    public void notifyObservers() ;

    public void pushMessage(String msg);
}

Observer.java

public interface Observer{
    void update();
}

RxJava Hook(钩子)

Hook技术又叫钩子函数,在系统没有调用函数之前,钩子就先捕获该消息,得到控制权。这时候钩子程序既可以改变该程序的执行,插入我们要执行的代码片段,还可以强制结束消息的传递。

RxJava中的hook的使用

RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
        @Override
        public Observable apply(Observable observable) throws Throwable {
            return observable;
        }
    });

来观察这么一个代码段:

public class MainActivity extends AppCompatActivity {

    @InjectView(R.id.tv_text)
    TextView textView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
            @Override
            public Observable apply(Observable observable) throws Throwable {
                System.out.println("apply : " + observable);
                return observable;
            }
        });
        testHook();
    }

    private void testHook() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> e) throws Throwable {
                e.onNext(null);
            }
        }).map(new Function<Object, Boolean>() {
            @Override
            public Boolean apply(Object o) throws Throwable {
                return null;
            }
        }).subscribe(subscribe(new Observer<Boolean>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Boolean aBoolean) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }
}

运行后Logcat打印出

2021-01-18 14:18:53.452 7793-7793/com.example.anatationtest I/System.out: apply : io.reactivex.rxjava3.internal.operators.observable.ObservableCreate@1569e83
2021-01-18 14:18:53.452 7793-7793/com.example.anatationtest I/System.out: apply : io.reactivex.rxjava3.internal.operators.observable.ObservableMap@27f9800

可以看到ObservableCreateObservableMap都被hook方法捕获了,这样就可以实现RxJava的全局监听。观察create()源码:

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

map()方法源码:

public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
      Objects.requireNonNull(mapper, "mapper is null");
      return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
  }

我们再看RxJavaPlugins.onAssembly()方法:

@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

在这里有个onObservableAssembly,如果该变量不为空,则为Observable对象应用apply(f,source),再往前寻找,找到该变量被赋值的地方,发现就是在我们前面通过setOnObservableAssembly()函数设置全局监听,顾名思义,就是设置被观察者的装配。

public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
      if (lockdown) {
          throw new IllegalStateException("Plugins can't be changed anymore");
      }
      RxJavaPlugins.onObservableAssembly = onObservableAssembly;
  }

RxJava 观察者设计模式

两者观察者设计模式对比

在标准观察者设计模式中,“被观察者”与“观察者”是一对多的关系,并且需要被观察者发出改变通知后,所有的观察者才能观察到。在RxJava观察者设计模式中,“被观察者”与“观察者”是多对一的关系,并且需要在起点和终点订阅一次后,才能发出改变通知,也可以称之为发布订阅模式。

在标准观察者模式中,被观察者中的容器直接存放观察者的引用,耦合度更高。RxJava中,被观察者与观察者通过抽象层发射器连接,降低了耦合度。

  1. Observer源码
  2. Observerable创建过程,源码分析
  3. subscribe订阅过程,源码分析
  4. map转换流程,源码分析

Observer

//Observer是一个泛型接口,包含四个抽象方法
public interface Observer<@NonNull T> {

     //当Observer对象被Observerable对象subscribe时候马上调用
     //d 为此次订阅的事件对象,如果调用dispose,则订阅会被杀死
    void onSubscribe(@NonNull Disposable d);

     //把被观察者从上游传递来的对象给观察者
    void onNext(@NonNull T t);

     //通知观察者,被观察者发送了错误,调用此方法将不会调用onNext()和onComplete()方法
    void onError(@NonNull Throwable e);

    //通知观察者,被观察者已经发送完事件,如果抛出onError()错误则不会调用
    void onComplete();

}

Observerable创建过程

首先调用create()方法,传入自定义source作为参数

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

接着先对传进来的source进行判空,如果为空,则直接抛出异常。如果不为空,则走onAssembly()方法,该方法在上面已经分析过了,所以我们再来看new ObservebleCreate<>(source)的创建过程。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ......
}

ObservableCreate中创建了一个source成员变量,并在创建的时候直接将传入的自定义source赋值给它。

subscribe订阅过程

observable.subscribe(new Observer(object){
  ......
})

我们来分析subcribe()函数

public final void subscribe(@NonNull Observer<? super T> observer) {
    Objects.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
      //错误处理
      ......
    }

    protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

}

前面几行代码是对observer对象进行校验,如果校验没问题,最后调用subscribeActual()方法。而该方法是一个抽象方法,那么是谁实现了该方法呢,是我们的ObservableCreate,因此我们再来看它里面subscribeActual()的实现。

@Override
   protected void subscribeActual(Observer<? super T> observer) {
       CreateEmitter<T> parent = new CreateEmitter<>(observer);
       observer.onSubscribe(parent);

       try {
           source.subscribe(parent);
       } catch (Throwable ex) {
           Exceptions.throwIfFatal(ex);
           parent.onError(ex);
       }
   }

首先CreateEmitter为我们传进来的自定义observer创建一个事件发射,并将它包裹起来,然后调用observer.onSubscribe(),这就是前文提到一调用subscribe()observer.onSubscribe()就会调用的原因。

接着调用source.subcribe(),把创建的事件发射器传进去。还记得source是什么吗,它就是我们一开始创建的自定义source,而该方法的抽象实现又回到了在我们代码最开始的地方。

Observable.create(new ObservableOnSubscribe<Object>() {
         @Override
         public void subscribe(@NonNull ObservableEmitter<Object> e) throws Throwable {
             e.onNext("");
         }
     })

然后就会调用到CreateEmitter.onNext()

public void onNext(T t) {
      if (t == null) {
          onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
          return;
      }
      if (!isDisposed()) {
          observer.onNext(t);
      }
  }

由于CreateEmitter中已经持有了observer的对象,最后就会调用到observer.onNext(t),这样完整的订阅流程就形成了。

map()源码分析

public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
      Objects.requireNonNull(mapper, "mapper is null");
      return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
  }

我们重点来ObservableMap这个类。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        //保存function
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        ......
    }
}

首先在构造函数中,将Function保存起来,map()之后,返回的依旧是Observerable对象,并将ObserverableCreate对象保存为它的source,接下来会走到ObservableMap.subscribeActual()中,然后为observer对象t包裹一层MapObserver。这时候再调用ObserverableCreate.subscribeActual(),后续的订阅流程就跟之前一样了。

在这里,我们重点分析MapObserver中的处理。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
  final Function<? super T, ? extends U> mapper;

  MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
      super(actual);
      this.mapper = mapper;
  }

  @Override
  public void onNext(T t) {
      if (done) {
          return;
      }

      if (sourceMode != NONE) {
          downstream.onNext(null);
          return;
      }

      U v;

      //mapper合法性校验
      try {
         //应用function转换
          v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
      } catch (Throwable ex) {
          fail(ex);
          return;
      }
      //向下游传递
      //实际为我们自定义的observer
      downstream.onNext(v);

      @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        //异常处理
        @Nullable
        @Override
        public U poll() throws Throwable {
            T t = qd.poll();
            return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
  }
}

由前面的流程分析,会一直调用到CreateEmitter.onNext(),然后又会进一步对MapObserver进行拆包,进而走到它的MapObserver.onNext()方法中,先对Function 对象 mapper进行合法性校验,然后调用apply()函数,这里的apply函数是抽象函数。具体实现在我们的链式调用中,最后得到返回值v,也就是经过变换后的对象,并调用downstream.onNext(v) 向下游传递。downsteam则为被包裹住的observer对象。

new Function<Object, Boolean>() {
    @Override
    public Boolean apply(Object o) throws Throwable {
        return null;
    }
}

为什么RxJava要这么写呢,从上文的分析中,我们可以看到在链式调用中,如果调用多次map(),相当于为自定义observer封装了层层包裹,发布订阅的过程就相当于是封包->拆包的过程,代码逻辑清晰,避免了函数嵌套。这就是一种装饰模型,一开始我们创建了ObserverableCreate,然后为它穿上一件件ObserverableMap的外套,然后subscribe的过程就是脱外套的过程。

总结

本篇文章通过对RxJava中Observer、Observable、subscribe源码分析,比较了其与标准观察者设计模式的差别,更深的学习了RxJava的思想和原理。

上一篇下一篇

猜你喜欢

热点阅读