RxJava 2 源码解析之线程切换
在分析RxJava2的线程切换源码之前,再看看在上一篇RxJava 2 源码解析之创建-订阅-变换-发布里总结的流程图:
其实对RxJava2是如何实现各种Observable-Observer之间的串联的之后,就可以先按照我们的常规理解来猜测一下它会怎么实现:
- 我们的subscribeOn()是在整个订阅链条中被最后一个最接近它的Observable调用起来的,那么是不是只需要这个Observable把这个方法的执行放到指定线程中就行了?
- 根据RxJava2的套路,执行subscribeOn()的时候,一定又会创建一个名字也会按照套路起的ObservableSubscribeOn,那么一定会是它在subscribeActual()中把它的source也就是ObservableCreate的subscribe()方法放到指定线程中去执行
- 而对于observeOn(),按照我们总结的事件发布链,除了最后一个Observable之外,每个Observable它的实际Observer都是下一个Observable的Observer,那么当我们希望我们的某个Observer在指定的线程执行的时候,是不是只要它的上一个将它放进去就行了?
而这个“上一个Observer”,不就是我们的observeOn()创建的吗?所以,控制下一个Observer执行线程的,应该就是ObserveOnObserver
。
下面看看RxJava具体究竟是如何实现的吧!
一段最简单的RxJava2线程切换的代码:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("RxJava2", "subscribe " + Thread.currentThread().getName());
emitter.onNext(1);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("RxJava2", "onSubscribe " + Thread.currentThread().getName());
}
@Override
public void onNext(Integer integer) {
Log.d("RxJava2", "onNext " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
我们先看subscribeOn()过程的切换
subscribeOn
来看看它是怎么完成线程切换的:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
嗯,还是跟之前的分析看到的一样,都是这个套路:
创建了一个ObservableXX
重点关注ObservableSubscribeOn:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
它有两个内部类,一个是以前的老套路:static的XXObserver,还多了一个实现了Runnable的SubscribeTask。
有了之前的经验,现在直接看subscribeActual()就好了,而且根据之前的猜测,它应该在这会有关于线程切换的内容,赶紧看看吧:
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
果然,scheduler.scheduleDirect(new SubscribeTask(parent))
这里一眼就能看出来,它专门为在某个线程执行的subscribeTask创建了一个任务,
@Override
public void run() {
source.subscribe(parent);
}
看看上面的源码,它其实就是一个Runnable嘛!那么这个scheduler就一定是负责让这个runnable执行起来的家伙了。
不过,等等,这一行是干嘛的?
s.onSubscribe(parent);
根据RxJava2的链条来说,它在这里调用了Observer的onSubscribe,那么在这个Observable之前的结点不是还有ObservableCreate吗?它也是会调用Observer的onSubscribe方法的呀,这不会造成重复调用吗?
再想想,不对,ObservableCreate调用的onSubscribe(),最终都会通过整个Observable链往下传递,那么我们这个SubscribeOnObserver不就在它的下面吗?会不会是它拦截了这个传递过程避免重复调用呢?
于是回头看看它的源码:
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
果不其然,这小子没有再往下传递了。嗯,解决了这个疑惑,接着我们之前的schedual流程:scheduler.scheduleDirect(xxx)
Schedular.java
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
OK,继续点:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
这里的Worker是个抽象类,
public abstract static class Worker implements Disposable {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
//... 省略
}
可以看出,它才是Scheduler中真正负责线程调度的角色。
而DisposeTask实现了Disposable和Runnable,它只是对我们的Runnable进行了一层包装。
createWorker也是个抽象方法,具体还是由我们切换的线程类型时所指定的Scheduler来实现的。
image.png可以看到RxJava2有这么多个Scheduler,我们这里
subscribeOn是在io线程,所以就去看看这个IoScheduler:
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
这个EventLoopWorker的schedule()方法:
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//...省略
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
这里的threadWorker继承自NewThreadWorker,它也是Scheduler.Worker的子类,我们直接看看NewThreadWorker:
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
Finally! 终于,到了这里,可以看到我们的runnable最终被包装了几次之后被放入了ExecutorService中执行。
以上涉及到的一些类图关系:
rxjava2-thread-change-classes.png上图中,IoScheduler有三个内部类:EventLoopWorker,ThreadWorker, CachedThreadPool。
CachedThreadPool中保存了ThreadWorker的队列,它会被EventLoopWorker从队列中取出用来执行我们的Runnable。
其实梳理一下也很简单:
- Scheduler是各种线程调度的基类,它的各种实现是具体的调度者
- Scheduer内部的Worker是真正的调度执行者
- 而这个Worker也没做什么大事,就是从一个CachedWorkPool中,(说是WorkPool,其实它也是一个Runnable类,内部维护了一个ThreadWorker的队列)取出一个同样继承了Scheduler.Worker的ThreadWorker ,让它将我们的Runnable放入到ExecutorService中去执行。
这样,RxJava2就完成了一次subscribeOn()的线程切换。
observeOn()
OK,接下来我们再看看事件响应的实现过程。
同样的,会有一个ObserveOnObserver
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
ObservableObserveOn,呵呵,老套路,继续:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
final Observer<? super T> actual;
final Scheduler.Worker worker;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
//...省略很多代码
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void drainFused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}
actual.onNext(null);
if (d) {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
//... 其他代码省略
}
}
根据RxJava2的套路,Observer的回调是从上往下层层传递的,因此,我们可以直接看ObserveOnObserver的onNext()方法,它先会把我们的回调数据存入到一个队列中,然后schedule(),在这里我们又看到了worker的身影,没错,它还是Scheduler的内部类Worker。它又来帮我们调度线程了。我们这个ObserveOnObserver是实现了Runnable接口的。进去看看:
Scheduler#Worker.java
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
这里它下一步调用的schedule方法是个抽象方法,那么它的实现会是谁呢?
回到我们那个示例代码,我们使用的是主线程,它的实现是
HandlerWorker
。
看看它是什么样:
private static final class HandlerWorker extends Worker {
private final Handler handler;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
//...省略
}
终于,我们看到了它是如何切换回主线程的,还是用到了Handler。那么这个一定是运行在主线程啦!
去AndroidSchedulers.java看看所用的Looper就知道了:
/** Android-specific Schedulers. */
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
//...
}
就是这样,它的Looper是我们主线程的消息循环,OK,那么我们的Runnable就这样被加入到了主线程的消息队列中了。那么时机到了的时候,我们的Runnable就会被执行,那么我们的run()方法一般都会走到drainNormal()方法中,查看下,它也就是不断从我们之前放入数据的那个队列中取出数据,然后根据各种状态判断调用Observer的onNext(),onError(),onComplete()等方法。
至此,observeOn()我们就分析完了。
小结
RxJava的线程切换还有一些其他的Scheduler,其实也都大同小异,我们熟悉了它的这些套路,其他的也就都不难理解。
从以上流程中,我们也可以得出一些结论:
- 每次切换线程,其实也就是在整个链条中增加一个结点,它跟其他操作符一样,都是有ObservableXX-XXObserver一一对应的
- 多次调用subscribeOn(),最终subscribe()方法还是会回调在第一次调用subscribeOn()指定的线程里,因为第一次调用subscribeOn()创建的ObservableSubscibeOn总是会在最后一个执行
- 在我们的带回调的操作符执行之前,只有最接近的那次observeOn()会有效,其他的像这样
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.subscribe(new Observer<Integer>() {...}
不只是无效而已,简直就是拉低性能,浪费生命。
好了,关于RxJa线程切换的分析就到这里。个人水平有限,如有错漏,欢迎指出,一起学习,一起进步[笑]~