Android学习之旅

RxJava线程切换-源码解析(二)

2019-10-26  本文已影响0人  pj0579

首先明确一点线程切换一定需要Handler的参与

线程切换的代码最简单的做法大概如下代码

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe" + Thread.currentThread());
            }

            // ObservableOnSubscribe 运行在工作线程
        }).subscribeOn(Schedulers.newThread()).
                // 观察者运行在主线程 这段代码获取了主线程的handler,之后的线程切换就需要他的参与
                observeOn(AndroidSchedulers.mainThread()).
                subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe" + Thread.currentThread());
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "error");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "complete");
                    }
                });

subscribeOn(Schedulers.newThread()).代码指定ObservableOnSubscribe运行在工作线程
看下代码怎么实现?

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        // 返回的还是 ObservableSubscribeOn(是Observable 的子类)对象 也是调用里面的subscribeActual,默认是调用ObservableCreate 的subscribeActual方法
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

然后调用subscribe

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

里面可以调用subscribeActual方法

public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // s代表注册的Obersver还是直接执行
        s.onSubscribe(parent);
        // 这次没有直接执行代码写的subscribe方法 而是setDisposable
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

看下SubscribeTask类 实现了Runnable接口

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 在工作线程执行上游subscribe
            source.subscribe(parent);
        }
    }

接着往下看 就是一些线程池 线程管理的事情

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
       
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); 
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }

    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                // 在这提交到了线程池
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
上一篇下一篇

猜你喜欢

热点阅读