Rxjava2 线程切换(3)
一、简介
前面说了Rxjava的用法和操作符原理,Rxjava的基于事件的响应式编程面向事件本身采用流式操作,让事件处理层次变得非常的清晰。但是如果没有线程变换,只是在单一的线程中操作,那就是花拳绣腿上不了台面。
好在Rxjava2提供了四种线程切换模式,而且Rxjava2让线程切换变得异常的简单,只需要一行代码就可以实现。往往简单的这一行代码,内部的实现确实精妙的。所以这一章就来说说Rxjava2的线程变换原理
先来总结下Rxjava的线程切换用法:
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
- subscribeOn是指定之前的事件流处理的线程
- observeOn 是指定之后的事件流处理的线程
并且提供了四种类型线程模式:
- AndroidSchedulers.mainThread() : 主线程
- Schedulers.io() : io操作的线程, 通常用于网络,读写文件等io密集型的操作
- Schedulers.computation() : CPU计算密集型的操作
- Schedulers.newThread() : 新建一个线程
observeOn和subscribeOn方法都是Observable中的方法,他们也是在链式调用之内,所以这两个方法同其他的变换操作符一致,必定是接受旧的Observable,包装成一个新的Observable并返回。
此外android中的子线程是采用Runnable接口,主线程是使用Handler(Looper.getMainLooper()) handler机制,所以我们可以肯定Rxjava的线程变换无非是对子线程和主线程Handler的二次封装。
二、线程调度模块
线程调度上图是以采用Handler调度的方式为例,并以此图来分析该模型。
①、Scheduler
先看下Scheduler类,此类是数据调度的抽象类,该对象可获得一个Worker对象,真实的数据调度是在Worker中的。
精简代码
public abstract class Scheduler {
// 每一个Scheduler 对应一个Worker用来实现真实的线程切换
public abstract Worker createWorker();
// 立即的执行Runnable
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
// 推迟delay事件调度数据
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
// 第一步 得到子类的Worker
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 使用worker真实的执行Runnable
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
}
Scheduler中主要完成以下几件事:
- 创建一个真实的线程切换的执行者Worker
- 外界通过Scheduler对象执行scheduleDirect方法,来开始调度处理。
从中可以看到Scheduler只是一个抽象调度,代理机制来调度。从代码中可以看到Scheduler很简单,是出于上层的抽象类,scheduleDirect方法中,就是创建一个Worker执行者,来执行传入的Runnable。并且scheduleDirect方法若不满足,还是可以在子类中根据自己的需求重写。
②、Worker执行者
精简代码:
public abstract static class Worker implements Disposable {
public Disposable schedule(Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
}
从代码中可以看到,这两个方法和Scheduler中的方法很像,除了方法名不一样,其他的都相似。也就是Scheduler和Worker就是一个静态的代理。
问题:为什么执行会返回Disposable了?
因为在处理事件流过程中可能会取消事件流,返回的Disposable可以控制停止线程调度。
③、ScheduledRunnable被执行的对象
ScheduledRunnable是线程切换的被执行单元,这里对Runnable进行了二次封装,增加了取消的功能。
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
这里就不需要过多的解释,在run方法中直接执行传入的Runnable对象,因为调度是在Handler中,用Handler发送个Runnable对象,所以取消也是用Handler去remove。也就是说ScheduledRunnable中传入了Runnable和Handler两个对象。
④、HandlerWorker具体的Runnable执行者
Worker是个抽象类,具体的实现还得由子类完成。这里看下具体的HandlerWorker。
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
因为Handler是执行者,所以构造方法先把Handler传入进来。然后看下核心方法schedule。
// 1、传入的Runnable分装成ScheduledRunnable
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// 2、创建一个可以执行Runnable的Message
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
// 3、使用Handler推迟时间执行该scheduled
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
// 4、如果取消了就用Handler移除
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
⑤、HandlerScheduler具体的调度者
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
HandlerScheduler中复写了scheduleDirect中方法,所以他不会去代理Worker执行Runnable。
scheduleDirect中的逻辑很少,就是用Handler处理一个定时任务。然后就结束了。
使用:
直接调用HandlerScheduler对象的scheduleDirect(Runnable run)方法,最终执行的就是HandlerScheduler中的scheduleDirect(Runnable run, long delay, TimeUnit unit)方法。本质就是使用Handler执行Runnable。
从线程切换源码可以看出本质上还是对Handler和Runnable的封装。并没从本质上更改。下面介绍下线程切换的具体使用。
三、subscribeOn
subscribeOn的意思就是指定在此之前的事件流执行的线程。从代码中可以看到,subscribeOn就是对原来的Observable封装下,转变成ObservableSubscribeOn并返回。
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
直接看ObservableSubscribeOn源码:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
ObservableSubscribeOn中将scheduler传入进来,作为触发线程切换的对象。重点代码再subscribeActual方法中。
- 1、先将原来的Observer包装成新的SubscribeOnObserver
- 2、调用老的Observer得onSubscribe回调方法
- 3、parent设置Disposable,保证Disposable唯一性。方法中执行的是线程的调度,也就是将 source.subscribe(parent);代码切换到了指定的线程中。即指定了事件流的最开始的触发位置的线程。
#######注意:subscribeOn方法只有第一个设置的才生效,原因很简单它的执行顺序是后面设置的先执行,会导致最开始的会将原来设置的覆盖了。
四、observeOn
observeOn的意思就是指定在此之后的事件流执行的线程。从代码中可以看到,observeOn就是对原来的Observable封装下,转变成ObservableSubscribeOn并返回。
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
核心的代码还是在ObservableObserveOn中。猜想下observeOn中是决定后面事件流的执行线程,所以肯定不是切换subscribe方法,而是在Observer中的onNext方法中切换。
首先看下ObservableObserveOn#subscribeActual方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1、如果是上游和下游是同一个线程,那么就不管,该怎么办就怎么办
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 2、得到切换线程的执行者
Scheduler.Worker w = scheduler.createWorker();
// 3、封装老的observer,并将worker对象传入
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
subscribeActual中先要判断上游和下游的线程是不是在同一个线程中,就分为了两种情况,如果在不同的线程中,那就需要二次加工了。接下来在看看ObserveOnObserver中的处理。
ObserveOnObserver中的代码有点长,略加删除了些。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return cancelled;
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
@Override
public T poll() throws Exception {
return queue.poll();
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
因为onNext之后才处理下游的事件,所以先看下onNext方法。
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
1、onNext中主要就干了两件事,一件事将Event事件插入到Queue队列中,第二件就是执行schedule();,也就是取出事件,并消费。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
2、schedule()中处理的就更加简单了, worker.schedule(this);直接用worker切换线程,因为该类是继承自Runnable,所以最终会执行到自己的run方法。
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
3、run方法是针对不同的情况分别处理了。直接看drainNormal();
4、drainNormal方法主要就做了两件事,一件是从队列中取出一个事件,如果没有事件了就停止消费。另一个就是将获得的事件用onNext执行,因为执行的地方是在另一个线程,那么后面的事件处理也就切换了线程。
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}