RxJava线程调度源码分析
在官方的介绍中,很亮眼的大字就是:异步事件驱动的观察者模式。
a library for composing asynchronous and event-based programs by using observable sequences.
真正让RxJava强大的原因很明显是其异步处理能力,目前我们已经知道RxJava可以使用Scheduler自由的调度线程,那么仍然带着疑问,到底是如何实现的?
接着上一篇:RxJava源码分析继续线程调度的分析。
分析
SubscribeOn
1 来一段分析的程序
public class RxJavaSourceDemo {
public static void main(String[] args) throws InterruptedException {
Observable.just("Scheduler Demo")
// 指定最开始时要运行的线程
.subscribeOn(Schedulers.newThread())
.subscribe(s -> System.out.println("当前线程:" + Thread.currentThread().getName()));
Thread.sleep(1000);
;
}
}
// 结果:
当前线程:RxNewThreadScheduler-1
2 subscribeOn操作符内,可以看到其创建了ObservableSubscribeOn。两个参数:
- 上游的ObservableJust
- NewThreadScheduler调度器
// this为ObservableJust
// scheduler为们传入的Schedulers.newThread(),
// 其中NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
// 最终返回的结果为:new NewThreadScheduler()
new ObservableSubscribeOn<T>(this, scheduler)
3 调用subscribe方法,接着上一篇的分析,我们知道subscribe实际上调用的是subscribeActual方法。
// ObservableSubscribeOn类
@Override
public void subscribeActual(final Observer<? super T> s) {
// s为创建的LambdaObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 这里我们未实现,另外这段是同步处理的,我们的案例代码中什么都没做
s.onSubscribe(parent);
// 真正的调度逻辑,parent=SubscribeOnObserver
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
4 可以看到ObservableSubscribeOn的subscribeActual调用的是scheduler.scheduleDirect(new SubscribeTask(parent))。parent为SubscribeOnObserver
// scheduleDirect方法会直接走到这一步的内容。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//抽象方法,对于我们这次指定的NewThreadScheduler创建的是new NewThreadWorker(threadFactory)
final Worker w = createWorker();
// 理解为没有做处理,返回的仍然是run内容,我们上一步传过来的是SubscribeTask
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 构建新的task
DisposeTask task = new DisposeTask(decoratedRun, w);
// worker进行调度。delay为0,单位不用关心
w.schedule(task, delay, unit);
return task;
}
5 查看NewThreadWorker的schedule方法。代码比较长
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//实际为SubscribeTask
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// parent为null
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
// 跳过
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
// 真正调用的内容
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
// 上面的ScheduledRunnable
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
我们将上面真正执行的代码简化一下:
// parent为null,decoratedRun为早先创建的new SubscribeTask(LambdaObserver)的封装。DisposeTask
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
// executor为 Executors.newScheduledThreadPool(1, factory);
f = executor.submit((Callable<Object>)sr);
sr.setFuture(f);
retur sr;
6 因为早先返回的内容task为DisposeTask,其实最主要内容是SubscribeOnObserver,不管多少层不用关心,真正执行的代码是什么?
DisposeTask调用SubscribeTask.run方法,最后调用dispose方法。
SubscribeTask.run方法执行的为ObservableJust.subscribe方法,上一篇文章已经分析过。
7 其实来看的话就是将subscribeOn操作符之前的操作符subscribe方法使用放在另外一个线程中执行。

ObserverOn
observerOn主要作用是在其下游的操作符都会使用ObserverOn指定的线程。
1 先来一段代码,结果和上面的subscribeOn一样。
public class RxJavaSourceDemo {
public static void main(String[] args) throws InterruptedException {
Observable.just("Scheduler Demo")
// 指定最开始时要运行的线程
.observeOn(Schedulers.newThread())
.subscribe(s -> System.out.println("当前线程:" + Thread.currentThread().getName()));
Thread.sleep(1000);
;
}
}
2 调用observerOn之后,最终会返回new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
// this即ObservableJust类
// scheduler传入的Schedulers.newThread()
// delayError默认为false
// bufferSize默认为Math.max(1, Integer.getInteger("rx2.buffer-size", 128)
new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
3 查看ObservableObserveOn的subscribeActual方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// TrampolineScheduler即使用当前的线程进行处理
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 创建Woker上面的代码已经分析过了,这里创建的是new NewThreadWorker(threadFactory)
Scheduler.Worker w = scheduler.createWorker();
// observer为LambdaObserver
// w为worker
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
4 source.subscribe方法是在当前线程调用,早先说过ObservableJust的方法作用。这里就是在ObservableJust的数据传入ObserveOnObserver进行处理。
@Override
protected void subscribeActual(Observer<? super T> s) {
// 构造新的Observer
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
// 不处理
s.onSubscribe(sd);
sd.run();
}
5 观察下ObservableObserverOn的onNext方法。
@Override
public void onNext(T t) {
if (done) {
return;
}
// 默认会加入到quene中
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 使用worker.schedule(this);进行处理
// 后续的subscribe方法与当前的observerOn在同一线程
schedule();
}
6 在schedule的run方法中。如果ObservableObserverOn执行完成之后会调用下游的Observable方法继续执行。
a.onNext(v);

两者对比
两个都是RxJava的多线程操作符,其主要区别部分:
// ObservableObserveOn
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// 上游处理完成之后,当前的操作符才会进入另外一个线程。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ObservableSubscribeOn
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
// 直接进入多线程,即上游的操作也会进入另外一个线程
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

最后
分析完RxJava线程调度的源码之后,感觉神清气爽,原来内部是这样实现的啊。剩余各个操作符的功能我想在仔细看完这几个操作符之后,都大同小异,那么这里分析的结论:
RxJava中调用subscribe方法之后,其会不断的调用上游的subscribe方法,而每一层都对上游的observer做了一层封装。
在subscribeOn时,其在封装Observer时进入了另外一个线程,此时Observer逻辑还未处理,因此在真正的Observer执行逻辑时是在另外一个线程。
而ObserverOn操作,则是在上游的操作处理完之后,当前的observer进入另外一个线程进行处理,剩下观察当前操作符的下游Observer也会跟着进入另外一个线程