RxJava2工作原理及线程切换
序言
RxJava是现在最流行的响应式函数编程框架,之前的项目中一直使用RxJava,结合Retrofit+OkHttp搭建网络请求框架,很是好用。
后来RxJava2出来了,官网表明一段时间之后不再维护RxJava,所以在新项目中,决定使用RxJava2。
对于新手来说,即使没用过RxJava,也可以直接学习RxJava2。而对于从RxJava过渡到RxJava2的同学,自然更容易上手。
响应式编程(Reactive Programming)这个词很多人都知道,但具体是什么含义可能没多少人能解释清楚。我简单说一下自己的理解:响应式编程可基于任何事物(数据、用户行为、时间、对象)创建事件流,并且框架提供一个强大的函数库来操作事件流,包括合并、过滤、转换、切换线程、监听...,流是响应式的核心。
RxJava就是这样一个响应式编程框架,今天我们主要来介绍RxJava2的事件处理流程和线程切换原理。本文并不是一篇新手指引教程,而是一篇进阶教程。如果想入门RxJava2,可以看一下这篇文章。
用户登录场景
下面以用户登录场景为例,介绍怎样通过RxJava2进行事件流处理,登录代码如下:
/**
* 用户登录操作
*/
public void login(final String userName, final String password) {
Observable.create(new ObservableOnSubscribe<CommonApiBean<UserInfo>>() {
@Override
public void subscribe(ObservableEmitter<CommonApiBean<UserInfo>> e) throws Exception {
e.onNext(loginApi(userName, password));
}
})
.map(new Function<CommonApiBean<UserInfo>, UserInfo>() {
@Override
public UserInfo apply(CommonApiBean<UserInfo> bean) throws Exception {
if (bean != null && bean.body != null) {
return bean.body;
}
return new UserInfo();
}
})
.doOnNext(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
saveUserInfo(userInfo);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//登录成功,跳转页面
loginSuccess(userInfo);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//登录失败提示用户
loginFailed();
}
});
}
上面一段代码是RxJava2常规的使用方式,能够满足多数网络请求场景,下面我们就针对这段代码进行分析。
分析事件流
RxJava2处理事件流分为3个步骤:
- 构建操作符对应的Observable
- 逐级生成Observer,逆向订阅Observable
- 逐级调用Observer的onNext方法
下面我们就分别来介绍这三个流程。
1. 构建操作符对应的Observable
上面一段代码中,使用的操作符包括create、map、doOnNext、subscribeOn、observeOn
,我们依次看这些操作符做了什么事情。
Observable
要了解RxJava2原理,必须先了解Observable,Observable是RxJava2事件流的入口类,也可以叫做事件源。
public abstract class Observalbe<T> implements ObservableSource<T> {
//交由子类实现的抽象方法
@Override
protected abstract void subscribeActual(Observer observer) ;
//实现了ObservableSource的方法
@Override
public final void subscribe(Observer<? super T> observer) {
//省略一堆判空等处理
subscribeActual(observer);
}
//省略一堆静态操作符方法
}
从Observable的定义可知,它实现了ObservableSource接口,并定义了一个subscribeActual抽象方法,调用Observable的subscribe方法实际上是做了一些基础判断后,调用subscribeActual方法。Observable的每个子类需要需要实现自己的subscribeActual方法。
create
跟踪到Observable的create方法:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
第一句代码对source进行判空,如果为空,会抛出异常。接着生成一个ObservableCreate对象,把这个对象传入RxJavaPlugin进行组装。
RxJavaPlugin提供了一系列的Hook Function,通过这种函数对RxJava的标准进行加工,如果我们不配置这些方法,默认直接返回原对象,即ObservableCreate。
注:下面介绍其他操作符,就不再解释判空操作和RxJavaPlugin。
接着,我们看一下ObservableCretae的定义:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//省略其他方法
}
很简单,ObservableCreate继承Observable,并且在构造方法中保存了传入的ObservableOnSubscribe对象。
总结:create()构建了一个ObservableCreate对象,该对象继承Observable。
map
同样,跟踪到Observable的map方法中:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
这里创建并返回一个ObservalbeMap对象:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
//省略其他方法
}
ObservableMap继承AbstractObservableWithUpstream类:
/**
* Base class for operators with a source consumable.
*/
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
AbstractObservableWithUpstream是所有接收上一级输入操作符的基类。
总结:map()构建了一个ObservableMap对象。
doOnNext
根据上面两个操作符的源码,我们猜测这里也会返回一个Observable子类对象,进入源码验证一下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
//省略判空操作
return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
}
果不其然,doOnNext最后返回了一个ObservableDoOnEach对象:
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
super(source);
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}
//省略其他代码
}
doOnNext也需要接收上流传来的Observable作为source,所以也继承了AbstractObservableWithUpstream。
总结:doOnNext()构建了一个ObservableDoOnEach对象。
subscribeOn
用过RxJava的同学都知道,subscribeOn是用来切换线程的,用于指定被观察者执行的线程。
不着急,怎样切换线程我们后面会分析,先看一下它的源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
这里返回了一个ObservableSubscribeOn对象,继续跟踪:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
//省略其他代码
}
ObservableSubscribeOn中保存了事件源source和线程调度器scheduler,而这个scheduler是我们传入的Schedulers.io()
。
总结:subscribeOn()构建了一个ObservableSubscribeOn对象。
observeOn
各位同学肯定也知道,observeOn用于指定观察者执行的线程,至于怎样实现线程切换等到后面分析。
老套路,我们跟踪到Observable.observeOn方法:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//省略判空验证操作
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
observeOn最后返回了一个ObservableObserveOn对象:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//省略其他代码
}
ObservableObserveOn对象也保存了事件源source和线程调度器scheduler,这里的scheduler是我们传入的AndroidScheduler.mainThread()
。
总结:observeOn构建了一个ObservableObserveOn对象。
到这里,操作符对应的Observable构建完成,总结一下,按照操作符顺序,构建了ObservableCreate -> ObservableMap -> ObservableDoOnEach -> ObservableSubscribeOn -> ObservableObserveOn
几个Observable对象。
2. 逐级生成Observer,逆向订阅Observable
LambdaObserver
在登录场景的最后,调用了subscribe方法,传入了两个Comsumer对象,我们看一下subscribe方法的实现:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
//省略判空操作
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
从代码看出,subscribe方法拿我们传入的两个Consumer构建了一个LambdaObserver对象,而两个Consumer分别对应onNext和onError,并且用LambdaObserver来订阅ObservableObserveOn。
总结:用LambdaObserver订阅ObservableObserveOn对象。
ObserveOnObserver
继续分析ObservableObserveOn.subscribe方法,上面提到过,Observable的subscribe方法实际上会调用具体子类的subscribeActual方法,所以我们跟踪ObservableObserveOn的subscribeActual方法:
//ObservableObserveOn
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
TrampolineScheduler表示是否当前线程,而我们传入的schedule是AndroidScheduler.mainThread()
,并不是TrampolineScheduler对象。
所以会走else逻辑,创建一个Scheduler.Worker,并把它作为参数构建一个ObserveOnObserver对象。
用ObserveOnObserver对象订阅source(source是我们构建ObservableObserveOn对象传入的ObservableSubscribeOn对象)。
总结:用ObserveOnObserver订阅ObservableSubscribeOn对象
SubscribeOnObserver
接着,看一下ObservableSubscribeOn的订阅逻辑:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
首先创建一个SubscribeOnObserver对象,scheduler.scheduleDirect
从名字上看,大概用来切换线程的。在指定线程中执行SubscribeTask任务:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
这个任务很简单,就是用parent(SubscribeOnObserver对象)来订阅source(此处的source是ObservableDoOnEach)。
总结:SubscribeOnObserver订阅ObservableDoOnEach对象。
DoOnEachObserver
老套路,我们继续看ObservableDoOnEach的subscribeActual方法:
@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
}
这个方法很干脆,直接用DoOnEachObserver订阅source(此处的source是ObservableMap对象)
总结:用DoOnEachObserver订阅ObservableMap对象。
MapObserver
到这里,相信大家也可以猜到,ObservableMap的subscribeActual中,肯定也是构建一个MapObserver来订阅source,本着实事求是的精神,源码还是要看的:
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
果不其然_!!
总结:用MapObserver订阅ObservableCreate。
最后,我们要看一下ObservableCreate中的subscribeActual方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这里也出现了source.subscribe(parent)
,parent是CreateEmitter对象,那么source是什么呢?
有哪位同学能回答一下这个问题吗?
哈哈,不卖关子了,这里的source就是我们最开始创建Observable事件流传入的ObservableOnSubscribe对象,还有印象吗,没有也没关系:
public void login(final String userName, final String password) {
Observable.create(new ObservableOnSubscribe<CommonApiBean<UserInfo>>() {
@Override
public void subscribe(ObservableEmitter<CommonApiBean<UserInfo>> e) throws Exception {
e.onNext(loginApi(userName, password));
}
}).//省略后续操作代码
}
没错,就是调用此处我们实现的subscribe方法。
到这里,逆向订阅Observable的过程分析完毕了。
总结:
LambdaObserver -> ObservableObserveOn
ObserveOnObserver -> ObservableSubscribeOn
SubscribeOnObserver -> ObservableDoOnEach
DoOnEachObserver -> ObservableMap
MapObserver -> ObservableCreate
最后调用ObservableOnSubscribe的subscribe方法
有木有感觉思绪逐渐明朗起来!不急,后面还有呢!
3. 逐级调用Observer的onNext方法
MapObserver.onNext
接着上面最后一步进行分析,调用ObservableOnSubscribe的subscribe方法,而我们在subscribe中调用了e.onNext
,此处的e是CreateEmmiter对象,进入它的onNext方法:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
逻辑很简单,对发射的数据判空,如果数据为空则抛出异常。如果没有中断事件流,则调用observer.onNext
,此处的observer是创建CreateEmitter时传入的MapObserver。
总结:调用MapObserver的onNext方法。
DoOnEachObserver
跟踪到MapObserver的onNext方法:
@Override
public void onNext(T t) {
//省略一些判断
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
先通过mapper.appply
对数据t做变换,变换之后继续调用DoOnEachObserver的onNext方法。
总结:调用DoOnEachObserver的onNext方法。
SubscribeOnObserver.onNext
我们跟到DoOnEachObserver.onNext方法:
@Override
public void onNext(T t) {
if (done) {
return;
}
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
actual.onNext(t);
}
跟我们预料一样,继续调用下一级observer(SubscribeOnObserver)的onNext方法。
总结:调用SubscribeOnObserver的onNext方法。
ObserveOnObserver.onNext
长征的路一步一步走,接着看SubscribeOnObserver.onNext方法:
@Override
public void onNext(T t) {
actual.onNext(t);
}
这里也很干脆直接,调用上一级observer(ObserveOnObserver)的onNext方法。
总结:调用ObserveOnObserver的onNext方法。
LambdaObserver.onNext
进入ObserveOnObserver.onNext方法:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
先把数据加入queue,然后代用schedule方法,这里涉及到线程调度,我们稍后分析,总之最后会调用到LambdaObserver.onNext方法。
到这里,逐级调用Observer的onNext方法也分析完毕了。
总结:
MapObserver.onNext -> DoOnEachObserver.onNext -> SubscribeOnObserver.onNext -> ObserveOnObserver.onNext -> LambdaObserver.onNext
到这里,RxJava整个事件流的原理分析完毕了。回顾一下,包括三个步骤:
- 构建操作符对应的Observable
- 逐级生成Observer,逆向订阅Observable
- 逐级调用Observer的onNext方法
相信各位同学很容易理解这个图片描述的流程。
线程切换原理
在理解了RxJava2操作符工作原理之后,我们需要分析subscribeOn和observeOn切换线程的原理。
1. subscribeOn
上面提到过,subscribeOn指定被观察者处理事件流所在线程,它作用在subscribe阶段(即图中逆向订阅过程),我们重新看一下ObservableSubscribeOn的订阅过程:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
此处的scheduler,是我们出传入的Schedulers.io()
,这是个什么东西呢?
public final class Schedulers {
@NonNull
static final Scheduler IO;
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
IO = RxJavaPlugins.initIoScheduler(new IOTask());
//省略其他几个Scheduler初始化过程
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//省略其他代码
}
看到这里,我们知道了,Scheduler.io()
具体是指IoScheduler对象,IoScheduler继承Scheduler,Scheduler是所有线程调度器的父类,看一下Scheduler的实现:
public abstract class Scheduler {
@NonNull
public abstract Worker createWorker();
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
//省略其代码
}
Scheduler是个抽象类,包含一个抽象方法createWorker,返回一个Worker对象。
而它的scheduleDirect方法实际上就是调用这个Worker的schedule方法。
继续分析线程切换逻辑,代码中调用了IoScheduler.scheduleDirect方法,实际就是把SubscribeTask交给IoScheduler.createWorker构建的worker去执行。
跟到IoScheduler.createWorker方法:
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
返回了一个EventLoopWorker对象,进入它的schedule方法:
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
这里也不是真正执行任务的地方,那就继续跟进到ThreadWorker.scheduleActual方法:
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
到这里,总算看到了,通过线程池分配线程来执行任务。
总结:subscribeOn(Schedulers.io())
会在逆向订阅步骤中,通过线程池分配一个子线程来执行任务。
图中通过粉色和红色的箭头区分了UI线程和子线程,走到订阅ObservableSubscribeOn时,从UI线程切换到子线程,箭头从粉色变为红色,之后的逆向订阅操作都在子线程中进行。
2. observeOn
接下来分析observeOn,它是指定观察者(订阅者)处理事件所在线程。我们传入的是AndroidScheduler.mainThread()
,这又是个什么东西呢?
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
看到这段代码,我们就知道了,AndroidScheduler.mainThread
指的是HandlerThread,在它构造方法中会传入一个主线程Handler,相信不用解释,各位同学明白这个Handler的作用吧。对,,,就是用来把观察者执行逻辑切换到主线程。
那么,具体是在哪个过程切换的呢?执行ObserveOnObserver的onNext阶段!
代码如下:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
先把事件流中的数据t加入队列queue,然后执行schedule方法:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
这个worker相信大家猜到了,就是HandlerScheduler的createWorker得到的对象,把this作为任务(ObserveOnObserver实现了Runnable),交给worker执行:
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
这里的worker是HandlerWorker对象,继续看它的schedule方法:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//省略一些判断
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
到这里,总算守得云开见日出。通过Handler发送消息把任务切换到主线程执行。
这个任务就是刚才提到的ObserveOnObserver,我们看一下它的run方法:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
这里会根据outputFused走不通的逻辑,正常情况下都会走else逻辑,我们就只分析这条分支。
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
这段代码比较长,但仔细看下来,逻辑还是比较简单,就是从queue中获取数据,然后把数据交给actual(LambdaObserver)的onNext方法。到这里,经过转换的数据交给我们传入的Comsumer,在主线程中处理,observeOn切换线程的逻辑分析完毕。
还是可以看回那张图,当执行ObserveOnObserver.onNext时,就从子线程切换回UI线程,箭头变成粉色。
总结:observeOn作用于onNext阶段。
总结
这篇文章很长,相信看完的同学肯定会有所收获,对RxJava2有更好的认识。
当然,除了本文举例场景中的操作符,RxJava2还提供了很有强的而好用的操作符,各位同学可以学习学习。