RxJava-Observable分析
背景
看完Flowable的流控机制之后,对Observable的应对机制好奇 。希望在以后的应用过程中,不会犯一些不必要的错误。
环境与目的
RxJava版本信息如下:
'io.reactivex.rxjava2:rxandroid:2.0.1'
'io.reactivex.rxjava2:rxjava:2.1.0'
目的:探究Observable实现异步事件流的原理。
Obserable的构建过程
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
与Flowable构建过程类似,它也是通过ObservableCreate来构建Observable对象。而ObservableOnSubscribe实际上就是一个接口,接口中只有一个方法:
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
用以通过发射器Emitte发射事件。
subscribe过程
该过程实际上是执行了subscribeActual,具体代码为:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//observer.onSubscribe会首先执行
observer.onSubscribe(parent);
try {
//这个source是ObservableOnSubscribe对象
//该过程实际为通过Emitter发射事件
//parent就是Emitter,在该方法中为CreateEmitter
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
我们通过Emitter的onNext发射事件,在CreateEmitter中,具体为:
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
可以看到,每次onNext发射事件之后,由于生产和消费是在同一个线程中,马上就通过oberver.onNext进行消费。
即,每发射一个事件,该事件会立即被消费。
以上,观察者和被观察者者默认都运行在主线程中。
Observable涉及到不同运行线程时的构建流程
subscribeOn(Schedulers.newThread())
主要完成功能是将发射事件的过程置于一个新的线程中。
该方法实际返回了一个ObservableSubscribeOn对象,当然,这个类是Observable的子类:
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
其subscribeActual为:
public void subscribeActual(final Observer<? super T> s) {
//SubscribeOnObserver实现了对Observer的一层包装,将s包装成parent
//(1)
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//s 实际就是我们自己定义的Observer
//(2)
s.onSubscribe(parent);
//(3) 重点在这里
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); (3)
}
(1) 封装 s 为 parent 。
(2) 对外暴露parent,实际上s就是我们自己定义的Observer,我们可以在其onSubscribe中获取到parent。
(3) 首先我们来看一下scheduler。
我使用了一个新线程Schedulers.newThread()。该scheduler的构建过程如下:
//NEW_THREAD 是一个 Scheduler
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
//进一步查看NEW_THREAD
--------->
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
//通过NewThreadTask() 的call方法,返回了NEW_THREAD
------>
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
//DEFAULT 这个实际上就是我们使用的scheduler的本尊
------>
static final Scheduler DEFAULT = new NewThreadScheduler();
来看看它的scheduleDirect方法:
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
//从线程池中拿出线程执行了任务
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
而SubscribeTask就是一个Runnable任务,执行:
public void run() {
source.subscribe(parent);
}
以上,就是发射事件能够运行在新线程的根本,同时,后续消费事件默认也运行再该线程中(如果没有主动将消费事件的运行设置在另外一个线程中)。
我们应该注意到,对外暴露给我们的parent,是一个Disposable,它有一个dispose方法,该方法的具体实现在NewThreadWorker里:
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
它关闭了整个线程池。
observeOn(AndroidSchedulers.mainThread())
主要作用是将消费线程置于新的独立线程中。
该方法同样将返回一个ObservableObserveOn对象,实际执行过程:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
我们可以看到,和subscribeOn相比,这里就有了bufferSize的概念。bufferSize实际上是一个int常量128,不用关注。
它的subscribeActual方法为:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这里可以看到,事件的消费,实际上是通过ObserveOnObserver对象执行的。该对象的onNext方法为:
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
//将事件先存到队列中
queue.offer(t);
}
//消费事件
schedule();
}