2019-11-08 Rxjava 源码解析<3>
上一篇,我们看了如何使用.subscribeOn(Schedulers.io())方法就能使subscribe在子线程中运行,接下来继续看,如何在切回主线程
var sources = object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
emitter.onNext("下一步")
emitter.onComplete()
}
}
var observable = Observable.create(sources)
var observable1 = observable.subscribeOn(Schedulers.io())
var observable2 = observable1.observeOn(AndroidSchedulers.mainThread())
var observer = object :Observer<String>{
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
}
override fun onError(e: Throwable) {
}
}
observable2.subscribe(observer)
前面都分析了,看observable1.observeOn(AndroidSchedulers.mainThread())做了什么
public final Observable<T> observeOn(Scheduler scheduler) {
//bufferSize() 缓冲大小
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//做判断
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//返回ObservableObserveOn对象
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
所以observable1.observeOn(AndroidSchedulers.mainThread())返回的是ObservableObserveOn对象,那么当调用 observable2.subscribe(observer)方法时,就是调用ObservableObserveOn.subscribe()方法,根据前两篇可以知道,就会进入ObservableObserveOn的subscribeActual()方法中
@Override
protected void subscribeActual(Observer<? super T> observer) {
//判断scheduler的类型
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
首先会判断scheduler的类型,scheduler的类型就是AndroidSchedulers.mainThread()返回的对象,看这个返回什么
public static Scheduler mainThread() {
//返回MAIN_THREAD的值
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
//这里会调用call方法,所以返回MainHolder.DEFAULT
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
//静态内部类,所以返回的是HandlerScheduler对象
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
通过上面可以知道scheduler是HandlerScheduler对象,其中HandlerScheduler是直接继承Scheduler,所以scheduler不是TrampolineScheduler类型,回到subscribeActual()方法中
@Override
protected void subscribeActual(Observer<? super T> observer) {
//前面分析了,这里是false,会进入else分支
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//通过scheduler生成Worker
Scheduler.Worker w = scheduler.createWorker();
//绑定,其中source是上一层的observable,在这里就是observable1
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
根据前面的分析source.subscribe()就会进入ObserveOnObserver的onSubscribe方法中
//ObserveOnObserver的onSubscribe方法
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
//....
//初始化queue
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//调用observer的onSubscribe方法,downstream就是我们的observer
downstream.onSubscribe(this);
}
}
这个方法主要就是初始化queue,根据前面,可以知道,当调用onNext,也会来到这里的onNext
//ObserveOnObserver的onNext
@Override
public void onNext(T t) {
//判断是否停止,为false
if (done) {
return;
}
//判断是否为不是异步
if (sourceMode != QueueDisposable.ASYNC) {
//将该消息放入queue中
queue.offer(t);
}
//调度
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//调用worker.schedule()方法
worker.schedule(this);
}
}
又回到了worker.schedule方法,其中worker是通过HandlerScheduler.createWorker()生成的
@Override
public Worker createWorker() {
//返回HandlerWorker对象
return new HandlerWorker(handler, async);
}
当前worker是HandlerWorker对象,看schedule()方法
//注意,这里是三个参数的方法,调用的时候是调用一个参数的方法
//是因为在父类中一个参数的方法会调用三个参数的方法,所以来到这里
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//做判断
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
//判断是否解绑
if (disposed) {
return Disposables.disposed();
}
//检验run
run = RxJavaPlugins.onSchedule(run);
//创建ScheduledRunnable对象,其中ScheduledRunnable是实现Runnable
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//生成Message
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//async为false
if (async) {
message.setAsynchronous(true);
}
//通过handler发送消息
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
通过handler发送一个消息,所以会走到ScheduledRunnable的run方法中,注意,这里的handler的通过Looper.getMainLooper()方法获取,是主线程的handler,所以run是运行在主线程中的,这样就实现了从其他线程切换到主线程中
//ScheduledRunnable的run方法
@Override
public void run() {
try {
//调用delegate的run方法
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
其中delegate就是我们传过来的run对象,也就是worker.schedule(this);传过来的this,所以又会回到ObserveOnObserver的run方法中
@Override
public void run() {
//outputFused默认为false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
//获取queue,是在onSubscribe方法中初始化的
final SimpleQueue<T> q = queue;
//获取Observer,也就是我们的observer
final Observer<? super T> a = downstream;
//死循环
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//双重死循环
for (;;) {
//判断是否已经停止
boolean d = done;
T v;
try {
//获取onNext(T t)中传过来的参数
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
//判断是否获取到
boolean empty = v == null;
//如果没有获取到,则调用observer.onComplete()方法,并跳出循环
if (checkTerminated(d, empty, a)) {
return;
}
//没有获取到,跳出循环
if (empty) {
break;
}
//调用observer.onNext并将参数传过去
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
之后就走到我们自己的observer的onNext方法中,并且,此时是运行在主线程中的,这样就实现了从其他线程切换到主线程的功能。