RxJava线程切换-源码解析(二)
2019-10-26 本文已影响0人
pj0579
首先明确一点线程切换一定需要Handler
的参与
线程切换的代码最简单的做法大概如下代码
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe" + Thread.currentThread());
}
// ObservableOnSubscribe 运行在工作线程
}).subscribeOn(Schedulers.newThread()).
// 观察者运行在主线程 这段代码获取了主线程的handler,之后的线程切换就需要他的参与
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe" + Thread.currentThread());
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
subscribeOn(Schedulers.newThread()).
代码指定ObservableOnSubscribe
运行在工作线程
看下代码怎么实现?
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 返回的还是 ObservableSubscribeOn(是Observable 的子类)对象 也是调用里面的subscribeActual,默认是调用ObservableCreate 的subscribeActual方法
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
然后调用subscribe
到
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
里面可以调用subscribeActual
方法
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// s代表注册的Obersver还是直接执行
s.onSubscribe(parent);
// 这次没有直接执行代码写的subscribe方法 而是setDisposable
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
看下SubscribeTask类 实现了Runnable接口
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 在工作线程执行上游subscribe
source.subscribe(parent);
}
}
接着往下看 就是一些线程池 线程管理的事情
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
// 在这提交到了线程池
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}