Android-RxJava

RxJava线程调度源码分析

2019-12-17  本文已影响0人  Real_man

在官方的介绍中,很亮眼的大字就是:异步事件驱动的观察者模式。

a library for composing asynchronous and event-based programs by using observable sequences.

真正让RxJava强大的原因很明显是其异步处理能力,目前我们已经知道RxJava可以使用Scheduler自由的调度线程,那么仍然带着疑问,到底是如何实现的?

接着上一篇:RxJava源码分析继续线程调度的分析。

分析

SubscribeOn

1 来一段分析的程序

public class RxJavaSourceDemo {
    public static void main(String[] args) throws InterruptedException {
        Observable.just("Scheduler Demo")
                // 指定最开始时要运行的线程
                .subscribeOn(Schedulers.newThread())
                .subscribe(s -> System.out.println("当前线程:" + Thread.currentThread().getName()));

        Thread.sleep(1000);
        ;
    }
}
// 结果:
当前线程:RxNewThreadScheduler-1

2 subscribeOn操作符内,可以看到其创建了ObservableSubscribeOn。两个参数:

// this为ObservableJust
// scheduler为们传入的Schedulers.newThread(), 
// 其中NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
// 最终返回的结果为:new NewThreadScheduler()
new ObservableSubscribeOn<T>(this, scheduler)

3 调用subscribe方法,接着上一篇的分析,我们知道subscribe实际上调用的是subscribeActual方法。

    // ObservableSubscribeOn类
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // s为创建的LambdaObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        
        // 这里我们未实现,另外这段是同步处理的,我们的案例代码中什么都没做
        s.onSubscribe(parent);

        // 真正的调度逻辑,parent=SubscribeOnObserver
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

4 可以看到ObservableSubscribeOn的subscribeActual调用的是scheduler.scheduleDirect(new SubscribeTask(parent))。parent为SubscribeOnObserver

    // scheduleDirect方法会直接走到这一步的内容。
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //抽象方法,对于我们这次指定的NewThreadScheduler创建的是new NewThreadWorker(threadFactory)
        final Worker w = createWorker();

        // 理解为没有做处理,返回的仍然是run内容,我们上一步传过来的是SubscribeTask
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        // 构建新的task
        DisposeTask task = new DisposeTask(decoratedRun, w);

        // worker进行调度。delay为0,单位不用关心
        w.schedule(task, delay, unit);

        return task;
    }

5 查看NewThreadWorker的schedule方法。代码比较长

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //实际为SubscribeTask
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // parent为null
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        // 跳过
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                // 真正调用的内容
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            // 上面的ScheduledRunnable
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

我们将上面真正执行的代码简化一下:

// parent为null,decoratedRun为早先创建的new SubscribeTask(LambdaObserver)的封装。DisposeTask
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

// executor为 Executors.newScheduledThreadPool(1, factory);
f = executor.submit((Callable<Object>)sr);
sr.setFuture(f);
retur sr;

6 因为早先返回的内容task为DisposeTask,其实最主要内容是SubscribeOnObserver,不管多少层不用关心,真正执行的代码是什么?

DisposeTask调用SubscribeTask.run方法,最后调用dispose方法。
SubscribeTask.run方法执行的为ObservableJust.subscribe方法,上一篇文章已经分析过。

7 其实来看的话就是将subscribeOn操作符之前的操作符subscribe方法使用放在另外一个线程中执行。

image-20191217154620438

ObserverOn

observerOn主要作用是在其下游的操作符都会使用ObserverOn指定的线程。

1 先来一段代码,结果和上面的subscribeOn一样。

public class RxJavaSourceDemo {
    public static void main(String[] args) throws InterruptedException {
        Observable.just("Scheduler Demo")
                // 指定最开始时要运行的线程
                .observeOn(Schedulers.newThread())
                .subscribe(s -> System.out.println("当前线程:" + Thread.currentThread().getName()));

        Thread.sleep(1000);
        ;
    }
}

2 调用observerOn之后,最终会返回new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)

// this即ObservableJust类
// scheduler传入的Schedulers.newThread()
// delayError默认为false
// bufferSize默认为Math.max(1, Integer.getInteger("rx2.buffer-size", 128)
new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)

3 查看ObservableObserveOn的subscribeActual方法。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // TrampolineScheduler即使用当前的线程进行处理
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // 创建Woker上面的代码已经分析过了,这里创建的是new NewThreadWorker(threadFactory)
            Scheduler.Worker w = scheduler.createWorker();

            // observer为LambdaObserver
            // w为worker
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

4 source.subscribe方法是在当前线程调用,早先说过ObservableJust的方法作用。这里就是在ObservableJust的数据传入ObserveOnObserver进行处理。

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        // 构造新的Observer
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        // 不处理
        s.onSubscribe(sd);
        sd.run();
    }

5 观察下ObservableObserverOn的onNext方法。

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            // 默认会加入到quene中
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // 使用worker.schedule(this);进行处理
            // 后续的subscribe方法与当前的observerOn在同一线程
            schedule();
        }

6 在schedule的run方法中。如果ObservableObserverOn执行完成之后会调用下游的Observable方法继续执行。

a.onNext(v);
image-20191217161112922

两者对比

两个都是RxJava的多线程操作符,其主要区别部分:

// ObservableObserveOn
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // 上游处理完成之后,当前的操作符才会进入另外一个线程。
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
    
// ObservableSubscribeOn
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        // 直接进入多线程,即上游的操作也会进入另外一个线程
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
image-20191217161551491

最后

分析完RxJava线程调度的源码之后,感觉神清气爽,原来内部是这样实现的啊。剩余各个操作符的功能我想在仔细看完这几个操作符之后,都大同小异,那么这里分析的结论:

RxJava中调用subscribe方法之后,其会不断的调用上游的subscribe方法,而每一层都对上游的observer做了一层封装。

在subscribeOn时,其在封装Observer时进入了另外一个线程,此时Observer逻辑还未处理,因此在真正的Observer执行逻辑时是在另外一个线程。

而ObserverOn操作,则是在上游的操作处理完之后,当前的observer进入另外一个线程进行处理,剩下观察当前操作符的下游Observer也会跟着进入另外一个线程

上一篇下一篇

猜你喜欢

热点阅读