RxJava->如何进行线程管理
这篇笔记主要解决以下几个问题:
1. Schedulers.newThread()做了哪些事;
2. Disposable.dispose()如何关掉当前任务;
3. 多线程通信如何用链式实现;
/**
 * Builds and subscribes to the example chain analysed in these notes:
 * create -> subscribeOn(newThread) -> doOnNext -> observeOn(mainThread) -> subscribe.
 * All callback bodies are elided in the notes.
 */
private void rxJavaStartTask() {
Observable
// Source: the ObservableOnSubscribe that emits through the emitter.
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {...}
})
// Wraps the source so that it is subscribed via the new-thread scheduler.
.subscribeOn(Schedulers.newThread())
// Side-effect hook invoked for each emitted value before it is passed downstream.
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {...}
})
// Delivers downstream events through the Android main-thread scheduler.
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
// NOTE(review): presumably stores d in mDisposable for rxJavaCancelTask(); body elided — confirm.
@Override
public void onSubscribe(Disposable d) {...}
@Override
public void onNext(Integer value) {...}
@Override
public void onError(Throwable e) {...}
@Override
public void onComplete() {...}
});
}
/**
 * Cancels the task started by rxJavaStartTask() by disposing the stored Disposable.
 * Does nothing when no Disposable has been stored yet, so calling this before the
 * chain has been subscribed no longer throws a NullPointerException.
 */
private void rxJavaCancelTask() {
    // mDisposable is presumably assigned in Observer.onSubscribe(...); it is null until then.
    if (mDisposable != null) {
        mDisposable.dispose();
    }
}
这篇笔记主要围绕上面两段代码进行分析;
Schedulers.newThread():
/** Excerpt of the Schedulers factory class: exposes the shared new-thread Scheduler. */
public final class Schedulers {
/** Returns NEW_THREAD after passing it through RxJavaPlugins.onNewThreadScheduler(...). */
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
/** Assigned exactly once, in the static initializer below. */
static final Scheduler NEW_THREAD;
static {
// The Callable resolves to NewThreadHolder.DEFAULT; when RxJavaPlugins invokes it is not shown in this excerpt.
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
});
}
/** Holder whose DEFAULT field is the instance returned by NewThreadScheduler.instance(). */
static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
}
}
/** Excerpt of the Scheduler returned by Schedulers.newThread(); exposed as a single shared instance. */
public final class NewThreadScheduler extends Scheduler {
/** Returns the shared INSTANCE. */
public static NewThreadScheduler instance() {
return INSTANCE;
}
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
/** Creates a fresh NewThreadWorker on every call (THREAD_FACTORY is declared outside this excerpt). */
@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
}
}
1、Schedulers.newThread()返回NewThreadScheduler, NewThreadScheduler实际是一个单例类;
2、NewThreadScheduler内部有一个createWorker()的方法, 待后面用到再做分析;
Observable.subscribeOn() :
/** Excerpt of Observable showing only subscribeOn(...). */
public abstract class Observable<T> implements ObservableSource<T> {
/**
 * Wraps this Observable and the given scheduler in a new ObservableSubscribeOn.
 * Nothing is scheduled here; the wrapper only stores both references.
 */
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<T>(this, scheduler);
}
}
1、调用.subscribeOn(Schedulers.newThread())并不会立即切换线程, 只是返回包装了上游的ObservableSubscribeOn对象的引用; 真正订阅时, 上游的subscribe(...)(及其发射的事件)才会在子线程中执行;
2、调用ObservableSubscribeOn构造函数时传入的this实际指向Observable.create(...)返回的ObservableCreate;
Observable.doOnNext() :
/** Excerpt of Observable showing doOnNext(...) and the private doOnEach(...) it delegates to. */
public abstract class Observable<T> implements ObservableSource<T> {
/** Delegates to doOnEach with the given onNext consumer and empty handlers for the other three callbacks. */
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
/** Wraps this Observable and the four callbacks in a new ObservableDoOnEach. */
@SchedulerSupport(SchedulerSupport.NONE)
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
return new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate);
}
}
/** Excerpt of the operator object returned by doOnEach(...); only the fields and constructor are shown. */
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
/**
 * Passes the upstream source to the superclass and stores the four callbacks.
 *
 * @param source upstream ObservableSource (the Observable doOnEach was called on)
 */
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
super(source);
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}
}
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>
调用Observable.doOnNext()返回ObservableDoOnEach对象, ObservableDoOnEach对象持有上游Observable(即source)的引用, 该source就是上文Observable.subscribeOn()返回的ObservableSubscribeOn;
Observable.observeOn(AndroidSchedulers.mainThread()):
/** Excerpt of the AndroidSchedulers factory class; not instantiable (the constructor throws). */
public final class AndroidSchedulers {
/** Holder whose DEFAULT is a HandlerScheduler built on a Handler bound to Looper.getMainLooper(). */
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
// NOTE(review): as written this assigns a Callable<Scheduler> to a Scheduler field, which does not
// type-check. The library presumably wraps the Callable in a plugin initializer call that was
// dropped when abridging this excerpt — confirm against the RxAndroid source.
private static final Scheduler MAIN_THREAD = new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
};
/** Returns the shared MAIN_THREAD scheduler. */
public static Scheduler mainThread() {
return MAIN_THREAD;
}
/** Creates a new HandlerScheduler on a new Handler for the given looper on every call. */
public static Scheduler from(Looper looper) {
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
MAIN_THREAD实际是一个常量, 也就是说不管调用Observable.observeOn(AndroidSchedulers.mainThread())多少次, 都只会创建一个HandlerScheduler, 该HandlerScheduler持有一个Handler;
Observable.observeOn(...):
/** Excerpt of Observable showing the two observeOn(...) overloads. */
public abstract class Observable<T> implements ObservableSource<T> {
/** Delegates to the three-argument overload with delayError = false and bufferSize() (defined outside this excerpt). */
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
/** Wraps this Observable, the scheduler and the two options in a new ObservableObserveOn. */
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}
}
/** Excerpt of the operator object returned by observeOn(...); the remaining constructor parameters are elided. */
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler...) {
/**
 * In the example chain, source is the ObservableDoOnEach returned by doOnNext(...).
 */
super(source);
this.scheduler = scheduler;
}
}
Observable.subscribe(...) :
/** Excerpt of Observable showing subscribe(Observer) in abridged form. */
public abstract class Observable<T> implements ObservableSource<T> {
/** Forwards the observer directly to subscribeActual(...) of the concrete operator. */
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
/** Implemented by each concrete Observable to perform the actual subscription. */
protected abstract void subscribeActual(Observer<? super T> observer);
}
1、subscribeActual(...)被ObservableObserveOn_observeOn实现;
2、传入Observer指向Observer_subscribe;
ObservableObserveOn_observeOn.subscribeActual(...) :
/** Excerpt of ObservableObserveOn showing subscribeActual(...). */
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
/**
 * 1. Because the chain used observeOn(AndroidSchedulers.mainThread()), createWorker() is the
 *    HandlerScheduler implementation, so the Worker is actually a HandlerWorker.
 * 2. source is the ObservableDoOnEach returned by doOnNext(...). For convenience the notes
 *    append the operator method name as a suffix from here on (e.g. ObservableObserveOn_observeOn).
 */
Scheduler.Worker w = scheduler.createWorker();
// Subscribes upstream with an ObserveOnObserver that wraps the downstream observer and the worker.
// (delayError and bufferSize are fields not shown in this excerpt.)
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
/** Excerpt of the Scheduler that is backed by an Android Handler. */
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
/** Creates a new HandlerWorker on every call; all workers share this scheduler's Handler. */
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
}
1. source实际指向ObservableDoOnEach_doOnNext;
2. 调用ObservableDoOnEach_doOnNext.subscribeActual(...)传入ObserveOnObserver_observeOn;
ObservableDoOnEach_doOnNext.subscribeActual(...) :
// NOTE(review): the heading above and the fields used below (onNext, onError, onComplete,
// onAfterTerminate) belong to ObservableDoOnEach; the class name on the next line looks like a
// copy-paste slip in the notes — confirm and rename to ObservableDoOnEach.
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
public void subscribeActual(Observer<? super T> t) {
/**
 * Walking up the chain, source is the ObservableSubscribeOn returned by
 * subscribeOn(Schedulers.newThread()).
 */
// Subscribes upstream with a DoOnEachObserver that wraps the downstream observer t and the callbacks.
source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
}
}
1. source实际指向ObservableSubscribeOn_subscribeOn;
2. 调用ObservableSubscribeOn_subscribeOn.subscribeActual(...)传入DoOnEachObserver_doOnNext;
ObservableSubscribeOn_subscribeOn.subscribeActual(...) :
/** Excerpt of ObservableSubscribeOn showing subscribeActual(...). */
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
public void subscribeActual(final Observer<? super T> s) {
/**
 * 1. Creates the SubscribeOnObserver_subscribeOn object.
 * 2. Passes a reference to it into DoOnEachObserver_doOnNext (via s.onSubscribe below).
 */
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// Called directly on the current thread, before anything is scheduled.
s.onSubscribe(parent);
// The upstream subscription is deferred into a Runnable handed to the scheduler;
// the Disposable returned by scheduleDirect is stored in parent.
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
}
/**
 * 1. SubscribeOnObserver.actual refers to DoOnEachObserver_doOnNext.
 * 2. DoOnEachObserver_doOnNext holds a reference to SubscribeOnObserver_subscribeOn.
 *
 * Abridged excerpt (onError and isDisposed are not shown). The object tracks two Disposables:
 * the upstream one in the field s, and the one passed to setDisposable(...) in the
 * AtomicReference this class extends.
 */
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
/** Stores the upstream Disposable in the field s; it is not forwarded to actual here. */
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
/** Forwards the value to the downstream observer unchanged. */
@Override
public void onNext(T t) {
actual.onNext(t);
}
/** Forwards completion to the downstream observer. */
@Override
public void onComplete() {
actual.onComplete();
}
/** Disposes both tracked Disposables: the upstream one (s) and the one held by this reference. */
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
/** Stores d in this AtomicReference; subscribeActual passes the result of scheduleDirect(...) here. */
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
DoOnEachObserver_doOnNext.onSubscribe() :
/** Excerpt of the Observer created by ObservableDoOnEach; only the constructor and onSubscribe are shown. */
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> actual;
final Consumer<? super T> onNext;
Disposable s;
/**
 * this.actual refers to ObserveOnObserver_observeOn(...).
 */
DoOnEachObserver(Observer<? super T> actual, Consumer<? super T> onNext...) {
this.actual = actual;
this.onNext = onNext;
...
}
/**
 * When DisposableHelper.validate accepts the pair, stores the upstream Disposable and
 * hands this object (not s) to the downstream observer as its Disposable.
 */
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
}
}
}
调用ObserveOnObserver_observeOn中的onSubscribe方法并传入DoOnEachObserver_doOnNext(...);
ObserveOnObserver_observeOn.onSubscribe(...) :
/**
 * Excerpt of the Observer created by ObservableObserveOn; only the constructor and onSubscribe
 * are shown (the fields s, queue and bufferSize are declared outside this excerpt).
 */
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> actual;
final Scheduler.Worker worker;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker...) {
this.actual = actual;
this.worker = worker;
...
}
/**
 * When DisposableHelper.validate accepts the pair, stores the upstream Disposable, creates
 * the queue, and calls actual.onSubscribe(this) directly — the worker is not involved here.
 */
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
// QueueDisposable handling is elided in the notes.
if (s instanceof QueueDisposable) {...}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
}
actual实际指向Observer_subscribe(...) , 即下图中的onSubscribe(...)方法;
同时也说明了onSubscribe(...){...}是沿着调用链被直接(同步)调用的, 因此它在调用subscribe(...)的线程中执行(本例中即主线程), 而不是由observeOn(...)指定的调度器切换过去的;
继续回到ObservableSubscribeOn_subscribeOn.subscribeActual(...)中:
/** Annotated repeat of ObservableSubscribeOn.subscribeActual(...). */
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
public void subscribeActual(final Observer<? super T> s) {
/**
 * 1. Creates the SubscribeOnObserver_subscribeOn object.
 * 2. Passes a reference to it into DoOnEachObserver_doOnNext.
 */
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
/**
 * Hands the Disposable down the chain through nested onSubscribe(...) calls. These are
 * direct calls on the current thread (the main thread in this example, assuming
 * subscribe(...) was called from it).
 */
s.onSubscribe(parent);
/**
 * The key part: this is where the thread pool is used, where the thread hand-off
 * happens, and where the chained calls continue upstream.
 */
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
}
NewThreadScheduler.scheduleDirect(...) :
// Body elided in these notes. The scheduleDirect(...) overloads discussed next are declared on
// the Scheduler base class that this class extends.
public final class NewThreadScheduler extends Scheduler {
...
}
/** Excerpt of the Scheduler base class showing the two scheduleDirect(...) overloads. */
public abstract class Scheduler {
/** Schedules run with zero delay by delegating to the three-argument overload. */
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
/**
 * Creates a dedicated Worker, schedules the (plugin-decorated) task on it, and returns the
 * Worker itself as the Disposable for the scheduled task.
 */
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
/**
 * createWorker() is implemented by the Scheduler subclass, NewThreadScheduler in this example.
 */
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
// The worker is single-use here: it is disposed once the task finishes, even if it throws.
w.dispose();
}
}
}, delay, unit);
return w;
}
}
/** Repeat of NewThreadScheduler.createWorker(), the implementation used by scheduleDirect(...) in this example. */
public final class NewThreadScheduler extends Scheduler {
/** Creates a fresh NewThreadWorker on every call (THREAD_FACTORY is declared outside this excerpt). */
@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
}
}
NewThreadWorker.schedule(...) :
/**
 * Excerpt of the Worker created by NewThreadScheduler. Each instance owns its own
 * ScheduledExecutorService obtained from SchedulerPoolFactory.create(...).
 * The {@code disposed} flag and the elided branches are declared outside this excerpt.
 */
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    /**
     * Schedules the action on this worker's executor, or returns an already-disposed
     * placeholder when the worker has been disposed. No parent container is used.
     */
    @Override
    public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

    /**
     * Wraps the (plugin-decorated) task in a ScheduledRunnable and submits it to the executor.
     *
     * @param parent optional container tracking the task; may be null (schedule(...) passes null)
     * @return the ScheduledRunnable, also when the executor rejected the task
     */
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {...}
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {...}
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            // parent is null when called from schedule(...); guard so the rejection is
            // reported through RxJavaPlugins instead of being masked by a NullPointerException.
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
}
/** Excerpt of the factory that creates the executor used by each NewThreadWorker. */
public final class SchedulerPoolFactory {
/**
 * Creates a scheduled executor with a core pool size of 1 using the given thread factory.
 * When the executor is a ScheduledThreadPoolExecutor it is also registered in POOLS; what
 * POOLS is later used for is not shown in this excerpt.
 */
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
}
1、使用.subscribeOn(Schedulers.newThread())的链每次被订阅时, scheduleDirect(...)都会通过单例NewThreadScheduler创建一个新的NewThreadWorker实例, 而每一个Worker内部又维护了一个ScheduledExecutorService;
2、创建的ScheduledExecutorService又会被缓存在SchedulerPoolFactory中的POOLS中, 目前有一个疑问, 缓存之后如何做到复用的? <TODO>
ObservableSubscribeOn_subscribeOn.subscribeActual(...) :
/** Repeat of ObservableSubscribeOn.subscribeActual(...), now focusing on the scheduled Runnable. */
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
/**
 * source is the ObservableCreate returned by Observable.create(...).
 */
source.subscribe(parent);
}
}));
}
}
source指向Observable.create(...)返回的ObservableCreate;
ObservableCreate_create.subscribeActual(...) :
/** Excerpt of the Observable returned by Observable.create(...), showing subscribeActual(...) in abridged form. */
public final class ObservableCreate<T> extends Observable<T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
// The emitter wraps the downstream observer and is also handed to it as its Disposable.
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
/**
 * source is the ObservableOnSubscribe that was passed to Observable.create(...).
 */
source.subscribe(parent);
}
}
1. observer指向SubscribeOnObserver_subscribeOn;
2. CreateEmitter持有SubscribeOnObserver_subscribeOn的引用;
3. 通过observer.onSubscribe(parent), SubscribeOnObserver_subscribeOn持有了CreateEmitter的引用;
CreateEmitter.onNext(...) :
/** Excerpt of the emitter handed to ObservableOnSubscribe.subscribe(...); only onNext is shown. */
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
/** Forwards the value to the wrapped observer unless this emitter has been disposed. */
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
}
observer指向SubscribeOnObserver_subscribeOn, 继续向下分析:
SubscribeOnObserver_subscribeOn.onNext(...) :
/** Abridged repeat of SubscribeOnObserver, showing only what onNext needs. */
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
final Observer<? super T> actual;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
}
/** Forwards the value to the downstream observer unchanged, on the calling thread. */
@Override
public void onNext(T t) {
actual.onNext(t);
}
}
actual指向DoOnEachObserver_doOnNext, 继续向下分析:
DoOnEachObserver_doOnNext.onNext(...) :
/** Abridged repeat of DoOnEachObserver, showing onNext (the done field is declared outside this excerpt). */
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
DoOnEachObserver(Observer<? super T> actual, Consumer<? super T> onNext...) {
this.actual = actual;
this.onNext = onNext;
...
}
/** Ignores values once done is set; otherwise runs the user's Consumer first, then forwards downstream. */
@Override
public void onNext(T t) {
if (done) {
return;
}
onNext.accept(t);
actual.onNext(t);
}
}
actual指向ObserveOnObserver_observeOn, 继续向下分析:
ObserveOnObserver_observeOn.onNext(...) :
/** Abridged repeat of ObserveOnObserver, showing onNext and schedule. */
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
/** Ignores values once done is set; otherwise (after the elided step) requests a run on the worker. */
@Override
public void onNext(T t) {
if (done) {
return;
}
// Elided in the notes; presumably enqueues t for the worker to drain — confirm against the library source.
if (sourceMode != QueueDisposable.ASYNC) {...}
schedule();
}
/** Submits this object (it is a Runnable) to the worker only when the counter was 0 before the increment. */
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
}
HandlerWorker.schedule(...) :
/** Excerpt of the Worker created by HandlerScheduler (handler and disposed are declared outside this excerpt). */
private static final class HandlerWorker extends Worker {
/**
 * Posts the (plugin-decorated) task to the Handler as a delayed Message and returns the
 * ScheduledRunnable as its Disposable; returns an already-disposed Disposable when the
 * worker is disposed before or right after posting.
 */
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
// Negative delays are clamped to 0 ms.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check after posting: if the worker was disposed in the meantime, withdraw the callback.
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
}