RxJava->如何进行线程管理

2018-01-27  本文已影响60人  冉桓彬
这篇笔记主要解决以下几个问题:
1. Schedulers.newThread()做了哪些事;
2. Disposable.dispose()如何关掉当前任务;
3. 多线程通信如何用链式实现;
/**
 * Sample chain analysed by the article:
 * create -> subscribeOn(newThread) -> doOnNext -> observeOn(mainThread) -> subscribe.
 * All callback bodies are elided ({...}) in this excerpt.
 */
private void rxJavaStartTask() {
    Observable
            // Source: the ObservableOnSubscribe receives the emitter used to push values.
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {...}
            })
            // Requests a new-thread Scheduler for the subscription.
            .subscribeOn(Schedulers.newThread())
            // Side-effect hook placed between subscribeOn and observeOn.
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {...}
            })
            // Requests the Android main-thread Scheduler for everything downstream of this operator.
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Integer>() {
                // Receives the Disposable; presumably stored in mDisposable for rxJavaCancelTask() — body elided.
                @Override
                public void onSubscribe(Disposable d) {...}

                @Override
                public void onNext(Integer value) {...}

                @Override
                public void onError(Throwable e) {...}

                @Override
                public void onComplete() {...}
            });
    }

/**
 * Cancels the running task by disposing the stored Disposable.
 * Does nothing when no Disposable has been stored yet (e.g. cancel is requested
 * before the chain was subscribed), instead of throwing a NullPointerException.
 */
private void rxJavaCancelTask() {
    if (mDisposable != null) {
        mDisposable.dispose();
    }
}

  这篇笔记主要围绕上面两段代码进行分析;

Schedulers.newThread():
/** Excerpt of Schedulers: static access point for the shared new-thread Scheduler. */
public final class Schedulers {
    /** Returns NEW_THREAD after passing it through the RxJavaPlugins.onNewThreadScheduler hook. */
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }
    static final Scheduler NEW_THREAD;
    static {
        // The Callable supplies NewThreadHolder.DEFAULT; when initNewThreadScheduler
        // invokes it is not visible in this excerpt.
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return NewThreadHolder.DEFAULT;
            }
        });
    }
    /** Holder class whose DEFAULT is the NewThreadScheduler singleton. */
    static final class NewThreadHolder {
        static final Scheduler DEFAULT = NewThreadScheduler.instance();
    }
}

/** Excerpt of NewThreadScheduler: a singleton Scheduler that hands out NewThreadWorker instances. */
public final class NewThreadScheduler extends Scheduler {
    /** Returns the single shared instance. */
    public static NewThreadScheduler instance() {
        return INSTANCE;
    }
    private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
    /** Creates a fresh NewThreadWorker on every call (THREAD_FACTORY is declared outside this excerpt). */
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(THREAD_FACTORY);
    }
}

  1、Schedulers.newThread()返回NewThreadScheduler, NewThreadScheduler实际是一个单例类;
  2、NewThreadScheduler内部有一个createWorker()的方法, 待后面用到再做分析;

Observable.subscribeOn() :
/** Excerpt of Observable showing only subscribeOn. */
public abstract class Observable<T> implements ObservableSource<T> {
    /** Wraps this Observable (the upstream) and the given scheduler in a new ObservableSubscribeOn. */
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        return new ObservableSubscribeOn<T>(this, scheduler);
    }
}

  1、调用.subscribeOn(Schedulers.newThread())之后, 上游的订阅(source.subscribe)及其数据发射将在子线程中执行, 并且返回ObservableSubscribeOn对象的引用;
  2、调用ObservableSubscribeOn构造函数时传入的this实际指向Observable.create(...)返回的ObservableCreate;

Observable.doOnNext() :
/** Excerpt of Observable showing doOnNext and the private doOnEach it delegates to. */
public abstract class Observable<T> implements ObservableSource<T> {
    /** Delegates to doOnEach with the given onNext consumer and empty handlers for the other three callbacks. */
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> doOnNext(Consumer<? super T> onNext) {
        return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
    }
    /** Wraps this Observable (the upstream) and the four callbacks in a new ObservableDoOnEach. */
    @SchedulerSupport(SchedulerSupport.NONE)
    private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        return new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate);
    }
}

/** Excerpt of ObservableDoOnEach: stores the upstream source and the four side-effect callbacks. */
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Action onAfterTerminate;

    /** Passes source to the superclass constructor and keeps the callbacks in final fields. */
    public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                              Consumer<? super Throwable> onError,
                              Action onComplete,
                              Action onAfterTerminate) {
        super(source);
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onAfterTerminate = onAfterTerminate;
    }
}

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>

  调用Observable.doOnNext()返回ObservableDoOnEach对象, ObservableDoOnEach对象持有上游Observable(source)的引用, 该source即上文Observable.subscribeOn()返回的ObservableSubscribeOn;

Observable.observeOn(AndroidSchedulers.mainThread()):
/** Excerpt of AndroidSchedulers: Android-specific Scheduler factory methods. */
public final class AndroidSchedulers {

    /** Holder class whose DEFAULT is a HandlerScheduler bound to the main Looper. */
    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    // The Callable must be handed to the plugin initializer, which yields the Scheduler;
    // assigning the Callable itself to a Scheduler field does not type-check.
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** Returns the shared main-thread Scheduler, passed through the RxAndroidPlugins hook. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** Returns a new HandlerScheduler whose Handler is bound to the given looper. */
    public static Scheduler from(Looper looper) {
        return new HandlerScheduler(new Handler(looper));
    }

    /** Not instantiable: the constructor always throws. */
    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

  MAIN_THREAD实际是一个常量, 也就是说不管调用Observable.observeOn(AndroidSchedulers.mainThread())多少次, 都只会创建一个HandlerScheduler, 该HandlerScheduler持有一个Handler;

Observable.observeOn(...):
/** Excerpt of Observable showing the two observeOn overloads. */
public abstract class Observable<T> implements ObservableSource<T> {
    /** Delegates to the three-argument overload with delayError = false and bufferSize(). */
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    /** Wraps this Observable (the upstream) and the scheduling options in a new ObservableObserveOn. */
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
    }
}
/** Excerpt of ObservableObserveOn: constructor only; remaining parameters are elided ("..."). */
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler...) {
        /**
         * In the article's sample chain, source refers to the ObservableDoOnEach.
         */
        super(source);
        this.scheduler = scheduler;
    }
}
Observable.subscribe(...) :
/** Excerpt of Observable showing subscribe and the abstract hook it forwards to. */
public abstract class Observable<T> implements ObservableSource<T> {
    /** Forwards the observer to subscribeActual (as abridged here, nothing else happens). */
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }
    /** Implemented by each operator class to perform the actual subscription. */
    protected abstract void subscribeActual(Observer<? super T> observer);
}

  1、subscribeActual(...)被ObservableObserveOn_observeOn实现;
  2、传入Observer指向Observer_subscribe;

ObservableObserveOn_observeOn.subscribeActual(...) :
/** Excerpt of ObservableObserveOn: subscribeActual only (delayError/bufferSize fields are declared outside this excerpt). */
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        /**
         * 1. Because observeOn(AndroidSchedulers.mainThread()) supplied the scheduler, createWorker()
         *    is the one implemented by the Scheduler subclass HandlerScheduler,
         *    so the Worker is actually a HandlerWorker;
         * 2. source refers to the ObservableDoOnEach returned by doOnNext(...); for convenience the
         *    article appends the operator method name to class names from here on
         *    (e.g. ObservableObserveOn_observeOn).
         */
        Scheduler.Worker w = scheduler.createWorker();
        // Subscribes to the upstream with a wrapper around the downstream observer and the worker.
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

/** Excerpt of HandlerScheduler: a Scheduler whose workers share one Handler. */
final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    /** Creates a new HandlerWorker around the shared handler on every call. */
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
}

  1. source实际指向ObservableDoOnEach_doOnNext;
  2. 调用ObservableDoOnEach_doOnNext.subscribeActual(...)传入ObserveOnObserver_observeOn;

ObservableDoOnEach_doOnNext.subscribeActual(...) :
/**
 * Excerpt of ObservableDoOnEach: subscribeActual only. The onNext/onError/onComplete/
 * onAfterTerminate fields used below are the ones stored by the ObservableDoOnEach constructor.
 */
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(Observer<? super T> t) {
        /**
         * Tracing upward, source refers to the ObservableSubscribeOn returned by
         * .subscribeOn(Schedulers.newThread()).
         */
        source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
    }
}

  1. source实际指向ObservableSubscribeOn_subscribeOn;
  2. 调用ObservableSubscribeOn_subscribeOn.subscribeActual(...)传入DoOnEachObserver_doOnNext;

ObservableSubscribeOn_subscribeOn.subscribeActual(...) :
/** Excerpt of ObservableSubscribeOn: subscribeActual only (the scheduler field is declared outside this excerpt). */
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        /**
         * 1. Create the SubscribeOnObserver_subscribeOn object;
         * 2. Hand a reference to it to DoOnEachObserver_doOnNext (via onSubscribe below).
         */
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        
        // Called synchronously, before any work is handed to the scheduler.
        s.onSubscribe(parent);

        // The upstream subscription happens inside the Runnable passed to the scheduler;
        // the returned Disposable is stored on parent.
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
}
/**
 * Excerpt of SubscribeOnObserver (other members are omitted in the article).
 * 1. SubscribeOnObserver.actual refers to DoOnEachObserver_doOnNext;
 * 2. DoOnEachObserver_doOnNext holds a reference to SubscribeOnObserver_subscribeOn.
 */
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

    // Downstream observer that signals are forwarded to.
    final Observer<? super T> actual;

    // Holds the Disposable received through onSubscribe.
    final AtomicReference<Disposable> s;

    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }

    @Override
    public void onSubscribe(Disposable s) {
        DisposableHelper.setOnce(this.s, s);
    }

    @Override
    public void onNext(T t) {
        actual.onNext(t);
    }

    @Override
    public void onComplete() {
        actual.onComplete();
    }

    /** Disposes both the Disposable held in s and the one held in this AtomicReference. */
    @Override
    public void dispose() {
        DisposableHelper.dispose(s);
        DisposableHelper.dispose(this);
    }

    /** Stores d in this AtomicReference (the object itself, as opposed to the s field). */
    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
}
DoOnEachObserver_doOnNext.onSubscribe() :
/** Excerpt of DoOnEachObserver: fields, abridged constructor, and onSubscribe. */
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
    final Observer<? super T> actual;
    final Consumer<? super T> onNext;
    // Upstream Disposable received in onSubscribe.
    Disposable s;
    /**
     * this.actual refers to ObserveOnObserver_observeOn(...)
     */
    DoOnEachObserver(Observer<? super T> actual, Consumer<? super T> onNext...) {
        this.actual = actual;
        this.onNext = onNext;
        ...
    }

    /** When validation passes, stores the upstream Disposable and passes itself downstream as the Disposable. */
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            actual.onSubscribe(this);
        }
    }
}

  调用ObserveOnObserver_observeOn中的onSubscribe方法并传入DoOnEachObserver_doOnNext(...);

ObserveOnObserver_observeOn.onSubscribe(...) :
/** Excerpt of ObserveOnObserver: abridged constructor and onSubscribe (s, queue, bufferSize are declared outside this excerpt). */
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    final Observer<? super T> actual;
    final Scheduler.Worker worker;
    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker...) {
        this.actual = actual;
        this.worker = worker;
        ...
    }
    /** When validation passes, stores the upstream Disposable, creates the queue and calls the downstream onSubscribe. */
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            // QueueDisposable branch is elided in the article.
            if (s instanceof QueueDisposable) {...}
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            actual.onSubscribe(this);
        }
    }
}

  actual实际指向Observer_subscribe(...) , 即下图中的onSubscribe(...)方法;

image.png
  同时也说明了onSubscribe(...){...}在调用subscribe(...)的线程(本例中为主线程)中执行;
继续回到ObservableSubscribeOn_subscribeOn.subscribeActual(...)中:
/** Excerpt of ObservableSubscribeOn: subscribeActual, annotated step by step. */
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        /**
         * 1. Create the SubscribeOnObserver_subscribeOn object;
         * 2. Hand a reference to it to DoOnEachObserver_doOnNext.
         */
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        /**
         * Chains onSubscribe(...) calls down to the final Observer to provide the Disposable;
         * this runs on the subscribing thread (the main thread in the article's example).
         */
        s.onSubscribe(parent);
        /**
         * The key part: how the thread pool is driven, how threads communicate,
         * and how the chained call is realised.
         */
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
}
NewThreadScheduler.scheduleDirect(...) :
/** Body elided in the article; scheduleDirect(...) is shown on the Scheduler base class. */
public final class NewThreadScheduler extends Scheduler {
    ...
}

/** Excerpt of Scheduler showing the two scheduleDirect overloads. */
public abstract class Scheduler {
    /** Schedules run with a zero delay. */
    public Disposable scheduleDirect(Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    /**
     * Creates a worker, schedules the (plugin-decorated) task on it, and returns the worker
     * itself as the Disposable.
     */
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        /**
         * In the article's chain, createWorker() is the one implemented by the
         * Scheduler subclass NewThreadScheduler.
         */
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    // The worker is disposed once the task has run, whether or not it threw.
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }
}

/** Excerpt of NewThreadScheduler: createWorker only. */
public final class NewThreadScheduler extends Scheduler {
    /** Returns a new NewThreadWorker on every call (THREAD_FACTORY is declared outside this excerpt). */
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(THREAD_FACTORY);
    }
}
NewThreadWorker.schedule(...) :
/**
 * Excerpt of NewThreadWorker: a Worker backed by its own ScheduledExecutorService.
 * The disposed flag and the elided ({...}) branches are declared outside this excerpt.
 */
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    /** Obtains this worker's executor from SchedulerPoolFactory using the given thread factory. */
    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    /** Returns an empty Disposable when already disposed; otherwise schedules without a parent container. */
    @Override
    public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

    /**
     * Wraps the (plugin-decorated) task in a ScheduledRunnable and submits it to the executor.
     *
     * @param parent optional container tracking the task; may be null (schedule() passes null)
     * @return the ScheduledRunnable wrapping the task
     */
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {...}
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {...}
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            // parent is null when called from schedule(); guard so the rejection is
            // reported through RxJavaPlugins instead of being masked by an NPE.
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
}

/** Excerpt of SchedulerPoolFactory: creation of the per-worker executor. */
public final class SchedulerPoolFactory {
    /**
     * Creates a scheduled thread pool with a core size of 1 using the given factory, and
     * registers it in POOLS (declared outside this excerpt) when it is a ScheduledThreadPoolExecutor.
     */
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            // NOTE(review): what POOLS is used for is not visible here — confirm against the full source.
            POOLS.put(e, exec);
        }
        return exec;
    }
}

  1、每次订阅触发scheduler.scheduleDirect(...)时, 单例类NewThreadScheduler都会通过createWorker()创建一个新的NewThreadWorker实例, 而每一个Worker内部又维护了一个ScheduledExecutorService;
  2、创建的ScheduledExecutorService又会被缓存在SchedulerPoolFactory中的POOLS中, 目前有一个疑问, 缓存之后如何做到复用的? <TODO>

ObservableSubscribeOn_subscribeOn.subscribeActual(...) :
/** Excerpt of ObservableSubscribeOn: subscribeActual, focusing on what source refers to. */
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                /**
                 * source refers to the ObservableCreate returned by Observable.create(...)
                 */
                source.subscribe(parent);
            }
        }));
    }
}

  source指向Observable.create(...)返回的ObservableCreate;

ObservableCreate_create.subscribeActual(...) :
/** Excerpt of ObservableCreate: subscribeActual only (the source field is declared outside this excerpt). */
public final class ObservableCreate<T> extends Observable<T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // The emitter wraps the downstream observer and is handed to it as the Disposable.
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        /**
         * source refers to the ObservableOnSubscribe that was passed to
         * Observable.create(new ObservableOnSubscribe<T>()).
         */
        source.subscribe(parent);
    }
}

  1. observer指向SubscribeOnObserver_subscribeOn;
  2. CreateEmitter持有SubscribeOnObserver_subscribeOn的引用;
  3. SubscribeOnObserver_subscribeOn通过observer.onSubscribe(parent)持有CreateEmitter的引用;

CreateEmitter.onNext(...) :
/** Excerpt of CreateEmitter: onNext only (the observer field is declared outside this excerpt). */
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    /** Forwards the value to the wrapped observer unless this emitter has been disposed. */
    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
}

  observer指向SubscribeOnObserver_subscribeOn, 继续向下分析:

SubscribeOnObserver_subscribeOn.onNext(...) :
/** Excerpt of SubscribeOnObserver: constructor and onNext only. */
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    final Observer<? super T> actual;
    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
    }
    /** Passes the value straight through to the downstream observer. */
    @Override
    public void onNext(T t) {
        actual.onNext(t);
    }
}

  actual指向DoOnEachObserver_doOnNext, 继续向下分析:

DoOnEachObserver_doOnNext.onNext(...) :
/** Excerpt of DoOnEachObserver: abridged constructor and onNext (fields such as done are declared outside this excerpt). */
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
    DoOnEachObserver(Observer<? super T> actual, Consumer<? super T> onNext...) {
        this.actual = actual;
        this.onNext = onNext;
        ...
    }
    /** Unless done, runs the user's onNext consumer first and then forwards the value downstream. */
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        onNext.accept(t);
        actual.onNext(t);
    }
}

  actual指向ObserveOnObserver_observeOn, 继续向下分析:

ObserveOnObserver_observeOn.onNext(...) :
/** Excerpt of ObserveOnObserver: onNext and schedule (done, sourceMode, worker are declared outside this excerpt). */
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    /** Unless done, handles the value (branch elided in the article) and then calls schedule(). */
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode != QueueDisposable.ASYNC) {...}
        schedule();
    }
    /** Hands this object (a Runnable) to the worker only when the counter was 0 before incrementing. */
    void schedule() {
        // NOTE(review): presumably ensures a single pending run at a time — confirm against the full source.
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
}
HandlerWorker.schedule(...) :
/** Excerpt of HandlerWorker: schedule only (handler and disposed are declared outside this excerpt). */
private static final class HandlerWorker extends Worker {
    /**
     * Posts the (plugin-decorated) task to the handler as a Message callback.
     * Returns an already-disposed Disposable when this worker is disposed before or right after posting.
     */
    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (disposed) {
            return Disposables.disposed();
        }
        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
        // The delay is converted to milliseconds and clamped to be non-negative.
        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
        // Second check after posting: if disposed in the meantime, the callback is removed again.
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }
        return scheduled;
    }
}
上一篇下一篇

猜你喜欢

热点阅读