【Android】【框架】【RxJava】

2019-08-18  本文已影响0人  徐乙_

本文探索2个方向

  1. 发布/订阅机制的实现架构
  2. 如何实现的一行代码切换线程

此外对于操作符的使用,我建议结合其他库一起学习,比如Retrofit+RxJava使用flatMap解决嵌套请求,比如结合Room、LifeCycle,比如RxBus

发布/订阅

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(Integer value) {}

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {}
        });

当调用了Observable.create后,目前的Observable结构是这样的

image.png

可以把它理解为发布者,它接受订阅者的参数
当我们调用了subscribe方法后,他会执行获取数据的逻辑,然后传递数据给订阅者

image.png

总结来说就是装饰者模式+观察者模式的灵活使用,每一级都返回Observable,并对之前的callback进行保存或执行

线程切换

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).observeOn(Schedulers.io())
          .subscribeOn(AndroidSchedulers.mainThread())
          .subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {}

              @Override
              public void onNext(Integer value) {}

              @Override
              public void onError(Throwable e) {}

              @Override
              public void onComplete() {}
          });

observeOn后,现在Observable的封装如下

image.png

subscribeOn同理,封装后如下

image.png

ObservableObserveOn是Observable,他对于subscribe有自己的实现
它的实现就是根据具体传入的Scheduler在对应的线程执行逻辑
可以参考如下代码

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker(); // 创建对应的Worker

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

举个例子,newThread的对应实现

@NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

可以看到内置了一个线程池,在线程池里执行实际逻辑

总结

RxJava的函数式编程核心就是通过装饰者+观察者实现的,十分巧妙
这一点很像JS的Promise

后记

有什么写得错误、让人费解或遗漏的地方,希望可以不吝赐教,我会马上更改

学习自

https://www.jianshu.com/p/88aacbed8aa5
http://ju.outofmemory.cn/entry/358094

上一篇下一篇

猜你喜欢

热点阅读