RxJavaAndroid知识Android开发

Rxjava源码解读笔记:线程、map数据操作

2017-06-08  本文已影响393人  wenld_

一点牢骚:
前段时间,接到需求,旧项目要增添许多功能;旧项目是这样的:功能以及代码量就非常庞大,加上各种代码不规范、可读性很差、代码耦合度有点小高;
听到这个消息真的让我脑袋大了一圈,
如果真的要在原有架构上做开发,肯定会导致小组成员开发冲突以及众多的冗余代码,浪费时间和精力在非必要的事情上,之前自身也知道旧项目有这个问题 但由于新项目开发呀嫌弃旧项目一直没有决心去改动,这下好了完全推不了 那就改架构吧,新的模式是 组件化+Rxjava.Retrofit+MVP模式,最近一直在忙着项目代码架构调整,相对应的代码模板编写等等,虽然说改架构是被逼的,但改着改着还是有成长以及很有成就感的一件事情; 再接再厉。


说实话,rxjava的源码太难了,一直没有去时间(懒癌)去学习; 包括现在项目比较紧张,每天下班后更是不太想去学习,那么现在我就和大家一起看一下rxjava的源码吧;

1、正常简易流程;
2、带线程切换流程;
3、map之后;
4、一些总结

1、正常简易流程

基于以下这段代码查看源码

   Observable.just("11")
            .subscribe(observer);

大家应该都知道或者听过,Rxjava采用的是 增强版的观察者模式,在订阅的那一瞬间开始执行整个流程,那么现在看一下订阅方法subscribe(Observer<? super T> observer)

Observable.class
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //..
        // 实际订阅
            subscribeActual(observer);
        //...
    }
    
    
RxJavaPlugins.class
    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }
    
    static <T, U, R> R apply(@NonNull BiFunction<T, U, R> f, @NonNull T t, @NonNull U u) {
        try {
            return f.apply(t, u);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

看到这里实际订阅是发生在 observable 的 subscribeActual 中 而 subscribeActual是个抽象方法; 那么我们又要去找它的实现;
这边通过Observable.just开始看

Observable.calss

  public static <T> Observable<T> just(T item) {
        //...
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

ObservableJust.class
    
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        //调用 observer的 onSubsribe方法
        s.onSubscribe(sd);
        //执行
        sd.run();
    }
    
ScalarDisposable.calss
   public void run() {
   // 判断什么的
    if (get() == START && compareAndSet(START, ON_NEXT)) {
    // 
        observer.onNext(value);
        if (get() == ON_NEXT) {
            lazySet(ON_COMPLETE);
            observer.onComplete();
        }
    }
}

可以看到 run是直接执行的;
整体的一个简单正常的流程就是: observable.subscribe(Observer) -> observable.subscribeActual -> Observer.onSubscribe( Disposable ) -> ScalarDisposable.run -> observer.onNext(value) -> observer.onComplete();

简易源码流程——01

其中正常完整流程都会执行标红部分的方法;其中其它部分先放着,只是判断有没有完成完成所有数据流的发射

2、线程切换流程

基于以下这段代码查看源码

Observable.just("11")
        .subscribeOn(Schedulers.io())//指定Observable 在哪个线程上创建执行操作
        .observeOn(AndroidSchedulers.mainThread()) //在指定下一事件发生的线程
        .subscribe(observer);

2.1、 流向 Observable.subscribe 都经历了什么

先看下 Observable.subscribeOn都做了些什么

Observable.class
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
ObservableSubscribeOn.class    本质上继承 Observable
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        //保存以及初始化
        super(source);
        this.scheduler = scheduler;
    }

可以看就就是转换变成了 ObservableSubscribeOn

再看下 Observable.observeOn(Scheduler scheduler) 做了些什么

Observable.class  这边应该是: ObservableSubscribeOn extends .... Observable
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

ObservableObserveOn.class 
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

本质是将Observable转换成ObservableObserveOn ,在这个流程中是将 ObservableSubscribeOn 转换成ObservableObserveOn;

我们的Observable变换是这样子的,ObservableJust->ObservableSubscribeOn->ObservableObserveOn
一层一层被包含

Obserable转换

2.2、流向 -> Observer.onSubscribe 都经历了什么

那么又到了我们的 订阅方法subscribe(Observer<? super T> observer)了,只不过我们中间多了几层转换; 我们再来看一下

Observable.class
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //...
        // 实际订阅
            subscribeActual(observer);//...
    }
ObservableObserveOn.class
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
        //创建一个  Scheduler.Worker
            Scheduler.Worker w = scheduler.createWorker();
        //   new一个新的 ObserveOnObserver implements Observer 再次循环  Observable.subscribe
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
ObserveOnObserver.class .... implements Observer<T>, Runnable
    
Observable.class
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //..
        // 实际订阅
            subscribeActual(observer);
        //...
    }

ObservableSubscribeOn.class 
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        //直接执行,what? Observer.onSubscribe 不能指定线程   
        // 记录一下   Observer.onSubscribe 的入口是
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

Observer的转变是这样的 Observer->ObserveOnObserver->SubscribeOnObserver

以上面为准,先看下 s.onSubscribe(parent)所经历的事情

ObserveOnObserver.class 
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            if (s instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<T> qd = (QueueDisposable<T>) s;

                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                if (m == QueueDisposable.SYNC) {
                    sourceMode = m;
                    queue = qd;
                    done = true;
                    actual.onSubscribe(this);
                    schedule();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    sourceMode = m;
                    queue = qd;
                    actual.onSubscribe(this);
                    return;
                }
            }

            queue = new SpscLinkedArrayQueue<T>(bufferSize);

//   看这里  actual 其实是  Observer ;
            actual.onSubscribe(this);
        }
    }
    
Observer.class 
    onSubscribe(sd){...}

这里究竟可以看到 执行到 最初observeronSubscribe的一条完整的线路;
ObserveOnObserver.subscribeActual -> ObservableSubscribeOn.subscribeActual -> ObserveOnObserver.onSubscribe -> Observer.onSubscribe ;
不知道有没有细心的同学发现了没有,'onSubscribe'的执行没有SubscribeOnObserver什么事情,虽然说上面有一层转换成功了SubscribeOnObserver
画成图应该就是下面这样:

onSubscribe执行链

我们发现了 从订阅开始一直到执行我们的 observer.onSubscribe() 中间没有任何切换线程的影子;
所以我们得出了一个

observer的 onSubscribe 运行与订阅动作发生在同一线程,不受线程指定方法(observeOn subscribeOn)影响

2.3、流向 -> observer.next、onComplete 都经历了什么

ObservableSubscribeOn.class 
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);

//      new 出一个  SubscribeTask
//      scheduler.scheduleDirect 切换线程执行  SubscribeTask
//      SubscribeOnObserver.setDisposable方法

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

可以看到上面最后一段代码做个这样事情,一件一件去看一下:

// new 出一个 SubscribeTask
// scheduler.scheduleDirect 切换线程执行 SubscribeTask
// SubscribeOnObserver.setDisposable方法

先看一下SubscribeTaskrun 里面是干嘛的

ObservableSubscribeOn.class
    class SubscribeOnObserver
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        
    class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //其中  source 是  ObservableJust  
            source.subscribe(parent);
        }
    }

由第一节的分析我们可以知道,这边最终会执行到 SubscribeOnObserver.onNext() -> ObserveOnObserver.onNext()->Observer.onNext() 这边一层一层调用出来;

SubscribeTask.run 最终执行我们的 最初observer.onNext() onComplete(); 这边还没有涉及到线程切换

再看我们的 scheduler.scheduleDirect(new SubscribeTask)
我们上面用的是 Scheduler.IO 实际上是 IoScheduler;

IoScheduler extends Scheduler.class
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();    

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        //指定工作线程
        w.schedule(task, delay, unit);

        return task;
    }
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    
    EventLoopWorker extends Scheduler.Worker 
       @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    

那么这边流程就比较清晰了,拿到subscribeOn 设置的Scheduler中创建一个Worker 设定了一个 IO 线程;
看到这里 我们就该逆向地执行我们 Observer 真正的方法了;
执行到 SubscribeOnObserver.onNext()

ObservableSubscribeOn : SubscribeOnObserver<T> 
        @Override
        public void onNext(T t) {
        // actual 为 ObserveOnObserver
            actual.onNext(t);
        }
//  scheduler  这边指定为  AndroidSchedulers.mainThread()    createWorker() 这边不深究,里面转成了 handler
Scheduler.Worker worker = scheduler.createWorker();
ObservableObserveOn : ObserveOnObserver
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        void schedule() {
            if (getAndIncrement() == 0) {
            // 这个最终 执行在handler
                worker.schedule(this);
            }
        }

最后的流程应该是这样的


线程切换

3、map 数据操作源码

Observable.just(1)
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(@NonNull Integer integer) throws Exception {
                return null;
            }
        }).subscribe(integer -> out("accept:" + integer));
Observable.class
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
   MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

可以看到 它是在 执行完 function.apply在执行 onNext();
配合上一节 ,流程图就变成这样了


加了map以后的流程

4、一些总结

来个总结吧: 估计源码看得很混乱。

1、对Observable指定线程、数据变换等等,都采用了一种代理包装模式; 比如 ObservableJust-> ObservableSubscribeOn -> ObservableMap -> ObservableObserveOn ; 进行了一层包装;
2、在订阅完成的那一刻起,反向调用 subscribe():subscribeActual()方法;比如 :(ObservableObserveOn.subscribe->ObservableObserveOn.subscribeActual())->(ObservableMap.subscribe->ObservableMap.subscribeActual())->(ObservableSubscribeOn.subscribe->ObservableSubscribeOn.subscribeActual())->(ObservableJust.subscribe->ObservableJust.subscribeActual())
3、Observer ,同理包装 Observer -> ObservableMap... 添加了指定 Schedulers.createWorker() ;
4、 Observer 的执行顺序是 Observer.onSubscribe() -> ObservableXX.onNext() -> ObsevableXXX.onNext() ->...-> Obsever.onNext() -> ObservableXX.OnComplete() -> ObsevableXXX.OnComplete() ->...-> Obsever.OnComplete();
5、 中间有些操作放入到了线程当中.

其实有点坑的是:原本我就知道这个流程应该是这样的,类似于事件分发机制成 U 字型的流程...... 本篇只是在 众多代码 中验证我的思路.................、

上一篇 下一篇

猜你喜欢

热点阅读